Blame spark-v0.9.0-0007-Removed-mesos.patch

William Benton 42acb0c
From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001
William Benton 42acb0c
From: William Benton <willb@redhat.com>
William Benton 42acb0c
Date: Thu, 27 Feb 2014 17:05:12 -0600
William Benton 42acb0c
Subject: [PATCH 7/7] Removed mesos
William Benton 42acb0c
William Benton 42acb0c
---
William Benton 42acb0c
 .../main/scala/org/apache/spark/SparkContext.scala |  15 -
William Benton 42acb0c
 .../main/scala/org/apache/spark/TaskState.scala    |  21 --
William Benton 42acb0c
 .../spark/executor/MesosExecutorBackend.scala      | 104 -------
William Benton 42acb0c
 .../mesos/CoarseMesosSchedulerBackend.scala        | 289 -----------------
William Benton 42acb0c
 .../cluster/mesos/MesosSchedulerBackend.scala      | 344 ---------------------
William Benton 42acb0c
 5 files changed, 773 deletions(-)
William Benton 42acb0c
 delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
William Benton 42acb0c
 delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
William Benton 42acb0c
 delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
William Benton 42acb0c
William Benton 42acb0c
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
William Benton 42acb0c
index 566472e..f3b2941 100644
William Benton 42acb0c
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
William Benton 42acb0c
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
William Benton 42acb0c
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
William Benton 42acb0c
   TextInputFormat}
William Benton 42acb0c
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
William Benton 42acb0c
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
William Benton 42acb0c
-import org.apache.mesos.MesosNativeLibrary
William Benton 42acb0c
 
William Benton 42acb0c
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
William Benton 42acb0c
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
William Benton 42acb0c
@@ -44,7 +43,6 @@ import org.apache.spark.rdd._
William Benton 42acb0c
 import org.apache.spark.scheduler._
William Benton 42acb0c
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
William Benton 42acb0c
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
William Benton 42acb0c
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
William Benton 42acb0c
 import org.apache.spark.scheduler.local.LocalBackend
William Benton 42acb0c
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
William Benton 42acb0c
 import org.apache.spark.ui.SparkUI
William Benton 42acb0c
@@ -1281,19 +1279,6 @@ object SparkContext {
William Benton 42acb0c
         scheduler.initialize(backend)
William Benton 42acb0c
         scheduler
William Benton 42acb0c
 
William Benton 42acb0c
-      case mesosUrl @ MESOS_REGEX(_) =>
William Benton 42acb0c
-        MesosNativeLibrary.load()
William Benton 42acb0c
-        val scheduler = new TaskSchedulerImpl(sc)
William Benton 42acb0c
-        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
William Benton 42acb0c
-        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
William Benton 42acb0c
-        val backend = if (coarseGrained) {
William Benton 42acb0c
-          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
William Benton 42acb0c
-        } else {
William Benton 42acb0c
-          new MesosSchedulerBackend(scheduler, sc, url, appName)
William Benton 42acb0c
-        }
William Benton 42acb0c
-        scheduler.initialize(backend)
William Benton 42acb0c
-        scheduler
William Benton 42acb0c
-
William Benton 42acb0c
       case SIMR_REGEX(simrUrl) =>
William Benton 42acb0c
         val scheduler = new TaskSchedulerImpl(sc)
William Benton 42acb0c
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
William Benton 42acb0c
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
William Benton 42acb0c
index 0bf1e4a..cdd8baf 100644
William Benton 42acb0c
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
William Benton 42acb0c
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
William Benton 42acb0c
@@ -17,8 +17,6 @@
William Benton 42acb0c
 
William Benton 42acb0c
 package org.apache.spark
William Benton 42acb0c
 
William Benton 42acb0c
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
William Benton 42acb0c
-
William Benton 42acb0c
 private[spark] object TaskState extends Enumeration {
William Benton 42acb0c
 
William Benton 42acb0c
   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
William Benton 42acb0c
@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration {
William Benton 42acb0c
   type TaskState = Value
William Benton 42acb0c
 
William Benton 42acb0c
   def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
William Benton 42acb0c
-
William Benton 42acb0c
-  def toMesos(state: TaskState): MesosTaskState = state match {
William Benton 42acb0c
-    case LAUNCHING => MesosTaskState.TASK_STARTING
William Benton 42acb0c
-    case RUNNING => MesosTaskState.TASK_RUNNING
William Benton 42acb0c
-    case FINISHED => MesosTaskState.TASK_FINISHED
William Benton 42acb0c
-    case FAILED => MesosTaskState.TASK_FAILED
William Benton 42acb0c
-    case KILLED => MesosTaskState.TASK_KILLED
William Benton 42acb0c
-    case LOST => MesosTaskState.TASK_LOST
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
William Benton 42acb0c
-    case MesosTaskState.TASK_STAGING => LAUNCHING
William Benton 42acb0c
-    case MesosTaskState.TASK_STARTING => LAUNCHING
William Benton 42acb0c
-    case MesosTaskState.TASK_RUNNING => RUNNING
William Benton 42acb0c
-    case MesosTaskState.TASK_FINISHED => FINISHED
William Benton 42acb0c
-    case MesosTaskState.TASK_FAILED => FAILED
William Benton 42acb0c
-    case MesosTaskState.TASK_KILLED => KILLED
William Benton 42acb0c
-    case MesosTaskState.TASK_LOST => LOST
William Benton 42acb0c
-  }
William Benton 42acb0c
 }
William Benton 42acb0c
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
William Benton 42acb0c
deleted file mode 100644
William Benton 42acb0c
index b56d8c9..0000000
William Benton 42acb0c
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
William Benton 42acb0c
+++ /dev/null
William Benton 42acb0c
@@ -1,104 +0,0 @@
William Benton 42acb0c
-/*
William Benton 42acb0c
- * Licensed to the Apache Software Foundation (ASF) under one or more
William Benton 42acb0c
- * contributor license agreements.  See the NOTICE file distributed with
William Benton 42acb0c
- * this work for additional information regarding copyright ownership.
William Benton 42acb0c
- * The ASF licenses this file to You under the Apache License, Version 2.0
William Benton 42acb0c
- * (the "License"); you may not use this file except in compliance with
William Benton 42acb0c
- * the License.  You may obtain a copy of the License at
William Benton 42acb0c
- *
William Benton 42acb0c
- *    http://www.apache.org/licenses/LICENSE-2.0
William Benton 42acb0c
- *
William Benton 42acb0c
- * Unless required by applicable law or agreed to in writing, software
William Benton 42acb0c
- * distributed under the License is distributed on an "AS IS" BASIS,
William Benton 42acb0c
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
William Benton 42acb0c
- * See the License for the specific language governing permissions and
William Benton 42acb0c
- * limitations under the License.
William Benton 42acb0c
- */
William Benton 42acb0c
-
William Benton 42acb0c
-package org.apache.spark.executor
William Benton 42acb0c
-
William Benton 42acb0c
-import java.nio.ByteBuffer
William Benton 42acb0c
-
William Benton 42acb0c
-import com.google.protobuf.ByteString
William Benton 42acb0c
-
William Benton 42acb0c
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
William Benton 42acb0c
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
William Benton 42acb0c
-
William Benton 42acb0c
-import org.apache.spark.Logging
William Benton 42acb0c
-import org.apache.spark.TaskState
William Benton 42acb0c
-import org.apache.spark.TaskState.TaskState
William Benton 42acb0c
-import org.apache.spark.util.Utils
William Benton 42acb0c
-
William Benton 42acb0c
-
William Benton 42acb0c
-private[spark] class MesosExecutorBackend
William Benton 42acb0c
-  extends MesosExecutor
William Benton 42acb0c
-  with ExecutorBackend
William Benton 42acb0c
-  with Logging {
William Benton 42acb0c
-
William Benton 42acb0c
-  var executor: Executor = null
William Benton 42acb0c
-  var driver: ExecutorDriver = null
William Benton 42acb0c
-
William Benton 42acb0c
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
William Benton 42acb0c
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
William Benton 42acb0c
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
William Benton 42acb0c
-      .setTaskId(mesosTaskId)
William Benton 42acb0c
-      .setState(TaskState.toMesos(state))
William Benton 42acb0c
-      .setData(ByteString.copyFrom(data))
William Benton 42acb0c
-      .build())
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def registered(
William Benton 42acb0c
-      driver: ExecutorDriver,
William Benton 42acb0c
-      executorInfo: ExecutorInfo,
William Benton 42acb0c
-      frameworkInfo: FrameworkInfo,
William Benton 42acb0c
-      slaveInfo: SlaveInfo) {
William Benton 42acb0c
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
William Benton 42acb0c
-    this.driver = driver
William Benton 42acb0c
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
William Benton 42acb0c
-    executor = new Executor(
William Benton 42acb0c
-      executorInfo.getExecutorId.getValue,
William Benton 42acb0c
-      slaveInfo.getHostname,
William Benton 42acb0c
-      properties)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
William Benton 42acb0c
-    val taskId = taskInfo.getTaskId.getValue.toLong
William Benton 42acb0c
-    if (executor == null) {
William Benton 42acb0c
-      logError("Received launchTask but executor was null")
William Benton 42acb0c
-    } else {
William Benton 42acb0c
-      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def error(d: ExecutorDriver, message: String) {
William Benton 42acb0c
-    logError("Error from Mesos: " + message)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def killTask(d: ExecutorDriver, t: TaskID) {
William Benton 42acb0c
-    if (executor == null) {
William Benton 42acb0c
-      logError("Received KillTask but executor was null")
William Benton 42acb0c
-    } else {
William Benton 42acb0c
-      executor.killTask(t.getValue.toLong)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def disconnected(d: ExecutorDriver) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def shutdown(d: ExecutorDriver) {}
William Benton 42acb0c
-}
William Benton 42acb0c
-
William Benton 42acb0c
-/**
William Benton 42acb0c
- * Entry point for Mesos executor.
William Benton 42acb0c
- */
William Benton 42acb0c
-private[spark] object MesosExecutorBackend {
William Benton 42acb0c
-  def main(args: Array[String]) {
William Benton 42acb0c
-    MesosNativeLibrary.load()
William Benton 42acb0c
-    // Create a new Executor and start it running
William Benton 42acb0c
-    val runner = new MesosExecutorBackend()
William Benton 42acb0c
-    new MesosExecutorDriver(runner).run()
William Benton 42acb0c
-  }
William Benton 42acb0c
-}
William Benton 42acb0c
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
William Benton 42acb0c
deleted file mode 100644
William Benton 42acb0c
index c27049b..0000000
William Benton 42acb0c
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
William Benton 42acb0c
+++ /dev/null
William Benton 42acb0c
@@ -1,289 +0,0 @@
William Benton 42acb0c
-/*
William Benton 42acb0c
- * Licensed to the Apache Software Foundation (ASF) under one or more
William Benton 42acb0c
- * contributor license agreements.  See the NOTICE file distributed with
William Benton 42acb0c
- * this work for additional information regarding copyright ownership.
William Benton 42acb0c
- * The ASF licenses this file to You under the Apache License, Version 2.0
William Benton 42acb0c
- * (the "License"); you may not use this file except in compliance with
William Benton 42acb0c
- * the License.  You may obtain a copy of the License at
William Benton 42acb0c
- *
William Benton 42acb0c
- *    http://www.apache.org/licenses/LICENSE-2.0
William Benton 42acb0c
- *
William Benton 42acb0c
- * Unless required by applicable law or agreed to in writing, software
William Benton 42acb0c
- * distributed under the License is distributed on an "AS IS" BASIS,
William Benton 42acb0c
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
William Benton 42acb0c
- * See the License for the specific language governing permissions and
William Benton 42acb0c
- * limitations under the License.
William Benton 42acb0c
- */
William Benton 42acb0c
-
William Benton 42acb0c
-package org.apache.spark.scheduler.cluster.mesos
William Benton 42acb0c
-
William Benton 42acb0c
-import java.io.File
William Benton 42acb0c
-import java.util.{ArrayList => JArrayList, List => JList}
William Benton 42acb0c
-import java.util.Collections
William Benton 42acb0c
-
William Benton 42acb0c
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
William Benton 42acb0c
-import scala.collection.JavaConversions._
William Benton 42acb0c
-
William Benton 42acb0c
-import com.google.protobuf.ByteString
William Benton 42acb0c
-import org.apache.mesos.{Scheduler => MScheduler}
William Benton 42acb0c
-import org.apache.mesos._
William Benton 42acb0c
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
William Benton 42acb0c
-
William Benton 42acb0c
-import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
William Benton 42acb0c
-import org.apache.spark.scheduler.TaskSchedulerImpl
William Benton 42acb0c
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
William Benton 42acb0c
-
William Benton 42acb0c
-/**
William Benton 42acb0c
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
William Benton 42acb0c
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
William Benton 42acb0c
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
William Benton 42acb0c
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
William Benton 42acb0c
- * latency.
William Benton 42acb0c
- *
William Benton 42acb0c
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
William Benton 42acb0c
- * remove this.
William Benton 42acb0c
- */
William Benton 42acb0c
-private[spark] class CoarseMesosSchedulerBackend(
William Benton 42acb0c
-    scheduler: TaskSchedulerImpl,
William Benton 42acb0c
-    sc: SparkContext,
William Benton 42acb0c
-    master: String,
William Benton 42acb0c
-    appName: String)
William Benton 42acb0c
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
William Benton 42acb0c
-  with MScheduler
William Benton 42acb0c
-  with Logging {
William Benton 42acb0c
-
William Benton 42acb0c
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
William Benton 42acb0c
-
William Benton 42acb0c
-  // Lock used to wait for scheduler to be registered
William Benton 42acb0c
-  var isRegistered = false
William Benton 42acb0c
-  val registeredLock = new Object()
William Benton 42acb0c
-
William Benton 42acb0c
-  // Driver for talking to Mesos
William Benton 42acb0c
-  var driver: SchedulerDriver = null
William Benton 42acb0c
-
William Benton 42acb0c
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
William Benton 42acb0c
-  val maxCores = conf.get("spark.cores.max",  Int.MaxValue.toString).toInt
William Benton 42acb0c
-
William Benton 42acb0c
-  // Cores we have acquired with each Mesos task ID
William Benton 42acb0c
-  val coresByTaskId = new HashMap[Int, Int]
William Benton 42acb0c
-  var totalCoresAcquired = 0
William Benton 42acb0c
-
William Benton 42acb0c
-  val slaveIdsWithExecutors = new HashSet[String]
William Benton 42acb0c
-
William Benton 42acb0c
-  val taskIdToSlaveId = new HashMap[Int, String]
William Benton 42acb0c
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
William Benton 42acb0c
-
William Benton 42acb0c
-  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
William Benton 42acb0c
-    "Spark home is not set; set it through the spark.home system " +
William Benton 42acb0c
-    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
William Benton 42acb0c
-
William Benton 42acb0c
-  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
William Benton 42acb0c
-
William Benton 42acb0c
-  var nextMesosTaskId = 0
William Benton 42acb0c
-
William Benton 42acb0c
-  def newMesosTaskId(): Int = {
William Benton 42acb0c
-    val id = nextMesosTaskId
William Benton 42acb0c
-    nextMesosTaskId += 1
William Benton 42acb0c
-    id
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def start() {
William Benton 42acb0c
-    super.start()
William Benton 42acb0c
-
William Benton 42acb0c
-    synchronized {
William Benton 42acb0c
-      new Thread("CoarseMesosSchedulerBackend driver") {
William Benton 42acb0c
-        setDaemon(true)
William Benton 42acb0c
-        override def run() {
William Benton 42acb0c
-          val scheduler = CoarseMesosSchedulerBackend.this
William Benton 42acb0c
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
William Benton 42acb0c
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
William Benton 42acb0c
-          try { {
William Benton 42acb0c
-            val ret = driver.run()
William Benton 42acb0c
-            logInfo("driver.run() returned with code " + ret)
William Benton 42acb0c
-          }
William Benton 42acb0c
-          } catch {
William Benton 42acb0c
-            case e: Exception => logError("driver.run() failed", e)
William Benton 42acb0c
-          }
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }.start()
William Benton 42acb0c
-
William Benton 42acb0c
-      waitForRegister()
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
William Benton 42acb0c
-    val environment = Environment.newBuilder()
William Benton 42acb0c
-    sc.executorEnvs.foreach { case (key, value) =>
William Benton 42acb0c
-      environment.addVariables(Environment.Variable.newBuilder()
William Benton 42acb0c
-        .setName(key)
William Benton 42acb0c
-        .setValue(value)
William Benton 42acb0c
-        .build())
William Benton 42acb0c
-    }
William Benton 42acb0c
-    val command = CommandInfo.newBuilder()
William Benton 42acb0c
-      .setEnvironment(environment)
William Benton 42acb0c
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
William Benton 42acb0c
-      conf.get("spark.driver.host"),
William Benton 42acb0c
-      conf.get("spark.driver.port"),
William Benton 42acb0c
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
William Benton 42acb0c
-    val uri = conf.get("spark.executor.uri", null)
William Benton 42acb0c
-    if (uri == null) {
William Benton 42acb0c
-      val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
William Benton 42acb0c
-      command.setValue(
William Benton 42acb0c
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
William Benton 42acb0c
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
William Benton 42acb0c
-    } else {
William Benton 42acb0c
-      // Grab everything to the first '.'. We'll use that and '*' to
William Benton 42acb0c
-      // glob the directory "correctly".
William Benton 42acb0c
-      val basename = uri.split('/').last.split('.').head
William Benton 42acb0c
-      command.setValue(
William Benton 42acb0c
-        "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
William Benton 42acb0c
-          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
William Benton 42acb0c
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
William Benton 42acb0c
-    }
William Benton 42acb0c
-    command.build()
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
William Benton 42acb0c
-    logInfo("Registered as framework ID " + frameworkId.getValue)
William Benton 42acb0c
-    registeredLock.synchronized {
William Benton 42acb0c
-      isRegistered = true
William Benton 42acb0c
-      registeredLock.notifyAll()
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  def waitForRegister() {
William Benton 42acb0c
-    registeredLock.synchronized {
William Benton 42acb0c
-      while (!isRegistered) {
William Benton 42acb0c
-        registeredLock.wait()
William Benton 42acb0c
-      }
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def disconnected(d: SchedulerDriver) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  /**
William Benton 42acb0c
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
William Benton 42acb0c
-   * unless we've already launched more than we wanted to.
William Benton 42acb0c
-   */
William Benton 42acb0c
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
William Benton 42acb0c
-    synchronized {
William Benton 42acb0c
-      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
William Benton 42acb0c
-
William Benton 42acb0c
-      for (offer <- offers) {
William Benton 42acb0c
-        val slaveId = offer.getSlaveId.toString
William Benton 42acb0c
-        val mem = getResource(offer.getResourcesList, "mem")
William Benton 42acb0c
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
William Benton 42acb0c
-        if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
William Benton 42acb0c
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
William Benton 42acb0c
-            !slaveIdsWithExecutors.contains(slaveId)) {
William Benton 42acb0c
-          // Launch an executor on the slave
William Benton 42acb0c
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
William Benton 42acb0c
-          totalCoresAcquired += cpusToUse
William Benton 42acb0c
-          val taskId = newMesosTaskId()
William Benton 42acb0c
-          taskIdToSlaveId(taskId) = slaveId
William Benton 42acb0c
-          slaveIdsWithExecutors += slaveId
William Benton 42acb0c
-          coresByTaskId(taskId) = cpusToUse
William Benton 42acb0c
-          val task = MesosTaskInfo.newBuilder()
William Benton 42acb0c
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
William Benton 42acb0c
-            .setSlaveId(offer.getSlaveId)
William Benton 42acb0c
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
William Benton 42acb0c
-            .setName("Task " + taskId)
William Benton 42acb0c
-            .addResources(createResource("cpus", cpusToUse))
William Benton 42acb0c
-            .addResources(createResource("mem", sc.executorMemory))
William Benton 42acb0c
-            .build()
William Benton 42acb0c
-          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
William Benton 42acb0c
-        } else {
William Benton 42acb0c
-          // Filter it out
William Benton 42acb0c
-          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
William Benton 42acb0c
-  private def getResource(res: JList[Resource], name: String): Double = {
William Benton 42acb0c
-    for (r <- res if r.getName == name) {
William Benton 42acb0c
-      return r.getScalar.getValue
William Benton 42acb0c
-    }
William Benton 42acb0c
-    // If we reached here, no resource with the required name was present
William Benton 42acb0c
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Build a Mesos resource protobuf object */
William Benton 42acb0c
-  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
William Benton 42acb0c
-    Resource.newBuilder()
William Benton 42acb0c
-      .setName(resourceName)
William Benton 42acb0c
-      .setType(Value.Type.SCALAR)
William Benton 42acb0c
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
William Benton 42acb0c
-      .build()
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Check whether a Mesos task state represents a finished task */
William Benton 42acb0c
-  private def isFinished(state: MesosTaskState) = {
William Benton 42acb0c
-    state == MesosTaskState.TASK_FINISHED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_FAILED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_KILLED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_LOST
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
William Benton 42acb0c
-    val taskId = status.getTaskId.getValue.toInt
William Benton 42acb0c
-    val state = status.getState
William Benton 42acb0c
-    logInfo("Mesos task " + taskId + " is now " + state)
William Benton 42acb0c
-    synchronized {
William Benton 42acb0c
-      if (isFinished(state)) {
William Benton 42acb0c
-        val slaveId = taskIdToSlaveId(taskId)
William Benton 42acb0c
-        slaveIdsWithExecutors -= slaveId
William Benton 42acb0c
-        taskIdToSlaveId -= taskId
William Benton 42acb0c
-        // Remove the cores we have remembered for this task, if it's in the hashmap
William Benton 42acb0c
-        for (cores <- coresByTaskId.get(taskId)) {
William Benton 42acb0c
-          totalCoresAcquired -= cores
William Benton 42acb0c
-          coresByTaskId -= taskId
William Benton 42acb0c
-        }
William Benton 42acb0c
-        // If it was a failure, mark the slave as failed for blacklisting purposes
William Benton 42acb0c
-        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
William Benton 42acb0c
-          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
William Benton 42acb0c
-          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
William Benton 42acb0c
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
William Benton 42acb0c
-                "is Spark installed on it?")
William Benton 42acb0c
-          }
William Benton 42acb0c
-        }
William Benton 42acb0c
-        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
William Benton 42acb0c
-      }
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def error(d: SchedulerDriver, message: String) {
William Benton 42acb0c
-    logError("Mesos error: " + message)
William Benton 42acb0c
-    scheduler.error(message)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def stop() {
William Benton 42acb0c
-    super.stop()
William Benton 42acb0c
-    if (driver != null) {
William Benton 42acb0c
-      driver.stop()
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
William Benton 42acb0c
-    logInfo("Mesos slave lost: " + slaveId.getValue)
William Benton 42acb0c
-    synchronized {
William Benton 42acb0c
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
William Benton 42acb0c
-        // Note that the slave ID corresponds to the executor ID on that slave
William Benton 42acb0c
-        slaveIdsWithExecutors -= slaveId.getValue
William Benton 42acb0c
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
William Benton 42acb0c
-      }
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
William Benton 42acb0c
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
William Benton 42acb0c
-    slaveLost(d, s)
William Benton 42acb0c
-  }
William Benton 42acb0c
-}
William Benton 42acb0c
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
William Benton 42acb0c
deleted file mode 100644
William Benton 42acb0c
index 4978148..0000000
William Benton 42acb0c
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
William Benton 42acb0c
+++ /dev/null
William Benton 42acb0c
@@ -1,344 +0,0 @@
William Benton 42acb0c
-/*
William Benton 42acb0c
- * Licensed to the Apache Software Foundation (ASF) under one or more
William Benton 42acb0c
- * contributor license agreements.  See the NOTICE file distributed with
William Benton 42acb0c
- * this work for additional information regarding copyright ownership.
William Benton 42acb0c
- * The ASF licenses this file to You under the Apache License, Version 2.0
William Benton 42acb0c
- * (the "License"); you may not use this file except in compliance with
William Benton 42acb0c
- * the License.  You may obtain a copy of the License at
William Benton 42acb0c
- *
William Benton 42acb0c
- *    http://www.apache.org/licenses/LICENSE-2.0
William Benton 42acb0c
- *
William Benton 42acb0c
- * Unless required by applicable law or agreed to in writing, software
William Benton 42acb0c
- * distributed under the License is distributed on an "AS IS" BASIS,
William Benton 42acb0c
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
William Benton 42acb0c
- * See the License for the specific language governing permissions and
William Benton 42acb0c
- * limitations under the License.
William Benton 42acb0c
- */
William Benton 42acb0c
-
William Benton 42acb0c
-package org.apache.spark.scheduler.cluster.mesos
William Benton 42acb0c
-
William Benton 42acb0c
-import java.io.File
William Benton 42acb0c
-import java.util.{ArrayList => JArrayList, List => JList}
William Benton 42acb0c
-import java.util.Collections
William Benton 42acb0c
-
William Benton 42acb0c
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
William Benton 42acb0c
-import scala.collection.JavaConversions._
William Benton 42acb0c
-
William Benton 42acb0c
-import com.google.protobuf.ByteString
William Benton 42acb0c
-import org.apache.mesos.{Scheduler => MScheduler}
William Benton 42acb0c
-import org.apache.mesos._
William Benton 42acb0c
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
William Benton 42acb0c
-
William Benton 42acb0c
-import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
William Benton 42acb0c
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
William Benton 42acb0c
-  TaskDescription, TaskSchedulerImpl, WorkerOffer}
William Benton 42acb0c
-import org.apache.spark.util.Utils
William Benton 42acb0c
-
William Benton 42acb0c
-/**
William Benton 42acb0c
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
William Benton 42acb0c
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
William Benton 42acb0c
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
William Benton 42acb0c
- */
William Benton 42acb0c
-private[spark] class MesosSchedulerBackend(
William Benton 42acb0c
-    scheduler: TaskSchedulerImpl,
William Benton 42acb0c
-    sc: SparkContext,
William Benton 42acb0c
-    master: String,
William Benton 42acb0c
-    appName: String)
William Benton 42acb0c
-  extends SchedulerBackend
William Benton 42acb0c
-  with MScheduler
William Benton 42acb0c
-  with Logging {
William Benton 42acb0c
-
William Benton 42acb0c
-  // Lock used to wait for scheduler to be registered
William Benton 42acb0c
-  var isRegistered = false
William Benton 42acb0c
-  val registeredLock = new Object()
William Benton 42acb0c
-
William Benton 42acb0c
-  // Driver for talking to Mesos
William Benton 42acb0c
-  var driver: SchedulerDriver = null
William Benton 42acb0c
-
William Benton 42acb0c
-  // Which slave IDs we have executors on
William Benton 42acb0c
-  val slaveIdsWithExecutors = new HashSet[String]
William Benton 42acb0c
-  val taskIdToSlaveId = new HashMap[Long, String]
William Benton 42acb0c
-
William Benton 42acb0c
-  // An ExecutorInfo for our tasks
William Benton 42acb0c
-  var execArgs: Array[Byte] = null
William Benton 42acb0c
-
William Benton 42acb0c
-  var classLoader: ClassLoader = null
William Benton 42acb0c
-
William Benton 42acb0c
-  override def start() {
William Benton 42acb0c
-    synchronized {
William Benton 42acb0c
-      classLoader = Thread.currentThread.getContextClassLoader
William Benton 42acb0c
-
William Benton 42acb0c
-      new Thread("MesosSchedulerBackend driver") {
William Benton 42acb0c
-        setDaemon(true)
William Benton 42acb0c
-        override def run() {
William Benton 42acb0c
-          val scheduler = MesosSchedulerBackend.this
William Benton 42acb0c
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
William Benton 42acb0c
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
William Benton 42acb0c
-          try {
William Benton 42acb0c
-            val ret = driver.run()
William Benton 42acb0c
-            logInfo("driver.run() returned with code " + ret)
William Benton 42acb0c
-          } catch {
William Benton 42acb0c
-            case e: Exception => logError("driver.run() failed", e)
William Benton 42acb0c
-          }
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }.start()
William Benton 42acb0c
-
William Benton 42acb0c
-      waitForRegister()
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  def createExecutorInfo(execId: String): ExecutorInfo = {
William Benton 42acb0c
-    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
William Benton 42acb0c
-      "Spark home is not set; set it through the spark.home system " +
William Benton 42acb0c
-      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
William Benton 42acb0c
-    val environment = Environment.newBuilder()
William Benton 42acb0c
-    sc.executorEnvs.foreach { case (key, value) =>
William Benton 42acb0c
-      environment.addVariables(Environment.Variable.newBuilder()
William Benton 42acb0c
-        .setName(key)
William Benton 42acb0c
-        .setValue(value)
William Benton 42acb0c
-        .build())
William Benton 42acb0c
-    }
William Benton 42acb0c
-    val command = CommandInfo.newBuilder()
William Benton 42acb0c
-      .setEnvironment(environment)
William Benton 42acb0c
-    val uri = sc.conf.get("spark.executor.uri", null)
William Benton 42acb0c
-    if (uri == null) {
William Benton 42acb0c
-      command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
William Benton 42acb0c
-    } else {
William Benton 42acb0c
-      // Grab everything to the first '.'. We'll use that and '*' to
William Benton 42acb0c
-      // glob the directory "correctly".
William Benton 42acb0c
-      val basename = uri.split('/').last.split('.').head
William Benton 42acb0c
-      command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
William Benton 42acb0c
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
William Benton 42acb0c
-    }
William Benton 42acb0c
-    val memory = Resource.newBuilder()
William Benton 42acb0c
-      .setName("mem")
William Benton 42acb0c
-      .setType(Value.Type.SCALAR)
William Benton 42acb0c
-      .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
William Benton 42acb0c
-      .build()
William Benton 42acb0c
-    ExecutorInfo.newBuilder()
William Benton 42acb0c
-      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
William Benton 42acb0c
-      .setCommand(command)
William Benton 42acb0c
-      .setData(ByteString.copyFrom(createExecArg()))
William Benton 42acb0c
-      .addResources(memory)
William Benton 42acb0c
-      .build()
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /**
William Benton 42acb0c
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
William Benton 42acb0c
-   * containing all the spark.* system properties in the form of (String, String) pairs.
William Benton 42acb0c
-   */
William Benton 42acb0c
-  private def createExecArg(): Array[Byte] = {
William Benton 42acb0c
-    if (execArgs == null) {
William Benton 42acb0c
-      val props = new HashMap[String, String]
William Benton 42acb0c
-      val iterator = System.getProperties.entrySet.iterator
William Benton 42acb0c
-      while (iterator.hasNext) {
William Benton 42acb0c
-        val entry = iterator.next
William Benton 42acb0c
-        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
William Benton 42acb0c
-        if (key.startsWith("spark.")) {
William Benton 42acb0c
-          props(key) = value
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }
William Benton 42acb0c
-      // Serialize the map as an array of (String, String) pairs
William Benton 42acb0c
-      execArgs = Utils.serialize(props.toArray)
William Benton 42acb0c
-    }
William Benton 42acb0c
-    execArgs
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  private def setClassLoader(): ClassLoader = {
William Benton 42acb0c
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
William Benton 42acb0c
-    Thread.currentThread.setContextClassLoader(classLoader)
William Benton 42acb0c
-    oldClassLoader
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  private def restoreClassLoader(oldClassLoader: ClassLoader) {
William Benton 42acb0c
-    Thread.currentThread.setContextClassLoader(oldClassLoader)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
William Benton 42acb0c
-    val oldClassLoader = setClassLoader()
William Benton 42acb0c
-    try {
William Benton 42acb0c
-      logInfo("Registered as framework ID " + frameworkId.getValue)
William Benton 42acb0c
-      registeredLock.synchronized {
William Benton 42acb0c
-        isRegistered = true
William Benton 42acb0c
-        registeredLock.notifyAll()
William Benton 42acb0c
-      }
William Benton 42acb0c
-    } finally {
William Benton 42acb0c
-      restoreClassLoader(oldClassLoader)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  def waitForRegister() {
William Benton 42acb0c
-    registeredLock.synchronized {
William Benton 42acb0c
-      while (!isRegistered) {
William Benton 42acb0c
-        registeredLock.wait()
William Benton 42acb0c
-      }
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def disconnected(d: SchedulerDriver) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  /**
William Benton 42acb0c
-   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
William Benton 42acb0c
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
William Benton 42acb0c
-   * tasks are balanced across the cluster.
William Benton 42acb0c
-   */
William Benton 42acb0c
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
William Benton 42acb0c
-    val oldClassLoader = setClassLoader()
William Benton 42acb0c
-    try {
William Benton 42acb0c
-      synchronized {
William Benton 42acb0c
-        // Build a big list of the offerable workers, and remember their indices so that we can
William Benton 42acb0c
-        // figure out which Offer to reply to for each worker
William Benton 42acb0c
-        val offerableIndices = new ArrayBuffer[Int]
William Benton 42acb0c
-        val offerableWorkers = new ArrayBuffer[WorkerOffer]
William Benton 42acb0c
-
William Benton 42acb0c
-        def enoughMemory(o: Offer) = {
William Benton 42acb0c
-          val mem = getResource(o.getResourcesList, "mem")
William Benton 42acb0c
-          val slaveId = o.getSlaveId.getValue
William Benton 42acb0c
-          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
William Benton 42acb0c
-        }
William Benton 42acb0c
-
William Benton 42acb0c
-        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
William Benton 42acb0c
-          offerableIndices += index
William Benton 42acb0c
-          offerableWorkers += new WorkerOffer(
William Benton 42acb0c
-            offer.getSlaveId.getValue,
William Benton 42acb0c
-            offer.getHostname,
William Benton 42acb0c
-            getResource(offer.getResourcesList, "cpus").toInt)
William Benton 42acb0c
-        }
William Benton 42acb0c
-
William Benton 42acb0c
-        // Call into the ClusterScheduler
William Benton 42acb0c
-        val taskLists = scheduler.resourceOffers(offerableWorkers)
William Benton 42acb0c
-
William Benton 42acb0c
-        // Build a list of Mesos tasks for each slave
William Benton 42acb0c
-        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
William Benton 42acb0c
-        for ((taskList, index) <- taskLists.zipWithIndex) {
William Benton 42acb0c
-          if (!taskList.isEmpty) {
William Benton 42acb0c
-            val offerNum = offerableIndices(index)
William Benton 42acb0c
-            val slaveId = offers(offerNum).getSlaveId.getValue
William Benton 42acb0c
-            slaveIdsWithExecutors += slaveId
William Benton 42acb0c
-            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
William Benton 42acb0c
-            for (taskDesc <- taskList) {
William Benton 42acb0c
-              taskIdToSlaveId(taskDesc.taskId) = slaveId
William Benton 42acb0c
-              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
William Benton 42acb0c
-            }
William Benton 42acb0c
-          }
William Benton 42acb0c
-        }
William Benton 42acb0c
-
William Benton 42acb0c
-        // Reply to the offers
William Benton 42acb0c
-        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
William Benton 42acb0c
-        for (i <- 0 until offers.size) {
William Benton 42acb0c
-          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }
William Benton 42acb0c
-    } finally {
William Benton 42acb0c
-      restoreClassLoader(oldClassLoader)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
William Benton 42acb0c
-  def getResource(res: JList[Resource], name: String): Double = {
William Benton 42acb0c
-    for (r <- res if r.getName == name) {
William Benton 42acb0c
-      return r.getScalar.getValue
William Benton 42acb0c
-    }
William Benton 42acb0c
-    // If we reached here, no resource with the required name was present
William Benton 42acb0c
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Turn a Spark TaskDescription into a Mesos task */
William Benton 42acb0c
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
William Benton 42acb0c
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
William Benton 42acb0c
-    val cpuResource = Resource.newBuilder()
William Benton 42acb0c
-      .setName("cpus")
William Benton 42acb0c
-      .setType(Value.Type.SCALAR)
William Benton 42acb0c
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
William Benton 42acb0c
-      .build()
William Benton 42acb0c
-    MesosTaskInfo.newBuilder()
William Benton 42acb0c
-      .setTaskId(taskId)
William Benton 42acb0c
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
William Benton 42acb0c
-      .setExecutor(createExecutorInfo(slaveId))
William Benton 42acb0c
-      .setName(task.name)
William Benton 42acb0c
-      .addResources(cpuResource)
William Benton 42acb0c
-      .setData(ByteString.copyFrom(task.serializedTask))
William Benton 42acb0c
-      .build()
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  /** Check whether a Mesos task state represents a finished task */
William Benton 42acb0c
-  def isFinished(state: MesosTaskState) = {
William Benton 42acb0c
-    state == MesosTaskState.TASK_FINISHED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_FAILED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_KILLED ||
William Benton 42acb0c
-      state == MesosTaskState.TASK_LOST
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
William Benton 42acb0c
-    val oldClassLoader = setClassLoader()
William Benton 42acb0c
-    try {
William Benton 42acb0c
-      val tid = status.getTaskId.getValue.toLong
William Benton 42acb0c
-      val state = TaskState.fromMesos(status.getState)
William Benton 42acb0c
-      synchronized {
William Benton 42acb0c
-        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
William Benton 42acb0c
-          // We lost the executor on this slave, so remember that it's gone
William Benton 42acb0c
-          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
William Benton 42acb0c
-        }
William Benton 42acb0c
-        if (isFinished(status.getState)) {
William Benton 42acb0c
-          taskIdToSlaveId.remove(tid)
William Benton 42acb0c
-        }
William Benton 42acb0c
-      }
William Benton 42acb0c
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
William Benton 42acb0c
-    } finally {
William Benton 42acb0c
-      restoreClassLoader(oldClassLoader)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def error(d: SchedulerDriver, message: String) {
William Benton 42acb0c
-    val oldClassLoader = setClassLoader()
William Benton 42acb0c
-    try {
William Benton 42acb0c
-      logError("Mesos error: " + message)
William Benton 42acb0c
-      scheduler.error(message)
William Benton 42acb0c
-    } finally {
William Benton 42acb0c
-      restoreClassLoader(oldClassLoader)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def stop() {
William Benton 42acb0c
-    if (driver != null) {
William Benton 42acb0c
-      driver.stop()
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def reviveOffers() {
William Benton 42acb0c
-    driver.reviveOffers()
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
William Benton 42acb0c
-
William Benton 42acb0c
-  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
William Benton 42acb0c
-    val oldClassLoader = setClassLoader()
William Benton 42acb0c
-    try {
William Benton 42acb0c
-      logInfo("Mesos slave lost: " + slaveId.getValue)
William Benton 42acb0c
-      synchronized {
William Benton 42acb0c
-        slaveIdsWithExecutors -= slaveId.getValue
William Benton 42acb0c
-      }
William Benton 42acb0c
-      scheduler.executorLost(slaveId.getValue, reason)
William Benton 42acb0c
-    } finally {
William Benton 42acb0c
-      restoreClassLoader(oldClassLoader)
William Benton 42acb0c
-    }
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
William Benton 42acb0c
-    recordSlaveLost(d, slaveId, SlaveLost())
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
William Benton 42acb0c
-                            slaveId: SlaveID, status: Int) {
William Benton 42acb0c
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
William Benton 42acb0c
-                                                                 slaveId.getValue))
William Benton 42acb0c
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
William Benton 42acb0c
-  }
William Benton 42acb0c
-
William Benton 42acb0c
-  // TODO: query Mesos for number of cores
William Benton 42acb0c
-  override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
William Benton 42acb0c
-}
William Benton 42acb0c
-- 
William Benton 42acb0c
1.8.3.4 (Apple Git-47)
William Benton 42acb0c