|
William Benton |
42acb0c |
From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001
|
|
William Benton |
42acb0c |
From: William Benton <willb@redhat.com>
|
|
William Benton |
42acb0c |
Date: Thu, 27 Feb 2014 17:05:12 -0600
|
|
William Benton |
42acb0c |
Subject: [PATCH 7/7] Removed mesos
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
---
|
|
William Benton |
42acb0c |
.../main/scala/org/apache/spark/SparkContext.scala | 15 -
|
|
William Benton |
42acb0c |
.../main/scala/org/apache/spark/TaskState.scala | 21 --
|
|
William Benton |
42acb0c |
.../spark/executor/MesosExecutorBackend.scala | 104 -------
|
|
William Benton |
42acb0c |
.../mesos/CoarseMesosSchedulerBackend.scala | 289 -----------------
|
|
William Benton |
42acb0c |
.../cluster/mesos/MesosSchedulerBackend.scala | 344 ---------------------
|
|
William Benton |
42acb0c |
5 files changed, 773 deletions(-)
|
|
William Benton |
42acb0c |
delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
|
|
William Benton |
42acb0c |
delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
|
|
William Benton |
42acb0c |
index 566472e..f3b2941 100644
|
|
William Benton |
42acb0c |
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
|
|
William Benton |
42acb0c |
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
|
|
William Benton |
42acb0c |
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
|
|
William Benton |
42acb0c |
TextInputFormat}
|
|
William Benton |
42acb0c |
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
|
|
William Benton |
42acb0c |
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
|
|
William Benton |
42acb0c |
-import org.apache.mesos.MesosNativeLibrary
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
|
|
William Benton |
42acb0c |
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
|
|
William Benton |
42acb0c |
@@ -44,7 +43,6 @@ import org.apache.spark.rdd._
|
|
William Benton |
42acb0c |
import org.apache.spark.scheduler._
|
|
William Benton |
42acb0c |
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
|
|
William Benton |
42acb0c |
SparkDeploySchedulerBackend, SimrSchedulerBackend}
|
|
William Benton |
42acb0c |
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
|
|
William Benton |
42acb0c |
import org.apache.spark.scheduler.local.LocalBackend
|
|
William Benton |
42acb0c |
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
|
|
William Benton |
42acb0c |
import org.apache.spark.ui.SparkUI
|
|
William Benton |
42acb0c |
@@ -1281,19 +1279,6 @@ object SparkContext {
|
|
William Benton |
42acb0c |
scheduler.initialize(backend)
|
|
William Benton |
42acb0c |
scheduler
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
- case mesosUrl @ MESOS_REGEX(_) =>
|
|
William Benton |
42acb0c |
- MesosNativeLibrary.load()
|
|
William Benton |
42acb0c |
- val scheduler = new TaskSchedulerImpl(sc)
|
|
William Benton |
42acb0c |
- val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
|
|
William Benton |
42acb0c |
- val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
|
|
William Benton |
42acb0c |
- val backend = if (coarseGrained) {
|
|
William Benton |
42acb0c |
- new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- new MesosSchedulerBackend(scheduler, sc, url, appName)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- scheduler.initialize(backend)
|
|
William Benton |
42acb0c |
- scheduler
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
case SIMR_REGEX(simrUrl) =>
|
|
William Benton |
42acb0c |
val scheduler = new TaskSchedulerImpl(sc)
|
|
William Benton |
42acb0c |
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
|
|
William Benton |
42acb0c |
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
|
|
William Benton |
42acb0c |
index 0bf1e4a..cdd8baf 100644
|
|
William Benton |
42acb0c |
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
|
|
William Benton |
42acb0c |
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
|
|
William Benton |
42acb0c |
@@ -17,8 +17,6 @@
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
package org.apache.spark
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
private[spark] object TaskState extends Enumeration {
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
|
|
William Benton |
42acb0c |
@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration {
|
|
William Benton |
42acb0c |
type TaskState = Value
|
|
William Benton |
42acb0c |
|
|
William Benton |
42acb0c |
def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def toMesos(state: TaskState): MesosTaskState = state match {
|
|
William Benton |
42acb0c |
- case LAUNCHING => MesosTaskState.TASK_STARTING
|
|
William Benton |
42acb0c |
- case RUNNING => MesosTaskState.TASK_RUNNING
|
|
William Benton |
42acb0c |
- case FINISHED => MesosTaskState.TASK_FINISHED
|
|
William Benton |
42acb0c |
- case FAILED => MesosTaskState.TASK_FAILED
|
|
William Benton |
42acb0c |
- case KILLED => MesosTaskState.TASK_KILLED
|
|
William Benton |
42acb0c |
- case LOST => MesosTaskState.TASK_LOST
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_STAGING => LAUNCHING
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_STARTING => LAUNCHING
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_RUNNING => RUNNING
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_FINISHED => FINISHED
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_FAILED => FAILED
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_KILLED => KILLED
|
|
William Benton |
42acb0c |
- case MesosTaskState.TASK_LOST => LOST
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
}
|
|
William Benton |
42acb0c |
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
|
|
William Benton |
42acb0c |
deleted file mode 100644
|
|
William Benton |
42acb0c |
index b56d8c9..0000000
|
|
William Benton |
42acb0c |
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
|
|
William Benton |
42acb0c |
+++ /dev/null
|
|
William Benton |
42acb0c |
@@ -1,104 +0,0 @@
|
|
William Benton |
42acb0c |
-/*
|
|
William Benton |
42acb0c |
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
William Benton |
42acb0c |
- * contributor license agreements. See the NOTICE file distributed with
|
|
William Benton |
42acb0c |
- * this work for additional information regarding copyright ownership.
|
|
William Benton |
42acb0c |
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
William Benton |
42acb0c |
- * (the "License"); you may not use this file except in compliance with
|
|
William Benton |
42acb0c |
- * the License. You may obtain a copy of the License at
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * Unless required by applicable law or agreed to in writing, software
|
|
William Benton |
42acb0c |
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
William Benton |
42acb0c |
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
William Benton |
42acb0c |
- * See the License for the specific language governing permissions and
|
|
William Benton |
42acb0c |
- * limitations under the License.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-package org.apache.spark.executor
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import java.nio.ByteBuffer
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import com.google.protobuf.ByteString
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
|
|
William Benton |
42acb0c |
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import org.apache.spark.Logging
|
|
William Benton |
42acb0c |
-import org.apache.spark.TaskState
|
|
William Benton |
42acb0c |
-import org.apache.spark.TaskState.TaskState
|
|
William Benton |
42acb0c |
-import org.apache.spark.util.Utils
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-private[spark] class MesosExecutorBackend
|
|
William Benton |
42acb0c |
- extends MesosExecutor
|
|
William Benton |
42acb0c |
- with ExecutorBackend
|
|
William Benton |
42acb0c |
- with Logging {
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- var executor: Executor = null
|
|
William Benton |
42acb0c |
- var driver: ExecutorDriver = null
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
|
|
William Benton |
42acb0c |
- val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
|
|
William Benton |
42acb0c |
- driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
|
|
William Benton |
42acb0c |
- .setTaskId(mesosTaskId)
|
|
William Benton |
42acb0c |
- .setState(TaskState.toMesos(state))
|
|
William Benton |
42acb0c |
- .setData(ByteString.copyFrom(data))
|
|
William Benton |
42acb0c |
- .build())
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def registered(
|
|
William Benton |
42acb0c |
- driver: ExecutorDriver,
|
|
William Benton |
42acb0c |
- executorInfo: ExecutorInfo,
|
|
William Benton |
42acb0c |
- frameworkInfo: FrameworkInfo,
|
|
William Benton |
42acb0c |
- slaveInfo: SlaveInfo) {
|
|
William Benton |
42acb0c |
- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
|
|
William Benton |
42acb0c |
- this.driver = driver
|
|
William Benton |
42acb0c |
- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
|
|
William Benton |
42acb0c |
- executor = new Executor(
|
|
William Benton |
42acb0c |
- executorInfo.getExecutorId.getValue,
|
|
William Benton |
42acb0c |
- slaveInfo.getHostname,
|
|
William Benton |
42acb0c |
- properties)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
|
|
William Benton |
42acb0c |
- val taskId = taskInfo.getTaskId.getValue.toLong
|
|
William Benton |
42acb0c |
- if (executor == null) {
|
|
William Benton |
42acb0c |
- logError("Received launchTask but executor was null")
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def error(d: ExecutorDriver, message: String) {
|
|
William Benton |
42acb0c |
- logError("Error from Mesos: " + message)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def killTask(d: ExecutorDriver, t: TaskID) {
|
|
William Benton |
42acb0c |
- if (executor == null) {
|
|
William Benton |
42acb0c |
- logError("Received KillTask but executor was null")
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- executor.killTask(t.getValue.toLong)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def disconnected(d: ExecutorDriver) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def shutdown(d: ExecutorDriver) {}
|
|
William Benton |
42acb0c |
-}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-/**
|
|
William Benton |
42acb0c |
- * Entry point for Mesos executor.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-private[spark] object MesosExecutorBackend {
|
|
William Benton |
42acb0c |
- def main(args: Array[String]) {
|
|
William Benton |
42acb0c |
- MesosNativeLibrary.load()
|
|
William Benton |
42acb0c |
- // Create a new Executor and start it running
|
|
William Benton |
42acb0c |
- val runner = new MesosExecutorBackend()
|
|
William Benton |
42acb0c |
- new MesosExecutorDriver(runner).run()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-}
|
|
William Benton |
42acb0c |
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
deleted file mode 100644
|
|
William Benton |
42acb0c |
index c27049b..0000000
|
|
William Benton |
42acb0c |
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
+++ /dev/null
|
|
William Benton |
42acb0c |
@@ -1,289 +0,0 @@
|
|
William Benton |
42acb0c |
-/*
|
|
William Benton |
42acb0c |
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
William Benton |
42acb0c |
- * contributor license agreements. See the NOTICE file distributed with
|
|
William Benton |
42acb0c |
- * this work for additional information regarding copyright ownership.
|
|
William Benton |
42acb0c |
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
William Benton |
42acb0c |
- * (the "License"); you may not use this file except in compliance with
|
|
William Benton |
42acb0c |
- * the License. You may obtain a copy of the License at
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * Unless required by applicable law or agreed to in writing, software
|
|
William Benton |
42acb0c |
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
William Benton |
42acb0c |
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
William Benton |
42acb0c |
- * See the License for the specific language governing permissions and
|
|
William Benton |
42acb0c |
- * limitations under the License.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-package org.apache.spark.scheduler.cluster.mesos
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import java.io.File
|
|
William Benton |
42acb0c |
-import java.util.{ArrayList => JArrayList, List => JList}
|
|
William Benton |
42acb0c |
-import java.util.Collections
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
|
|
William Benton |
42acb0c |
-import scala.collection.JavaConversions._
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import com.google.protobuf.ByteString
|
|
William Benton |
42acb0c |
-import org.apache.mesos.{Scheduler => MScheduler}
|
|
William Benton |
42acb0c |
-import org.apache.mesos._
|
|
William Benton |
42acb0c |
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
|
|
William Benton |
42acb0c |
-import org.apache.spark.scheduler.TaskSchedulerImpl
|
|
William Benton |
42acb0c |
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-/**
|
|
William Benton |
42acb0c |
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
|
|
William Benton |
42acb0c |
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
|
|
William Benton |
42acb0c |
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
|
|
William Benton |
42acb0c |
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
|
|
William Benton |
42acb0c |
- * latency.
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
|
|
William Benton |
42acb0c |
- * remove this.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-private[spark] class CoarseMesosSchedulerBackend(
|
|
William Benton |
42acb0c |
- scheduler: TaskSchedulerImpl,
|
|
William Benton |
42acb0c |
- sc: SparkContext,
|
|
William Benton |
42acb0c |
- master: String,
|
|
William Benton |
42acb0c |
- appName: String)
|
|
William Benton |
42acb0c |
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
|
|
William Benton |
42acb0c |
- with MScheduler
|
|
William Benton |
42acb0c |
- with Logging {
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Lock used to wait for scheduler to be registered
|
|
William Benton |
42acb0c |
- var isRegistered = false
|
|
William Benton |
42acb0c |
- val registeredLock = new Object()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Driver for talking to Mesos
|
|
William Benton |
42acb0c |
- var driver: SchedulerDriver = null
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
|
|
William Benton |
42acb0c |
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Cores we have acquired with each Mesos task ID
|
|
William Benton |
42acb0c |
- val coresByTaskId = new HashMap[Int, Int]
|
|
William Benton |
42acb0c |
- var totalCoresAcquired = 0
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- val slaveIdsWithExecutors = new HashSet[String]
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- val taskIdToSlaveId = new HashMap[Int, String]
|
|
William Benton |
42acb0c |
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
|
|
William Benton |
42acb0c |
- "Spark home is not set; set it through the spark.home system " +
|
|
William Benton |
42acb0c |
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- var nextMesosTaskId = 0
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def newMesosTaskId(): Int = {
|
|
William Benton |
42acb0c |
- val id = nextMesosTaskId
|
|
William Benton |
42acb0c |
- nextMesosTaskId += 1
|
|
William Benton |
42acb0c |
- id
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def start() {
|
|
William Benton |
42acb0c |
- super.start()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- new Thread("CoarseMesosSchedulerBackend driver") {
|
|
William Benton |
42acb0c |
- setDaemon(true)
|
|
William Benton |
42acb0c |
- override def run() {
|
|
William Benton |
42acb0c |
- val scheduler = CoarseMesosSchedulerBackend.this
|
|
William Benton |
42acb0c |
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
|
|
William Benton |
42acb0c |
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
|
|
William Benton |
42acb0c |
- try { {
|
|
William Benton |
42acb0c |
- val ret = driver.run()
|
|
William Benton |
42acb0c |
- logInfo("driver.run() returned with code " + ret)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- } catch {
|
|
William Benton |
42acb0c |
- case e: Exception => logError("driver.run() failed", e)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }.start()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- waitForRegister()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
|
|
William Benton |
42acb0c |
- val environment = Environment.newBuilder()
|
|
William Benton |
42acb0c |
- sc.executorEnvs.foreach { case (key, value) =>
|
|
William Benton |
42acb0c |
- environment.addVariables(Environment.Variable.newBuilder()
|
|
William Benton |
42acb0c |
- .setName(key)
|
|
William Benton |
42acb0c |
- .setValue(value)
|
|
William Benton |
42acb0c |
- .build())
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- val command = CommandInfo.newBuilder()
|
|
William Benton |
42acb0c |
- .setEnvironment(environment)
|
|
William Benton |
42acb0c |
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
|
|
William Benton |
42acb0c |
- conf.get("spark.driver.host"),
|
|
William Benton |
42acb0c |
- conf.get("spark.driver.port"),
|
|
William Benton |
42acb0c |
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
|
|
William Benton |
42acb0c |
- val uri = conf.get("spark.executor.uri", null)
|
|
William Benton |
42acb0c |
- if (uri == null) {
|
|
William Benton |
42acb0c |
- val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
|
|
William Benton |
42acb0c |
- command.setValue(
|
|
William Benton |
42acb0c |
- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
|
|
William Benton |
42acb0c |
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- // Grab everything to the first '.'. We'll use that and '*' to
|
|
William Benton |
42acb0c |
- // glob the directory "correctly".
|
|
William Benton |
42acb0c |
- val basename = uri.split('/').last.split('.').head
|
|
William Benton |
42acb0c |
- command.setValue(
|
|
William Benton |
42acb0c |
- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
|
|
William Benton |
42acb0c |
- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
|
|
William Benton |
42acb0c |
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- command.build()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
|
|
William Benton |
42acb0c |
- logInfo("Registered as framework ID " + frameworkId.getValue)
|
|
William Benton |
42acb0c |
- registeredLock.synchronized {
|
|
William Benton |
42acb0c |
- isRegistered = true
|
|
William Benton |
42acb0c |
- registeredLock.notifyAll()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def waitForRegister() {
|
|
William Benton |
42acb0c |
- registeredLock.synchronized {
|
|
William Benton |
42acb0c |
- while (!isRegistered) {
|
|
William Benton |
42acb0c |
- registeredLock.wait()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def disconnected(d: SchedulerDriver) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /**
|
|
William Benton |
42acb0c |
- * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
|
|
William Benton |
42acb0c |
- * unless we've already launched more than we wanted to.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- for (offer <- offers) {
|
|
William Benton |
42acb0c |
- val slaveId = offer.getSlaveId.toString
|
|
William Benton |
42acb0c |
- val mem = getResource(offer.getResourcesList, "mem")
|
|
William Benton |
42acb0c |
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
|
|
William Benton |
42acb0c |
- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
|
|
William Benton |
42acb0c |
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
|
|
William Benton |
42acb0c |
- !slaveIdsWithExecutors.contains(slaveId)) {
|
|
William Benton |
42acb0c |
- // Launch an executor on the slave
|
|
William Benton |
42acb0c |
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
|
|
William Benton |
42acb0c |
- totalCoresAcquired += cpusToUse
|
|
William Benton |
42acb0c |
- val taskId = newMesosTaskId()
|
|
William Benton |
42acb0c |
- taskIdToSlaveId(taskId) = slaveId
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors += slaveId
|
|
William Benton |
42acb0c |
- coresByTaskId(taskId) = cpusToUse
|
|
William Benton |
42acb0c |
- val task = MesosTaskInfo.newBuilder()
|
|
William Benton |
42acb0c |
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
|
|
William Benton |
42acb0c |
- .setSlaveId(offer.getSlaveId)
|
|
William Benton |
42acb0c |
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
|
|
William Benton |
42acb0c |
- .setName("Task " + taskId)
|
|
William Benton |
42acb0c |
- .addResources(createResource("cpus", cpusToUse))
|
|
William Benton |
42acb0c |
- .addResources(createResource("mem", sc.executorMemory))
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- d.launchTasks(offer.getId, Collections.singletonList(task), filters)
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- // Filter it out
|
|
William Benton |
42acb0c |
- d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
|
|
William Benton |
42acb0c |
- private def getResource(res: JList[Resource], name: String): Double = {
|
|
William Benton |
42acb0c |
- for (r <- res if r.getName == name) {
|
|
William Benton |
42acb0c |
- return r.getScalar.getValue
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- // If we reached here, no resource with the required name was present
|
|
William Benton |
42acb0c |
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Build a Mesos resource protobuf object */
|
|
William Benton |
42acb0c |
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
|
|
William Benton |
42acb0c |
- Resource.newBuilder()
|
|
William Benton |
42acb0c |
- .setName(resourceName)
|
|
William Benton |
42acb0c |
- .setType(Value.Type.SCALAR)
|
|
William Benton |
42acb0c |
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Check whether a Mesos task state represents a finished task */
|
|
William Benton |
42acb0c |
- private def isFinished(state: MesosTaskState) = {
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_FINISHED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_FAILED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_KILLED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_LOST
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
|
|
William Benton |
42acb0c |
- val taskId = status.getTaskId.getValue.toInt
|
|
William Benton |
42acb0c |
- val state = status.getState
|
|
William Benton |
42acb0c |
- logInfo("Mesos task " + taskId + " is now " + state)
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- if (isFinished(state)) {
|
|
William Benton |
42acb0c |
- val slaveId = taskIdToSlaveId(taskId)
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors -= slaveId
|
|
William Benton |
42acb0c |
- taskIdToSlaveId -= taskId
|
|
William Benton |
42acb0c |
- // Remove the cores we have remembered for this task, if it's in the hashmap
|
|
William Benton |
42acb0c |
- for (cores <- coresByTaskId.get(taskId)) {
|
|
William Benton |
42acb0c |
- totalCoresAcquired -= cores
|
|
William Benton |
42acb0c |
- coresByTaskId -= taskId
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- // If it was a failure, mark the slave as failed for blacklisting purposes
|
|
William Benton |
42acb0c |
- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
|
|
William Benton |
42acb0c |
- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
|
|
William Benton |
42acb0c |
- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
|
|
William Benton |
42acb0c |
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
|
|
William Benton |
42acb0c |
- "is Spark installed on it?")
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def error(d: SchedulerDriver, message: String) {
|
|
William Benton |
42acb0c |
- logError("Mesos error: " + message)
|
|
William Benton |
42acb0c |
- scheduler.error(message)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def stop() {
|
|
William Benton |
42acb0c |
- super.stop()
|
|
William Benton |
42acb0c |
- if (driver != null) {
|
|
William Benton |
42acb0c |
- driver.stop()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
|
|
William Benton |
42acb0c |
- logInfo("Mesos slave lost: " + slaveId.getValue)
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
|
|
William Benton |
42acb0c |
- // Note that the slave ID corresponds to the executor ID on that slave
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors -= slaveId.getValue
|
|
William Benton |
42acb0c |
- removeExecutor(slaveId.getValue, "Mesos slave lost")
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
|
|
William Benton |
42acb0c |
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
|
|
William Benton |
42acb0c |
- slaveLost(d, s)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-}
|
|
William Benton |
42acb0c |
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
deleted file mode 100644
|
|
William Benton |
42acb0c |
index 4978148..0000000
|
|
William Benton |
42acb0c |
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
|
|
William Benton |
42acb0c |
+++ /dev/null
|
|
William Benton |
42acb0c |
@@ -1,344 +0,0 @@
|
|
William Benton |
42acb0c |
-/*
|
|
William Benton |
42acb0c |
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
William Benton |
42acb0c |
- * contributor license agreements. See the NOTICE file distributed with
|
|
William Benton |
42acb0c |
- * this work for additional information regarding copyright ownership.
|
|
William Benton |
42acb0c |
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
William Benton |
42acb0c |
- * (the "License"); you may not use this file except in compliance with
|
|
William Benton |
42acb0c |
- * the License. You may obtain a copy of the License at
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
William Benton |
42acb0c |
- *
|
|
William Benton |
42acb0c |
- * Unless required by applicable law or agreed to in writing, software
|
|
William Benton |
42acb0c |
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
William Benton |
42acb0c |
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
William Benton |
42acb0c |
- * See the License for the specific language governing permissions and
|
|
William Benton |
42acb0c |
- * limitations under the License.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-package org.apache.spark.scheduler.cluster.mesos
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import java.io.File
|
|
William Benton |
42acb0c |
-import java.util.{ArrayList => JArrayList, List => JList}
|
|
William Benton |
42acb0c |
-import java.util.Collections
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
|
|
William Benton |
42acb0c |
-import scala.collection.JavaConversions._
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import com.google.protobuf.ByteString
|
|
William Benton |
42acb0c |
-import org.apache.mesos.{Scheduler => MScheduler}
|
|
William Benton |
42acb0c |
-import org.apache.mesos._
|
|
William Benton |
42acb0c |
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
|
|
William Benton |
42acb0c |
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
|
|
William Benton |
42acb0c |
- TaskDescription, TaskSchedulerImpl, WorkerOffer}
|
|
William Benton |
42acb0c |
-import org.apache.spark.util.Utils
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
-/**
|
|
William Benton |
42acb0c |
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
|
|
William Benton |
42acb0c |
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
|
|
William Benton |
42acb0c |
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
-private[spark] class MesosSchedulerBackend(
|
|
William Benton |
42acb0c |
- scheduler: TaskSchedulerImpl,
|
|
William Benton |
42acb0c |
- sc: SparkContext,
|
|
William Benton |
42acb0c |
- master: String,
|
|
William Benton |
42acb0c |
- appName: String)
|
|
William Benton |
42acb0c |
- extends SchedulerBackend
|
|
William Benton |
42acb0c |
- with MScheduler
|
|
William Benton |
42acb0c |
- with Logging {
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Lock used to wait for scheduler to be registered
|
|
William Benton |
42acb0c |
- var isRegistered = false
|
|
William Benton |
42acb0c |
- val registeredLock = new Object()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Driver for talking to Mesos
|
|
William Benton |
42acb0c |
- var driver: SchedulerDriver = null
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Which slave IDs we have executors on
|
|
William Benton |
42acb0c |
- val slaveIdsWithExecutors = new HashSet[String]
|
|
William Benton |
42acb0c |
- val taskIdToSlaveId = new HashMap[Long, String]
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // An ExecutorInfo for our tasks
|
|
William Benton |
42acb0c |
- var execArgs: Array[Byte] = null
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- var classLoader: ClassLoader = null
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def start() {
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- classLoader = Thread.currentThread.getContextClassLoader
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- new Thread("MesosSchedulerBackend driver") {
|
|
William Benton |
42acb0c |
- setDaemon(true)
|
|
William Benton |
42acb0c |
- override def run() {
|
|
William Benton |
42acb0c |
- val scheduler = MesosSchedulerBackend.this
|
|
William Benton |
42acb0c |
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
|
|
William Benton |
42acb0c |
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- val ret = driver.run()
|
|
William Benton |
42acb0c |
- logInfo("driver.run() returned with code " + ret)
|
|
William Benton |
42acb0c |
- } catch {
|
|
William Benton |
42acb0c |
- case e: Exception => logError("driver.run() failed", e)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }.start()
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- waitForRegister()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def createExecutorInfo(execId: String): ExecutorInfo = {
|
|
William Benton |
42acb0c |
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
|
|
William Benton |
42acb0c |
- "Spark home is not set; set it through the spark.home system " +
|
|
William Benton |
42acb0c |
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
|
|
William Benton |
42acb0c |
- val environment = Environment.newBuilder()
|
|
William Benton |
42acb0c |
- sc.executorEnvs.foreach { case (key, value) =>
|
|
William Benton |
42acb0c |
- environment.addVariables(Environment.Variable.newBuilder()
|
|
William Benton |
42acb0c |
- .setName(key)
|
|
William Benton |
42acb0c |
- .setValue(value)
|
|
William Benton |
42acb0c |
- .build())
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- val command = CommandInfo.newBuilder()
|
|
William Benton |
42acb0c |
- .setEnvironment(environment)
|
|
William Benton |
42acb0c |
- val uri = sc.conf.get("spark.executor.uri", null)
|
|
William Benton |
42acb0c |
- if (uri == null) {
|
|
William Benton |
42acb0c |
- command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
|
|
William Benton |
42acb0c |
- } else {
|
|
William Benton |
42acb0c |
- // Grab everything to the first '.'. We'll use that and '*' to
|
|
William Benton |
42acb0c |
- // glob the directory "correctly".
|
|
William Benton |
42acb0c |
- val basename = uri.split('/').last.split('.').head
|
|
William Benton |
42acb0c |
- command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
|
|
William Benton |
42acb0c |
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- val memory = Resource.newBuilder()
|
|
William Benton |
42acb0c |
- .setName("mem")
|
|
William Benton |
42acb0c |
- .setType(Value.Type.SCALAR)
|
|
William Benton |
42acb0c |
- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- ExecutorInfo.newBuilder()
|
|
William Benton |
42acb0c |
- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
|
|
William Benton |
42acb0c |
- .setCommand(command)
|
|
William Benton |
42acb0c |
- .setData(ByteString.copyFrom(createExecArg()))
|
|
William Benton |
42acb0c |
- .addResources(memory)
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /**
|
|
William Benton |
42acb0c |
- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
|
|
William Benton |
42acb0c |
- * containing all the spark.* system properties in the form of (String, String) pairs.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
- private def createExecArg(): Array[Byte] = {
|
|
William Benton |
42acb0c |
- if (execArgs == null) {
|
|
William Benton |
42acb0c |
- val props = new HashMap[String, String]
|
|
William Benton |
42acb0c |
- val iterator = System.getProperties.entrySet.iterator
|
|
William Benton |
42acb0c |
- while (iterator.hasNext) {
|
|
William Benton |
42acb0c |
- val entry = iterator.next
|
|
William Benton |
42acb0c |
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
|
|
William Benton |
42acb0c |
- if (key.startsWith("spark.")) {
|
|
William Benton |
42acb0c |
- props(key) = value
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- // Serialize the map as an array of (String, String) pairs
|
|
William Benton |
42acb0c |
- execArgs = Utils.serialize(props.toArray)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- execArgs
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- private def setClassLoader(): ClassLoader = {
|
|
William Benton |
42acb0c |
- val oldClassLoader = Thread.currentThread.getContextClassLoader
|
|
William Benton |
42acb0c |
- Thread.currentThread.setContextClassLoader(classLoader)
|
|
William Benton |
42acb0c |
- oldClassLoader
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- private def restoreClassLoader(oldClassLoader: ClassLoader) {
|
|
William Benton |
42acb0c |
- Thread.currentThread.setContextClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
|
|
William Benton |
42acb0c |
- val oldClassLoader = setClassLoader()
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- logInfo("Registered as framework ID " + frameworkId.getValue)
|
|
William Benton |
42acb0c |
- registeredLock.synchronized {
|
|
William Benton |
42acb0c |
- isRegistered = true
|
|
William Benton |
42acb0c |
- registeredLock.notifyAll()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- } finally {
|
|
William Benton |
42acb0c |
- restoreClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def waitForRegister() {
|
|
William Benton |
42acb0c |
- registeredLock.synchronized {
|
|
William Benton |
42acb0c |
- while (!isRegistered) {
|
|
William Benton |
42acb0c |
- registeredLock.wait()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def disconnected(d: SchedulerDriver) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /**
|
|
William Benton |
42acb0c |
- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
|
|
William Benton |
42acb0c |
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
|
|
William Benton |
42acb0c |
- * tasks are balanced across the cluster.
|
|
William Benton |
42acb0c |
- */
|
|
William Benton |
42acb0c |
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
|
|
William Benton |
42acb0c |
- val oldClassLoader = setClassLoader()
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- // Build a big list of the offerable workers, and remember their indices so that we can
|
|
William Benton |
42acb0c |
- // figure out which Offer to reply to for each worker
|
|
William Benton |
42acb0c |
- val offerableIndices = new ArrayBuffer[Int]
|
|
William Benton |
42acb0c |
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- def enoughMemory(o: Offer) = {
|
|
William Benton |
42acb0c |
- val mem = getResource(o.getResourcesList, "mem")
|
|
William Benton |
42acb0c |
- val slaveId = o.getSlaveId.getValue
|
|
William Benton |
42acb0c |
- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
|
|
William Benton |
42acb0c |
- offerableIndices += index
|
|
William Benton |
42acb0c |
- offerableWorkers += new WorkerOffer(
|
|
William Benton |
42acb0c |
- offer.getSlaveId.getValue,
|
|
William Benton |
42acb0c |
- offer.getHostname,
|
|
William Benton |
42acb0c |
- getResource(offer.getResourcesList, "cpus").toInt)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Call into the ClusterScheduler
|
|
William Benton |
42acb0c |
- val taskLists = scheduler.resourceOffers(offerableWorkers)
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Build a list of Mesos tasks for each slave
|
|
William Benton |
42acb0c |
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
|
|
William Benton |
42acb0c |
- for ((taskList, index) <- taskLists.zipWithIndex) {
|
|
William Benton |
42acb0c |
- if (!taskList.isEmpty) {
|
|
William Benton |
42acb0c |
- val offerNum = offerableIndices(index)
|
|
William Benton |
42acb0c |
- val slaveId = offers(offerNum).getSlaveId.getValue
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors += slaveId
|
|
William Benton |
42acb0c |
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
|
|
William Benton |
42acb0c |
- for (taskDesc <- taskList) {
|
|
William Benton |
42acb0c |
- taskIdToSlaveId(taskDesc.taskId) = slaveId
|
|
William Benton |
42acb0c |
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // Reply to the offers
|
|
William Benton |
42acb0c |
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
|
|
William Benton |
42acb0c |
- for (i <- 0 until offers.size) {
|
|
William Benton |
42acb0c |
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- } finally {
|
|
William Benton |
42acb0c |
- restoreClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
|
|
William Benton |
42acb0c |
- def getResource(res: JList[Resource], name: String): Double = {
|
|
William Benton |
42acb0c |
- for (r <- res if r.getName == name) {
|
|
William Benton |
42acb0c |
- return r.getScalar.getValue
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- // If we reached here, no resource with the required name was present
|
|
William Benton |
42acb0c |
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Turn a Spark TaskDescription into a Mesos task */
|
|
William Benton |
42acb0c |
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
|
|
William Benton |
42acb0c |
- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
|
|
William Benton |
42acb0c |
- val cpuResource = Resource.newBuilder()
|
|
William Benton |
42acb0c |
- .setName("cpus")
|
|
William Benton |
42acb0c |
- .setType(Value.Type.SCALAR)
|
|
William Benton |
42acb0c |
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- MesosTaskInfo.newBuilder()
|
|
William Benton |
42acb0c |
- .setTaskId(taskId)
|
|
William Benton |
42acb0c |
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
|
|
William Benton |
42acb0c |
- .setExecutor(createExecutorInfo(slaveId))
|
|
William Benton |
42acb0c |
- .setName(task.name)
|
|
William Benton |
42acb0c |
- .addResources(cpuResource)
|
|
William Benton |
42acb0c |
- .setData(ByteString.copyFrom(task.serializedTask))
|
|
William Benton |
42acb0c |
- .build()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- /** Check whether a Mesos task state represents a finished task */
|
|
William Benton |
42acb0c |
- def isFinished(state: MesosTaskState) = {
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_FINISHED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_FAILED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_KILLED ||
|
|
William Benton |
42acb0c |
- state == MesosTaskState.TASK_LOST
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
|
|
William Benton |
42acb0c |
- val oldClassLoader = setClassLoader()
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- val tid = status.getTaskId.getValue.toLong
|
|
William Benton |
42acb0c |
- val state = TaskState.fromMesos(status.getState)
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
|
|
William Benton |
42acb0c |
- // We lost the executor on this slave, so remember that it's gone
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- if (isFinished(status.getState)) {
|
|
William Benton |
42acb0c |
- taskIdToSlaveId.remove(tid)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
|
|
William Benton |
42acb0c |
- } finally {
|
|
William Benton |
42acb0c |
- restoreClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def error(d: SchedulerDriver, message: String) {
|
|
William Benton |
42acb0c |
- val oldClassLoader = setClassLoader()
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- logError("Mesos error: " + message)
|
|
William Benton |
42acb0c |
- scheduler.error(message)
|
|
William Benton |
42acb0c |
- } finally {
|
|
William Benton |
42acb0c |
- restoreClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def stop() {
|
|
William Benton |
42acb0c |
- if (driver != null) {
|
|
William Benton |
42acb0c |
- driver.stop()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def reviveOffers() {
|
|
William Benton |
42acb0c |
- driver.reviveOffers()
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
|
|
William Benton |
42acb0c |
- val oldClassLoader = setClassLoader()
|
|
William Benton |
42acb0c |
- try {
|
|
William Benton |
42acb0c |
- logInfo("Mesos slave lost: " + slaveId.getValue)
|
|
William Benton |
42acb0c |
- synchronized {
|
|
William Benton |
42acb0c |
- slaveIdsWithExecutors -= slaveId.getValue
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- scheduler.executorLost(slaveId.getValue, reason)
|
|
William Benton |
42acb0c |
- } finally {
|
|
William Benton |
42acb0c |
- restoreClassLoader(oldClassLoader)
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
|
|
William Benton |
42acb0c |
- recordSlaveLost(d, slaveId, SlaveLost())
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
|
|
William Benton |
42acb0c |
- slaveId: SlaveID, status: Int) {
|
|
William Benton |
42acb0c |
- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
|
|
William Benton |
42acb0c |
- slaveId.getValue))
|
|
William Benton |
42acb0c |
- recordSlaveLost(d, slaveId, ExecutorExited(status))
|
|
William Benton |
42acb0c |
- }
|
|
William Benton |
42acb0c |
-
|
|
William Benton |
42acb0c |
- // TODO: query Mesos for number of cores
|
|
William Benton |
42acb0c |
- override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
|
|
William Benton |
42acb0c |
-}
|
|
William Benton |
42acb0c |
--
|
|
William Benton |
42acb0c |
1.8.3.4 (Apple Git-47)
|
|
William Benton |
42acb0c |
|