Blob Blame History Raw
diff --git a/src/condor_daemon_client/dc_startd.cpp b/src/condor_daemon_client/dc_startd.cpp
index 7261c4a..09a2689 100644
--- a/src/condor_daemon_client/dc_startd.cpp
+++ b/src/condor_daemon_client/dc_startd.cpp
@@ -51,7 +51,7 @@ DCStartd::DCStartd( const char* tName, const char* tPool, const char* tAddr,
 	}
 }
 
-DCStartd::DCStartd( ClassAd *ad, const char *tPool )
+DCStartd::DCStartd( const ClassAd *ad, const char *tPool )
 	: Daemon(ad,DT_STARTD,tPool),
 	  claim_id(NULL)
 {
diff --git a/src/condor_daemon_client/dc_startd.h b/src/condor_daemon_client/dc_startd.h
index c5f3e89..ff20892 100644
--- a/src/condor_daemon_client/dc_startd.h
+++ b/src/condor_daemon_client/dc_startd.h
@@ -49,7 +49,7 @@ public:
 	DCStartd( const char* const name, const char* const pool,
 			  const char* const addr, const char* const id );
 
-	DCStartd( ClassAd *ad, const char *pool = NULL );
+	DCStartd( const ClassAd *ad, const char *pool = NULL );
 
 		/// Destructor.
 	~DCStartd();
diff --git a/src/defrag/defrag.cpp b/src/defrag/defrag.cpp
index 26aec0a..8710b5d 100644
--- a/src/defrag/defrag.cpp
+++ b/src/defrag/defrag.cpp
@@ -185,6 +185,8 @@ void Defrag::config()
 		}
 	}
 
+	m_can_cancel = param_boolean("DEFRAG_CAN_CANCEL", true);
+
 	param(m_defrag_name,"DEFRAG_NAME");
 
 	int stats_quantum = m_polling_interval;
@@ -487,8 +489,17 @@ void Defrag::poll()
 	int num_whole_machines = countMachines(m_whole_machine_expr.c_str(),"DEFRAG_WHOLE_MACHINE_EXPR",&whole_machines);
 	m_stats.WholeMachines = num_whole_machines;
 
+	MachineSet draining_whole_machines;
+	std::stringstream draining_whole_machines_ss;
+	draining_whole_machines_ss << m_whole_machine_expr << " && Draining && Offline=!=True";
+	int num_draining_whole_machines = countMachines(draining_whole_machines_ss.str().c_str(),
+		"<DEFRAG_WHOLE_MACHINE_EXPR Draining>", &draining_whole_machines);
+
 	dprintf(D_ALWAYS,"There are currently %d draining and %d whole machines.\n",
 			num_draining,num_whole_machines);
+	if (num_draining_whole_machines)
+		dprintf(D_ALWAYS, "Of the %d whole machines, %d are in the draining state.\n",
+			num_whole_machines, num_draining_whole_machines);
 
 	queryDrainingCost();
 
@@ -548,8 +559,7 @@ void Defrag::poll()
 
 	ClassAdList startdAds;
 	std::string requirements;
-	sprintf(requirements,"(%s) && Draining =!= true",m_defrag_requirements.c_str());
-	if( !queryMachines(requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) {
+	if( !queryMachines(m_defrag_requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) {
 		dprintf(D_ALWAYS,"Doing nothing, because the query to select machines matching DEFRAG_REQUIREMENTS failed.\n");
 		return;
 	}
@@ -561,12 +571,26 @@ void Defrag::poll()
 	int num_drained = 0;
 	ClassAd *startd_ad;
 	MachineSet machines_done;
+	MachineSet draining_machines_done;
 	while( (startd_ad=startdAds.Next()) ) {
 		std::string machine;
 		std::string name;
 		startd_ad->LookupString(ATTR_NAME,name);
 		slotNameToDaemonName(name,machine);
 
+		if( !draining_machines_done.count(machine) && draining_whole_machines.count(machine) ) {
+			cancel_drain(*startd_ad);
+			draining_machines_done.insert(machine);
+			continue;
+		}
+
+		// Do not consider slots which are already draining.
+		bool startd_currently_draining = false;
+		startd_ad->LookupBool("Draining", startd_currently_draining);
+		if( startd_currently_draining ) {
+			continue;
+		}
+
 		if( machines_done.count(machine) ) {
 			dprintf(D_FULLDEBUG,
 					"Skipping %s: already attempted to drain %s in this cycle.\n",
@@ -581,14 +605,13 @@ void Defrag::poll()
 			continue;
 		}
 
-		if( drain(startd_ad) ) {
+		if( (num_drained++ < num_to_drain) && drain(*startd_ad) ) {
 			machines_done.insert(machine);
 
-			if( ++num_drained >= num_to_drain ) {
+			if( num_drained >= num_to_drain ) {
 				dprintf(D_ALWAYS,
 						"Drained maximum number of machines allowed in this cycle (%d).\n",
 						num_to_drain);
-				break;
 			}
 		}
 	}
@@ -601,26 +624,24 @@ void Defrag::poll()
 }
 
 bool
-Defrag::drain(ClassAd *startd_ad)
+Defrag::drain(const ClassAd &startd_ad)
 {
-	ASSERT( startd_ad );
-
 	std::string name;
-	startd_ad->LookupString(ATTR_NAME,name);
+	startd_ad.LookupString(ATTR_NAME,name);
 
 	dprintf(D_ALWAYS,"Initiating %s draining of %s.\n",
 			m_draining_schedule_str.c_str(),name.c_str());
 
-	DCStartd startd( startd_ad );
+	DCStartd startd( &startd_ad );
 
 	int graceful_completion = 0;
-	startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion);
+	startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion);
 	int quick_completion = 0;
-	startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion);
+	startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion);
 	int graceful_badput = 0;
-	startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput);
+	startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput);
 	int quick_badput = 0;
-	startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput);
+	startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput);
 
 	time_t now = time(NULL);
 	std::string draining_check_expr;
@@ -659,6 +680,27 @@ Defrag::drain(ClassAd *startd_ad)
 	return true;
 }
 
+bool
+Defrag::cancel_drain(const ClassAd &startd_ad)
+{
+
+	std::string name;
+	startd_ad.LookupString(ATTR_NAME,name);
+
+	dprintf(D_ALWAYS,"Initiating %s draining of %s.\n",
+		m_draining_schedule_str.c_str(),name.c_str());
+
+	DCStartd startd( &startd_ad );
+
+	bool rval = startd.cancelDrainJobs( NULL );
+	if ( rval ) {
+		dprintf(D_FULLDEBUG, "Sent request to cancel draining on %s\n", startd.name());
+	} else {
+		dprintf(D_ALWAYS, "Unable to cancel draining on %s: %s\n", startd.name(), startd.error());
+	}
+	return rval;
+}
+
 void
 Defrag::publish(ClassAd *ad)
 {
diff --git a/src/defrag/defrag.h b/src/defrag/defrag.h
index 8c7fd51..909b569 100644
--- a/src/defrag/defrag.h
+++ b/src/defrag/defrag.h
@@ -40,11 +40,11 @@ class Defrag: public Service {
 	void stop();
 
 	void poll(); // do the periodic policy evaluation
-	bool drain(ClassAd *startd_ad);
 
 	typedef std::set< std::string > MachineSet;
 
  private:
+
 	int m_polling_interval; // delay between evaluations of the policy
 	int m_polling_timer;
 	double m_draining_per_hour;
@@ -58,6 +58,7 @@ class Defrag: public Service {
 	ClassAd m_rank_ad;
 	int m_draining_schedule;
 	std::string m_draining_schedule_str;
+	bool m_can_cancel; // Whether condor_defrag can also cancel draining early.
 
 	time_t m_last_poll;
 
@@ -70,6 +71,9 @@ class Defrag: public Service {
 	ClassAd m_public_ad;
 	DefragStats m_stats;
 
+	bool drain(const ClassAd &startd_ad);
+	bool cancel_drain(const ClassAd &startd_ad);
+
 	void validateExpr(char const *constraint,char const *constraint_source);
 	bool queryMachines(char const *constraint,char const *constraint_source,ClassAdList &startdAds);