From 8444f883ed7cc426655bde14757d8785b3b88445 Mon Sep 17 00:00:00 2001
From: William Benton
Date: Mar 26 2014 02:28:38 +0000
Subject: updated sources, etc
---
diff --git a/.gitignore b/.gitignore
index 6407f31..da8fb97 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 /v0.9.0-incubating.tar.gz
+/v0.9.1-rc1.tar.gz
diff --git a/sources b/sources
index a17853d..36eb710 100644
--- a/sources
+++ b/sources
@@ -1 +1 @@
-f31821e3d54e0335e55eccbceda49b6c v0.9.0-incubating.tar.gz
+5fda1fb06b9d685b8a5bceb8d1ea3922 v0.9.1-rc1.tar.gz
diff --git a/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch b/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch
deleted file mode 100644
index 73479d2..0000000
--- a/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch
+++ /dev/null
@@ -1,253 +0,0 @@
-From ab4e57f1acd2ee95622a0d264f9b81c4bbb43f9e Mon Sep 17 00:00:00 2001
-From: William Benton
-Date: Sun, 23 Feb 2014 17:22:07 -0600
-Subject: [PATCH 1/7] Replace lift-json with json4s-jackson.
-
-The aim of the Json4s project is to provide a common API for
-Scala JSON libraries. It is Apache-licensed, easier for
-downstream distributions to package, and mostly API-compatible
-with lift-json. Furthermore, the Jackson-backed implementation
-parses faster than lift-json on all but the smallest inputs.
-
-Backported patch from master to 0.9.0.
-
-Conflicts:
-	core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
-	core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
-	core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
-	core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----
- core/pom.xml | 5 +++--
- .../apache/spark/deploy/FaultToleranceTest.scala | 9 ++++----
- .../org/apache/spark/deploy/JsonProtocol.scala | 2 +-
- .../spark/deploy/master/ui/ApplicationPage.scala | 2 +-
- .../apache/spark/deploy/master/ui/IndexPage.scala | 2 +-
- .../apache/spark/deploy/worker/ui/IndexPage.scala | 2 +-
- .../scala/org/apache/spark/ui/JettyUtils.scala | 8 ++++----
- .../apache/spark/deploy/JsonProtocolSuite.scala | 24 ++++++++++++++++++----
- project/SparkBuild.scala | 2 +-
- 9 files changed, 37 insertions(+), 19 deletions(-)
-
-diff --git a/core/pom.xml b/core/pom.xml
-index 62ceba1..afae171 100644
---- a/core/pom.xml
-+++ b/core/pom.xml
-@@ -108,8 +108,9 @@
- scala-library
-
-
-- net.liftweb
-- lift-json_${scala.binary.version}
-+ org.json4s
-+ json4s-jackson_${scala.binary.version}
-+ 3.2.6
-
-
- it.unimi.dsi
-diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
-index 4dfb19e..60a87af 100644
---- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
-+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
-@@ -29,7 +29,8 @@ import scala.concurrent.ExecutionContext.Implicits.global
- import scala.collection.mutable.ListBuffer
- import scala.sys.process._
-
--import net.liftweb.json.JsonParser
-+import org.json4s._
-+import org.json4s.jackson.JsonMethods
-
- import org.apache.spark.{Logging, SparkContext}
- import org.apache.spark.deploy.master.RecoveryState
-@@ -312,7 +313,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
- private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
- extends Logging {
-
-- implicit val formats = net.liftweb.json.DefaultFormats
-+ implicit val formats = org.json4s.DefaultFormats
- var state: RecoveryState.Value = _
- var liveWorkerIPs: List[String] = _
- var numLiveApps = 0
-@@ -322,7 +323,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
- def readState() {
- try {
- val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream)
-- val json = JsonParser.parse(masterStream, closeAutomatically = true)
-+ val json = JsonMethods.parse(masterStream)
-
- val workers = json \ "workers"
- val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
-@@ -350,7 +351,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
- private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
- extends Logging {
-
-- implicit val formats = net.liftweb.json.DefaultFormats
-+ implicit val formats = org.json4s.DefaultFormats
-
- logDebug("Created worker: " + this)
-
-diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
-index e607b8c..a43d004 100644
---- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
-+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
-@@ -17,7 +17,7 @@
-
- package org.apache.spark.deploy
-
--import net.liftweb.json.JsonDSL._
-+import org.json4s.JsonDSL._
-
- import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
- import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
-diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
-index 9485bfd..1b234d6 100644
---- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
-+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
-@@ -22,7 +22,7 @@ import scala.xml.Node
-
- import akka.pattern.ask
- import javax.servlet.http.HttpServletRequest
--import net.liftweb.json.JsonAST.JValue
-+import org.json4s.JValue
-
- import org.apache.spark.deploy.JsonProtocol
- import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
-index a9af8df..a55264b 100644
---- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
-+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
-@@ -23,7 +23,7 @@ import scala.xml.Node
-
- import akka.pattern.ask
- import javax.servlet.http.HttpServletRequest
--import net.liftweb.json.JsonAST.JValue
-+import org.json4s.JValue
-
- import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
- import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
-index 925c6fb..de356dc 100644
---- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
-+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
-@@ -22,7 +22,7 @@ import scala.xml.Node
-
- import akka.pattern.ask
- import javax.servlet.http.HttpServletRequest
--import net.liftweb.json.JsonAST.JValue
-+import org.json4s.JValue
-
- import org.apache.spark.deploy.JsonProtocol
- import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-diff --git
a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -index 7211dbc..4e43fd5 100644 ---- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -@@ -23,10 +23,10 @@ import scala.annotation.tailrec - import scala.util.{Try, Success, Failure} - import scala.xml.Node - --import net.liftweb.json.{JValue, pretty, render} -- --import org.eclipse.jetty.server.{Server, Request, Handler} --import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} -+import org.json4s.JValue -+import org.json4s.jackson.JsonMethods.{pretty, render} -+import org.eclipse.jetty.server.{Handler, Request, Server} -+import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler} - import org.eclipse.jetty.util.thread.QueuedThreadPool - - import org.apache.spark.Logging -diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -index d05bbd6..8f1df8a 100644 ---- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -@@ -20,8 +20,12 @@ package org.apache.spark.deploy - import java.io.File - import java.util.Date - --import net.liftweb.json.{JsonAST, JsonParser} --import net.liftweb.json.JsonAST.JValue -+import org.json4s._ -+ -+import org.json4s.JValue -+import org.json4s.jackson.JsonMethods -+import com.fasterxml.jackson.core.JsonParseException -+ - import org.scalatest.FunSuite - - import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -@@ -32,21 +36,31 @@ class JsonProtocolSuite extends FunSuite { - test("writeApplicationInfo") { - val output = JsonProtocol.writeApplicationInfo(createAppInfo()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr)) - } - - test("writeWorkerInfo") { - val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr)) - } - - test("writeApplicationDescription") { - val output = JsonProtocol.writeApplicationDescription(createAppDesc()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appDescJsonStr)) - } - - test("writeExecutorRunner") { - val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) -+ } -+ -+ test("writeDriverInfo") { -+ val output = JsonProtocol.writeDriverInfo(createDriverInfo()) -+ assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr)) - } - - test("writeMasterState") { -@@ -59,6 +73,7 @@ class JsonProtocolSuite extends FunSuite { - activeDrivers, completedDrivers, RecoveryState.ALIVE) - val output = JsonProtocol.writeMasterState(stateResponse) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) - } - - test("writeWorkerState") { -@@ -70,6 +85,7 @@ class JsonProtocolSuite extends FunSuite { - finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") - val output = JsonProtocol.writeWorkerState(stateResponse) - assertValidJson(output) -+ assertValidDataInJson(output, 
JsonMethods.parse(JsonConstants.workerStateJsonStr)) - } - - def createAppDesc(): ApplicationDescription = { -@@ -106,9 +122,9 @@ class JsonProtocolSuite extends FunSuite { - - def assertValidJson(json: JValue) { - try { -- JsonParser.parse(JsonAST.compactRender(json)) -+ JsonMethods.parse(JsonMethods.compact(json)) - } catch { -- case e: JsonParser.ParseException => fail("Invalid Json detected", e) -+ case e: JsonParseException => fail("Invalid Json detected", e) - } - } - } -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index b891ffa..398f5ec 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -260,7 +260,7 @@ object SparkBuild extends Build { - "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", -- "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), -+ "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.13.0", --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0002-use-sbt-0.13.1.patch b/spark-v0.9.0-0002-use-sbt-0.13.1.patch deleted file mode 100644 index fb38677..0000000 --- a/spark-v0.9.0-0002-use-sbt-0.13.1.patch +++ /dev/null @@ -1,22 +0,0 @@ -From 64d595b9cc0a92ab133f1e13512c5d7da2982fdb Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 14:25:34 -0600 -Subject: [PATCH 2/7] use sbt 0.13.1 - ---- - project/build.properties | 2 +- - 1 file changed, 1 insertion(+), 1 deletion(-) - -diff --git a/project/build.properties b/project/build.properties -index 839f5fb..4b52bb9 100644 ---- a/project/build.properties -+++ b/project/build.properties -@@ -14,4 +14,4 @@ - # See the License for the specific language governing permissions and - # limitations under the License. - # --sbt.version=0.12.4 -+sbt.version=0.13.1 --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0003-Removed-sbt-plugins.patch b/spark-v0.9.0-0003-Removed-sbt-plugins.patch deleted file mode 100644 index 7e86a1f..0000000 --- a/spark-v0.9.0-0003-Removed-sbt-plugins.patch +++ /dev/null @@ -1,164 +0,0 @@ -From 88286b1bffc4eb65f6a259c71e613fb76667470b Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 15:46:41 -0600 -Subject: [PATCH 3/7] Removed sbt plugins. 
- ---- - project/SparkBuild.scala | 38 +++++----------------------------- - project/plugins.sbt | 18 ---------------- - project/project/SparkPluginBuild.scala | 24 --------------------- - 3 files changed, 5 insertions(+), 75 deletions(-) - delete mode 100644 project/plugins.sbt - delete mode 100644 project/project/SparkPluginBuild.scala - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 398f5ec..d401c71 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -18,8 +18,6 @@ - import sbt._ - import sbt.Classpaths.publishTask - import Keys._ --import sbtassembly.Plugin._ --import AssemblyKeys._ - import scala.util.Properties - // For Sonatype publishing - //import com.jsuereth.pgp.sbtplugin.PgpKeys._ -@@ -60,11 +58,6 @@ object SparkBuild extends Build { - - lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) - -- lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) -- .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) -- -- lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects") -- - // A configuration to set an alternative publishLocalConfiguration - lazy val MavenCompile = config("m2r") extend(Compile) - lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") -@@ -116,7 +109,7 @@ object SparkBuild extends Build { - // Everything except assembly, tools and examples belong to packageProjects - lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef - -- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) -+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) - - def sharedSettings = Defaults.defaultSettings ++ Seq( - organization := "org.apache.spark", -@@ -129,7 +122,6 @@ object SparkBuild extends Build { - retrieveManaged := true, - retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", - transitiveClassifiers in Scope.GlobalScope := Seq("sources"), -- testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), - - // Fork new JVMs for tests and set Java options for those - fork := true, -@@ -230,8 +222,8 @@ object SparkBuild extends Build { - publishMavenStyle in MavenCompile := true, - publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal), - publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn -- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings -- -+ ) -+ - val slf4jVersion = "1.7.2" - - val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") -@@ -309,11 +301,11 @@ object SparkBuild extends Build { - excludeAll(excludeSnappy) - excludeAll(excludeCglib) - ) -- ) ++ assemblySettings ++ extraAssemblySettings -+ ) - - def toolsSettings = sharedSettings ++ Seq( - name := "spark-tools" -- ) ++ assemblySettings ++ extraAssemblySettings -+ ) - - def graphxSettings = sharedSettings ++ Seq( - name := "spark-graphx", -@@ -377,26 +369,6 @@ object SparkBuild extends Build { - ) - ) - -- def assemblyProjSettings = sharedSettings ++ Seq( -- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1", -- name := "spark-assembly", -- assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in 
Compile)).dependOn, -- jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }, -- jarName in packageDependency <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + "-deps.jar" } -- ) ++ assemblySettings ++ extraAssemblySettings -- -- def extraAssemblySettings() = Seq( -- test in assembly := {}, -- mergeStrategy in assembly := { -- case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard -- case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard -- case "log4j.properties" => MergeStrategy.discard -- case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines -- case "reference.conf" => MergeStrategy.concat -- case _ => MergeStrategy.first -- } -- ) -- - def twitterSettings() = sharedSettings ++ Seq( - name := "spark-streaming-twitter", - libraryDependencies ++= Seq( -diff --git a/project/plugins.sbt b/project/plugins.sbt -deleted file mode 100644 -index 4ba0e42..0000000 ---- a/project/plugins.sbt -+++ /dev/null -@@ -1,18 +0,0 @@ --resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) -- --resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" -- --resolvers += "Spray Repository" at "http://repo.spray.cc/" -- --addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2") -- --addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") -- --addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1") -- --// For Sonatype publishing --//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) -- --//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6") -- --addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3") -diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala -deleted file mode 100644 -index 6a66bd1..0000000 ---- a/project/project/SparkPluginBuild.scala -+++ /dev/null -@@ -1,24 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. 
-- */ -- --import sbt._ -- --object SparkPluginDef extends Build { -- lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) -- /* This is not published in a Maven repository, so we get it from GitHub directly */ -- lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce") --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0004-removed-examples.patch b/spark-v0.9.0-0004-removed-examples.patch deleted file mode 100644 index ac75621..0000000 --- a/spark-v0.9.0-0004-removed-examples.patch +++ /dev/null @@ -1,121 +0,0 @@ -From e0d10f720e46f3c20a517079cf05642edd18e2bf Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 16:01:11 -0600 -Subject: [PATCH 4/7] removed examples - ---- - project/SparkBuild.scala | 82 ++---------------------------------------------- - 1 file changed, 3 insertions(+), 79 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index d401c71..068bf74 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -85,31 +85,13 @@ object SparkBuild extends Build { - lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]() - lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]() - -- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) -- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) -- -- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) -- .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) -+ lazy val allExternal = Seq[ClasspathDependency]() -+ lazy val allExternalRefs = Seq[ProjectReference]() - - // Everything except assembly, tools and examples belong to packageProjects - lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef - -- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) -+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) - - def sharedSettings = Defaults.defaultSettings ++ Seq( - organization := "org.apache.spark", -@@ -284,25 +266,6 @@ object SparkBuild extends Build { - libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) - ) - -- def examplesSettings = sharedSettings ++ Seq( -- name := "spark-examples", -- libraryDependencies ++= Seq( -- 
"com.twitter" %% "algebird-core" % "0.1.11", -- "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), -- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), -- "org.apache.cassandra" % "cassandra-all" % "1.2.6" -- exclude("com.google.guava", "guava") -- exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") -- exclude("com.ning","compress-lzf") -- exclude("io.netty", "netty") -- exclude("jline","jline") -- exclude("log4j","log4j") -- exclude("org.apache.cassandra.deps", "avro") -- excludeAll(excludeSnappy) -- excludeAll(excludeCglib) -- ) -- ) -- - def toolsSettings = sharedSettings ++ Seq( - name := "spark-tools" - ) -@@ -368,43 +331,4 @@ object SparkBuild extends Build { - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) - ) - ) -- -- def twitterSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-twitter", -- libraryDependencies ++= Seq( -- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) -- ) -- ) -- -- def kafkaSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-kafka", -- libraryDependencies ++= Seq( -- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), -- "org.apache.kafka" %% "kafka" % "0.8.0" -- exclude("com.sun.jdmk", "jmxtools") -- exclude("com.sun.jmx", "jmxri") -- exclude("net.sf.jopt-simple", "jopt-simple") -- excludeAll(excludeNetty) -- ) -- ) -- -- def flumeSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-flume", -- libraryDependencies ++= Seq( -- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) -- ) -- ) -- -- def zeromqSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-zeromq", -- libraryDependencies ++= Seq( -- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) -- ) -- ) -- -- def mqttSettings() = streamingSettings ++ Seq( -- name := "spark-streaming-mqtt", -- resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), -- libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") -- ) - } --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch b/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch deleted file mode 100644 index 18ed918..0000000 --- a/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch +++ /dev/null @@ -1,643 +0,0 @@ -From d5dfc0ec1e07c9eebae4fac002e0046b8d0c4f8e Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 16:43:44 -0600 -Subject: [PATCH 5/7] Removed code depending on Kryo - ---- - .../apache/spark/serializer/KryoSerializer.scala | 175 --------------------- - .../apache/spark/storage/StoragePerfTester.scala | 103 ------------ - .../org/apache/spark/storage/ThreadingTest.scala | 115 -------------- - .../util/collection/ExternalAppendOnlyMap.scala | 10 +- - .../apache/spark/graphx/GraphKryoRegistrator.scala | 48 ------ - .../apache/spark/mllib/recommendation/ALS.scala | 12 -- - .../spark/streaming/util/RawTextSender.scala | 82 ---------- - 7 files changed, 1 insertion(+), 544 deletions(-) - delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala - delete mode 100644 core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala - delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala - delete mode 100644 
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala - delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala - -diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -deleted file mode 100644 -index c14cd47..0000000 ---- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -+++ /dev/null -@@ -1,175 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.serializer -- --import java.nio.ByteBuffer --import java.io.{EOFException, InputStream, OutputStream} -- --import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} --import com.esotericsoftware.kryo.{KryoException, Kryo} --import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} --import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} -- --import org.apache.spark._ --import org.apache.spark.broadcast.HttpBroadcast --import org.apache.spark.scheduler.MapStatus --import org.apache.spark.storage._ --import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} -- --/** -- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. -- */ --class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { -- private val bufferSize = { -- conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 -- } -- -- def newKryoOutput() = new KryoOutput(bufferSize) -- -- def newKryo(): Kryo = { -- val instantiator = new EmptyScalaKryoInstantiator -- val kryo = instantiator.newKryo() -- val classLoader = Thread.currentThread.getContextClassLoader -- -- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. -- // Do this before we invoke the user registrator so the user registrator can override this. 
-- kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true)) -- -- for (cls <- KryoSerializer.toRegister) kryo.register(cls) -- -- // Allow sending SerializableWritable -- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) -- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) -- -- // Allow the user to register their own classes by setting spark.kryo.registrator -- try { -- for (regCls <- conf.getOption("spark.kryo.registrator")) { -- logDebug("Running user registrator: " + regCls) -- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] -- reg.registerClasses(kryo) -- } -- } catch { -- case e: Exception => logError("Failed to run spark.kryo.registrator", e) -- } -- -- // Register Chill's classes; we do this after our ranges and the user's own classes to let -- // our code override the generic serialziers in Chill for things like Seq -- new AllScalaRegistrar().apply(kryo) -- -- kryo.setClassLoader(classLoader) -- kryo -- } -- -- def newInstance(): SerializerInstance = { -- new KryoSerializerInstance(this) -- } --} -- --private[spark] --class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { -- val output = new KryoOutput(outStream) -- -- def writeObject[T](t: T): SerializationStream = { -- kryo.writeClassAndObject(output, t) -- this -- } -- -- def flush() { output.flush() } -- def close() { output.close() } --} -- --private[spark] --class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { -- val input = new KryoInput(inStream) -- -- def readObject[T](): T = { -- try { -- kryo.readClassAndObject(input).asInstanceOf[T] -- } catch { -- // DeserializationStream uses the EOF exception to indicate stopping condition. -- case _: KryoException => throw new EOFException -- } -- } -- -- def close() { -- // Kryo's Input automatically closes the input stream it is using. -- input.close() -- } --} -- --private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { -- val kryo = ks.newKryo() -- -- // Make these lazy vals to avoid creating a buffer unless we use them -- lazy val output = ks.newKryoOutput() -- lazy val input = new KryoInput() -- -- def serialize[T](t: T): ByteBuffer = { -- output.clear() -- kryo.writeClassAndObject(output, t) -- ByteBuffer.wrap(output.toBytes) -- } -- -- def deserialize[T](bytes: ByteBuffer): T = { -- input.setBuffer(bytes.array) -- kryo.readClassAndObject(input).asInstanceOf[T] -- } -- -- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { -- val oldClassLoader = kryo.getClassLoader -- kryo.setClassLoader(loader) -- input.setBuffer(bytes.array) -- val obj = kryo.readClassAndObject(input).asInstanceOf[T] -- kryo.setClassLoader(oldClassLoader) -- obj -- } -- -- def serializeStream(s: OutputStream): SerializationStream = { -- new KryoSerializationStream(kryo, s) -- } -- -- def deserializeStream(s: InputStream): DeserializationStream = { -- new KryoDeserializationStream(kryo, s) -- } --} -- --/** -- * Interface implemented by clients to register their classes with Kryo when using Kryo -- * serialization. -- */ --trait KryoRegistrator { -- def registerClasses(kryo: Kryo) --} -- --private[serializer] object KryoSerializer { -- // Commonly used classes. 
-- private val toRegister: Seq[Class[_]] = Seq( -- ByteBuffer.allocate(1).getClass, -- classOf[StorageLevel], -- classOf[PutBlock], -- classOf[GotBlock], -- classOf[GetBlock], -- classOf[MapStatus], -- classOf[BlockManagerId], -- classOf[Array[Byte]], -- (1 to 10).getClass, -- (1 until 10).getClass, -- (1L to 10L).getClass, -- (1L until 10L).getClass -- ) --} -diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala -deleted file mode 100644 -index 40734aa..0000000 ---- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala -+++ /dev/null -@@ -1,103 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.storage -- --import java.util.concurrent.atomic.AtomicLong --import java.util.concurrent.{CountDownLatch, Executors} -- --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.SparkContext --import org.apache.spark.util.Utils -- --/** -- * Utility for micro-benchmarking shuffle write performance. -- * -- * Writes simulated shuffle output from several threads and records the observed throughput. -- */ --object StoragePerfTester { -- def main(args: Array[String]) = { -- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ -- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) -- -- /** Number of map tasks. All tasks execute concurrently. */ -- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) -- -- /** Number of reduce splits for each map task. */ -- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) -- -- val recordLength = 1000 // ~1KB records -- val totalRecords = dataSizeMb * 1000 -- val recordsPerMap = totalRecords / numMaps -- -- val writeData = "1" * recordLength -- val executor = Executors.newFixedThreadPool(numMaps) -- -- System.setProperty("spark.shuffle.compress", "false") -- System.setProperty("spark.shuffle.sync", "true") -- -- // This is only used to instantiate a BlockManager. All thread scheduling is done manually. 
-- val sc = new SparkContext("local[4]", "Write Tester") -- val blockManager = sc.env.blockManager -- -- def writeOutputBytes(mapId: Int, total: AtomicLong) = { -- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, -- new KryoSerializer(sc.conf)) -- val writers = shuffle.writers -- for (i <- 1 to recordsPerMap) { -- writers(i % numOutputSplits).write(writeData) -- } -- writers.map {w => -- w.commit() -- total.addAndGet(w.fileSegment().length) -- w.close() -- } -- -- shuffle.releaseWriters(true) -- } -- -- val start = System.currentTimeMillis() -- val latch = new CountDownLatch(numMaps) -- val totalBytes = new AtomicLong() -- for (task <- 1 to numMaps) { -- executor.submit(new Runnable() { -- override def run() = { -- try { -- writeOutputBytes(task, totalBytes) -- latch.countDown() -- } catch { -- case e: Exception => -- println("Exception in child thread: " + e + " " + e.getMessage) -- System.exit(1) -- } -- } -- }) -- } -- latch.await() -- val end = System.currentTimeMillis() -- val time = (end - start) / 1000.0 -- val bytesPerSecond = totalBytes.get() / time -- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong -- -- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) -- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) -- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) -- -- executor.shutdown() -- sc.stop() -- } --} -diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala -deleted file mode 100644 -index 729ba2c..0000000 ---- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala -+++ /dev/null -@@ -1,115 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.storage -- --import akka.actor._ -- --import java.util.concurrent.ArrayBlockingQueue --import util.Random --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.{SparkConf, SparkContext} -- --/** -- * This class tests the BlockManager and MemoryStore for thread safety and -- * deadlocks. It spawns a number of producer and consumer threads. Producer -- * threads continuously pushes blocks into the BlockManager and consumer -- * threads continuously retrieves the blocks form the BlockManager and tests -- * whether the block is correct or not. 
-- */ --private[spark] object ThreadingTest { -- -- val numProducers = 5 -- val numBlocksPerProducer = 20000 -- -- private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { -- val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100) -- -- override def run() { -- for (i <- 1 to numBlocksPerProducer) { -- val blockId = TestBlockId("b-" + id + "-" + i) -- val blockSize = Random.nextInt(1000) -- val block = (1 to blockSize).map(_ => Random.nextInt()) -- val level = randomLevel() -- val startTime = System.currentTimeMillis() -- manager.put(blockId, block.iterator, level, true) -- println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") -- queue.add((blockId, block)) -- } -- println("Producer thread " + id + " terminated") -- } -- -- def randomLevel(): StorageLevel = { -- math.abs(Random.nextInt()) % 4 match { -- case 0 => StorageLevel.MEMORY_ONLY -- case 1 => StorageLevel.MEMORY_ONLY_SER -- case 2 => StorageLevel.MEMORY_AND_DISK -- case 3 => StorageLevel.MEMORY_AND_DISK_SER -- } -- } -- } -- -- private[spark] class ConsumerThread( -- manager: BlockManager, -- queue: ArrayBlockingQueue[(BlockId, Seq[Int])] -- ) extends Thread { -- var numBlockConsumed = 0 -- -- override def run() { -- println("Consumer thread started") -- while(numBlockConsumed < numBlocksPerProducer) { -- val (blockId, block) = queue.take() -- val startTime = System.currentTimeMillis() -- manager.get(blockId) match { -- case Some(retrievedBlock) => -- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, -- "Block " + blockId + " did not match") -- println("Got block " + blockId + " in " + -- (System.currentTimeMillis - startTime) + " ms") -- case None => -- assert(false, "Block " + blockId + " could not be retrieved") -- } -- numBlockConsumed += 1 -- } -- println("Consumer thread terminated") -- } -- } -- -- def main(args: Array[String]) { -- System.setProperty("spark.kryoserializer.buffer.mb", "1") -- val actorSystem = ActorSystem("test") -- val conf = new SparkConf() -- val serializer = new KryoSerializer(conf) -- val blockManagerMaster = new BlockManagerMaster( -- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) -- val blockManager = new BlockManager( -- "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) -- val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) -- val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) -- producers.foreach(_.start) -- consumers.foreach(_.start) -- producers.foreach(_.join) -- consumers.foreach(_.join) -- blockManager.stop() -- blockManagerMaster.stop() -- actorSystem.shutdown() -- actorSystem.awaitTermination() -- println("Everything stopped.") -- println( -- "It will take sometime for the JVM to clean all temporary files and shutdown. 
Sit tight.") -- } --} -diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -index 3d9b09e..63ec782 100644 ---- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -@@ -27,7 +27,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream - - import org.apache.spark.{Logging, SparkEnv} - import org.apache.spark.io.LZFCompressionCodec --import org.apache.spark.serializer.{KryoDeserializationStream, Serializer} -+import org.apache.spark.serializer.Serializer - import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter} - - /** -@@ -378,14 +378,6 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( - try { - if (objectsRead == serializerBatchSize) { - val newInputStream = deserializeStream match { -- case stream: KryoDeserializationStream => -- // Kryo's serializer stores an internal buffer that pre-fetches from the underlying -- // stream. We need to capture this buffer and feed it to the new serialization -- // stream so that the bytes are not lost. -- val kryoInput = stream.input -- val remainingBytes = kryoInput.limit() - kryoInput.position() -- val extraBuf = kryoInput.readBytes(remainingBytes) -- new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream) - case _ => compressedStream - } - deserializeStream = ser.deserializeStream(newInputStream) -diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala -deleted file mode 100644 -index dd380d8..0000000 ---- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala -+++ /dev/null -@@ -1,48 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.graphx -- --import com.esotericsoftware.kryo.Kryo -- --import org.apache.spark.graphx.impl._ --import org.apache.spark.serializer.KryoRegistrator --import org.apache.spark.util.collection.BitSet --import org.apache.spark.util.BoundedPriorityQueue -- --/** -- * Registers GraphX classes with Kryo for improved performance. 
-- */ --class GraphKryoRegistrator extends KryoRegistrator { -- -- def registerClasses(kryo: Kryo) { -- kryo.register(classOf[Edge[Object]]) -- kryo.register(classOf[MessageToPartition[Object]]) -- kryo.register(classOf[VertexBroadcastMsg[Object]]) -- kryo.register(classOf[(VertexId, Object)]) -- kryo.register(classOf[EdgePartition[Object]]) -- kryo.register(classOf[BitSet]) -- kryo.register(classOf[VertexIdToIndexMap]) -- kryo.register(classOf[VertexAttributeBlock[Object]]) -- kryo.register(classOf[PartitionStrategy]) -- kryo.register(classOf[BoundedPriorityQueue[Object]]) -- kryo.register(classOf[EdgeDirection]) -- -- // This avoids a large number of hash table lookups. -- kryo.setReferences(false) -- } --} -diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -index 89ee070..66d581d 100644 ---- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -@@ -25,10 +25,8 @@ import org.apache.spark.broadcast.Broadcast - import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} - import org.apache.spark.storage.StorageLevel - import org.apache.spark.rdd.RDD --import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.SparkContext._ - --import com.esotericsoftware.kryo.Kryo - import org.jblas.{DoubleMatrix, SimpleBlas, Solve} - - -@@ -560,12 +558,6 @@ object ALS { - trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) - } - -- private class ALSRegistrator extends KryoRegistrator { -- override def registerClasses(kryo: Kryo) { -- kryo.register(classOf[Rating]) -- } -- } -- - def main(args: Array[String]) { - if (args.length < 5 || args.length > 9) { - println("Usage: ALS " + -@@ -579,10 +571,6 @@ object ALS { - val alpha = if (args.length >= 8) args(7).toDouble else 1 - val blocks = if (args.length == 9) args(8).toInt else -1 - val conf = new SparkConf() -- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") -- .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) -- .set("spark.kryo.referenceTracking", "false") -- .set("spark.kryoserializer.buffer.mb", "8") - .set("spark.locality.wait", "10000") - val sc = new SparkContext(master, "ALS", conf) - -diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala -deleted file mode 100644 -index 684b38e..0000000 ---- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala -+++ /dev/null -@@ -1,82 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. 
-- */ -- --package org.apache.spark.streaming.util -- --import java.io.IOException --import java.net.ServerSocket --import java.nio.ByteBuffer -- --import scala.io.Source -- --import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -- --import org.apache.spark.{SparkConf, Logging} --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.util.IntParam -- --/** -- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a -- * specified rate. Used to feed data into RawInputDStream. -- */ --private[streaming] --object RawTextSender extends Logging { -- def main(args: Array[String]) { -- if (args.length != 4) { -- System.err.println("Usage: RawTextSender ") -- System.exit(1) -- } -- // Parse the arguments using a pattern match -- val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args -- -- // Repeat the input data multiple times to fill in a buffer -- val lines = Source.fromFile(file).getLines().toArray -- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) -- val ser = new KryoSerializer(new SparkConf()).newInstance() -- val serStream = ser.serializeStream(bufferStream) -- var i = 0 -- while (bufferStream.position < blockSize) { -- serStream.writeObject(lines(i)) -- i = (i + 1) % lines.length -- } -- bufferStream.trim() -- val array = bufferStream.array -- -- val countBuf = ByteBuffer.wrap(new Array[Byte](4)) -- countBuf.putInt(array.length) -- countBuf.flip() -- -- val serverSocket = new ServerSocket(port) -- logInfo("Listening on port " + port) -- -- while (true) { -- val socket = serverSocket.accept() -- logInfo("Got a new connection") -- val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) -- try { -- while (true) { -- out.write(countBuf.array) -- out.write(array) -- } -- } catch { -- case e: IOException => -- logError("Client disconnected") -- socket.close() -- } -- } -- } --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch b/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch deleted file mode 100644 index cbc9a44..0000000 --- a/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch +++ /dev/null @@ -1,249 +0,0 @@ -From 324784a03fe0c1d8f8a9fd9ecca60b07b4e867d7 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 16:50:36 -0600 -Subject: [PATCH 6/7] Remove functionality depending on stream-lib. 
- -Most notably, countApproxDistinctByKey ---- - .../org/apache/spark/api/java/JavaPairRDD.scala | 36 ---------------- - .../org/apache/spark/api/java/JavaRDDLike.scala | 10 ----- - .../org/apache/spark/rdd/PairRDDFunctions.scala | 42 ------------------ - core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 +------ - .../spark/util/SerializableHyperLogLog.scala | 50 ---------------------- - 5 files changed, 1 insertion(+), 153 deletions(-) - delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala - -diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala -index f430a33..348ef04 100644 ---- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala -+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala -@@ -611,42 +611,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K - */ - def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2)) - -- /** -- * Return approximate number of distinct values for each key in this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. Uses the provided -- * Partitioner to partition the output RDD. -- */ -- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { -- rdd.countApproxDistinctByKey(relativeSD, partitioner) -- } -- -- /** -- * Return approximate number of distinct values for each key this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. The default value of -- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism -- * level. -- */ -- def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { -- rdd.countApproxDistinctByKey(relativeSD) -- } -- -- -- /** -- * Return approximate number of distinct values for each key in this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the -- * output RDD into numPartitions. -- * -- */ -- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { -- rdd.countApproxDistinctByKey(relativeSD, numPartitions) -- } -- - /** Assign a name to this RDD */ - def setName(name: String): JavaPairRDD[K, V] = { - rdd.setName(name) -diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala -index ebbbbd8..98834f7 100644 ---- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala -+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala -@@ -450,15 +450,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { - takeOrdered(num, comp) - } - -- /** -- * Return approximate number of distinct elements in the RDD. 
-- * -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. The default value of -- * relativeSD is 0.05. -- */ -- def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) -- - def name(): String = rdd.name - } -diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala -index 4148581..93190ed 100644 ---- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala -+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala -@@ -38,8 +38,6 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} - import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} - import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} - --import com.clearspring.analytics.stream.cardinality.HyperLogLog -- - // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. - import org.apache.hadoop.mapred.SparkHadoopWriter - import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil -@@ -47,7 +45,6 @@ import org.apache.spark._ - import org.apache.spark.SparkContext._ - import org.apache.spark.partial.{BoundedDouble, PartialResult} - import org.apache.spark.Partitioner.defaultPartitioner --import org.apache.spark.util.SerializableHyperLogLog - - /** - * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. -@@ -210,45 +207,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) - } - - /** -- * Return approximate number of distinct values for each key in this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. Uses the provided -- * Partitioner to partition the output RDD. -- */ -- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { -- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) -- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) -- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) -- -- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) -- } -- -- /** -- * Return approximate number of distinct values for each key in this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the -- * output RDD into numPartitions. -- * -- */ -- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { -- countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) -- } -- -- /** -- * Return approximate number of distinct values for each key this RDD. -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. 
Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. The default value of -- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism -- * level. -- */ -- def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { -- countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) -- } -- -- /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. -diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala -index cd90a15..1bdb80d 100644 ---- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala -+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala -@@ -32,7 +32,6 @@ import org.apache.hadoop.io.Text - import org.apache.hadoop.mapred.TextOutputFormat - - import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} --import com.clearspring.analytics.stream.cardinality.HyperLogLog - - import org.apache.spark.Partitioner._ - import org.apache.spark.api.java.JavaRDD -@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator - import org.apache.spark.partial.GroupedCountEvaluator - import org.apache.spark.partial.PartialResult - import org.apache.spark.storage.StorageLevel --import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog} -+import org.apache.spark.util.{Utils, BoundedPriorityQueue} - - import org.apache.spark.SparkContext._ - import org.apache.spark._ -@@ -798,19 +797,6 @@ abstract class RDD[T: ClassTag]( - } - - /** -- * Return approximate number of distinct elements in the RDD. -- * -- * The accuracy of approximation can be controlled through the relative standard deviation -- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in -- * more accurate counts but increase the memory footprint and vise versa. The default value of -- * relativeSD is 0.05. -- */ -- def countApproxDistinct(relativeSD: Double = 0.05): Long = { -- val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) -- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() -- } -- -- /** - * Take the first num elements of the RDD. It works by first scanning one partition, and use the - * results from that partition to estimate the number of additional partitions needed to satisfy - * the limit. -diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala -deleted file mode 100644 -index 8b4e7c1..0000000 ---- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala -+++ /dev/null -@@ -1,50 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.util -- --import java.io.{Externalizable, ObjectOutput, ObjectInput} --import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} -- --/** -- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable. -- */ --private[spark] --class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { -- -- def this() = this(null) // For deserialization -- -- def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) -- -- def add[T](elem: T) = { -- this.value.offer(elem) -- this -- } -- -- def readExternal(in: ObjectInput) { -- val byteLength = in.readInt() -- val bytes = new Array[Byte](byteLength) -- in.readFully(bytes) -- value = HyperLogLog.Builder.build(bytes) -- } -- -- def writeExternal(out: ObjectOutput) { -- val bytes = value.getBytes() -- out.writeInt(bytes.length) -- out.write(bytes) -- } --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0007-Removed-mesos.patch b/spark-v0.9.0-0007-Removed-mesos.patch deleted file mode 100644 index 5241ae9..0000000 --- a/spark-v0.9.0-0007-Removed-mesos.patch +++ /dev/null @@ -1,851 +0,0 @@ -From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 17:05:12 -0600 -Subject: [PATCH 7/7] Removed mesos - ---- - .../main/scala/org/apache/spark/SparkContext.scala | 15 - - .../main/scala/org/apache/spark/TaskState.scala | 21 -- - .../spark/executor/MesosExecutorBackend.scala | 104 ------- - .../mesos/CoarseMesosSchedulerBackend.scala | 289 ----------------- - .../cluster/mesos/MesosSchedulerBackend.scala | 344 --------------------- - 5 files changed, 773 deletions(-) - delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala - delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala - delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala - -diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala -index 566472e..f3b2941 100644 ---- a/core/src/main/scala/org/apache/spark/SparkContext.scala -+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala -@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence - TextInputFormat} - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} - import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} --import org.apache.mesos.MesosNativeLibrary - - import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} - import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} -@@ -44,7 +43,6 @@ import org.apache.spark.rdd._ - import org.apache.spark.scheduler._ - import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, - SparkDeploySchedulerBackend, SimrSchedulerBackend} --import 
org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} - import org.apache.spark.scheduler.local.LocalBackend - import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} - import org.apache.spark.ui.SparkUI -@@ -1281,19 +1279,6 @@ object SparkContext { - scheduler.initialize(backend) - scheduler - -- case mesosUrl @ MESOS_REGEX(_) => -- MesosNativeLibrary.load() -- val scheduler = new TaskSchedulerImpl(sc) -- val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false) -- val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs -- val backend = if (coarseGrained) { -- new CoarseMesosSchedulerBackend(scheduler, sc, url, appName) -- } else { -- new MesosSchedulerBackend(scheduler, sc, url, appName) -- } -- scheduler.initialize(backend) -- scheduler -- - case SIMR_REGEX(simrUrl) => - val scheduler = new TaskSchedulerImpl(sc) - val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) -diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala -index 0bf1e4a..cdd8baf 100644 ---- a/core/src/main/scala/org/apache/spark/TaskState.scala -+++ b/core/src/main/scala/org/apache/spark/TaskState.scala -@@ -17,8 +17,6 @@ - - package org.apache.spark - --import org.apache.mesos.Protos.{TaskState => MesosTaskState} -- - private[spark] object TaskState extends Enumeration { - - val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value -@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration { - type TaskState = Value - - def isFinished(state: TaskState) = FINISHED_STATES.contains(state) -- -- def toMesos(state: TaskState): MesosTaskState = state match { -- case LAUNCHING => MesosTaskState.TASK_STARTING -- case RUNNING => MesosTaskState.TASK_RUNNING -- case FINISHED => MesosTaskState.TASK_FINISHED -- case FAILED => MesosTaskState.TASK_FAILED -- case KILLED => MesosTaskState.TASK_KILLED -- case LOST => MesosTaskState.TASK_LOST -- } -- -- def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { -- case MesosTaskState.TASK_STAGING => LAUNCHING -- case MesosTaskState.TASK_STARTING => LAUNCHING -- case MesosTaskState.TASK_RUNNING => RUNNING -- case MesosTaskState.TASK_FINISHED => FINISHED -- case MesosTaskState.TASK_FAILED => FAILED -- case MesosTaskState.TASK_KILLED => KILLED -- case MesosTaskState.TASK_LOST => LOST -- } - } -diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala -deleted file mode 100644 -index b56d8c9..0000000 ---- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala -+++ /dev/null -@@ -1,104 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.executor -- --import java.nio.ByteBuffer -- --import com.google.protobuf.ByteString -- --import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver} --import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} -- --import org.apache.spark.Logging --import org.apache.spark.TaskState --import org.apache.spark.TaskState.TaskState --import org.apache.spark.util.Utils -- -- --private[spark] class MesosExecutorBackend -- extends MesosExecutor -- with ExecutorBackend -- with Logging { -- -- var executor: Executor = null -- var driver: ExecutorDriver = null -- -- override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { -- val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() -- driver.sendStatusUpdate(MesosTaskStatus.newBuilder() -- .setTaskId(mesosTaskId) -- .setState(TaskState.toMesos(state)) -- .setData(ByteString.copyFrom(data)) -- .build()) -- } -- -- override def registered( -- driver: ExecutorDriver, -- executorInfo: ExecutorInfo, -- frameworkInfo: FrameworkInfo, -- slaveInfo: SlaveInfo) { -- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) -- this.driver = driver -- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) -- executor = new Executor( -- executorInfo.getExecutorId.getValue, -- slaveInfo.getHostname, -- properties) -- } -- -- override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { -- val taskId = taskInfo.getTaskId.getValue.toLong -- if (executor == null) { -- logError("Received launchTask but executor was null") -- } else { -- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) -- } -- } -- -- override def error(d: ExecutorDriver, message: String) { -- logError("Error from Mesos: " + message) -- } -- -- override def killTask(d: ExecutorDriver, t: TaskID) { -- if (executor == null) { -- logError("Received KillTask but executor was null") -- } else { -- executor.killTask(t.getValue.toLong) -- } -- } -- -- override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} -- -- override def disconnected(d: ExecutorDriver) {} -- -- override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} -- -- override def shutdown(d: ExecutorDriver) {} --} -- --/** -- * Entry point for Mesos executor. -- */ --private[spark] object MesosExecutorBackend { -- def main(args: Array[String]) { -- MesosNativeLibrary.load() -- // Create a new Executor and start it running -- val runner = new MesosExecutorBackend() -- new MesosExecutorDriver(runner).run() -- } --} -diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala -deleted file mode 100644 -index c27049b..0000000 ---- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala -+++ /dev/null -@@ -1,289 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.scheduler.cluster.mesos -- --import java.io.File --import java.util.{ArrayList => JArrayList, List => JList} --import java.util.Collections -- --import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} --import scala.collection.JavaConversions._ -- --import com.google.protobuf.ByteString --import org.apache.mesos.{Scheduler => MScheduler} --import org.apache.mesos._ --import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -- --import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} --import org.apache.spark.scheduler.TaskSchedulerImpl --import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -- --/** -- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds -- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever -- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the -- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable -- * latency. -- * -- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to -- * remove this. -- */ --private[spark] class CoarseMesosSchedulerBackend( -- scheduler: TaskSchedulerImpl, -- sc: SparkContext, -- master: String, -- appName: String) -- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) -- with MScheduler -- with Logging { -- -- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures -- -- // Lock used to wait for scheduler to be registered -- var isRegistered = false -- val registeredLock = new Object() -- -- // Driver for talking to Mesos -- var driver: SchedulerDriver = null -- -- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) -- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt -- -- // Cores we have acquired with each Mesos task ID -- val coresByTaskId = new HashMap[Int, Int] -- var totalCoresAcquired = 0 -- -- val slaveIdsWithExecutors = new HashSet[String] -- -- val taskIdToSlaveId = new HashMap[Int, String] -- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed -- -- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( -- "Spark home is not set; set it through the spark.home system " + -- "property, the SPARK_HOME environment variable or the SparkContext constructor")) -- -- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) -- -- var nextMesosTaskId = 0 -- -- def newMesosTaskId(): Int = { -- val id = nextMesosTaskId -- nextMesosTaskId += 1 -- id -- } -- -- override def start() { -- super.start() -- -- synchronized { -- new Thread("CoarseMesosSchedulerBackend driver") { -- setDaemon(true) -- override def run() { -- val scheduler = CoarseMesosSchedulerBackend.this -- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() -- driver = new MesosSchedulerDriver(scheduler, fwInfo, master) -- try { { -- val ret = driver.run() -- 
logInfo("driver.run() returned with code " + ret) -- } -- } catch { -- case e: Exception => logError("driver.run() failed", e) -- } -- } -- }.start() -- -- waitForRegister() -- } -- } -- -- def createCommand(offer: Offer, numCores: Int): CommandInfo = { -- val environment = Environment.newBuilder() -- sc.executorEnvs.foreach { case (key, value) => -- environment.addVariables(Environment.Variable.newBuilder() -- .setName(key) -- .setValue(value) -- .build()) -- } -- val command = CommandInfo.newBuilder() -- .setEnvironment(environment) -- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( -- conf.get("spark.driver.host"), -- conf.get("spark.driver.port"), -- CoarseGrainedSchedulerBackend.ACTOR_NAME) -- val uri = conf.get("spark.executor.uri", null) -- if (uri == null) { -- val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath -- command.setValue( -- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( -- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) -- } else { -- // Grab everything to the first '.'. We'll use that and '*' to -- // glob the directory "correctly". -- val basename = uri.split('/').last.split('.').head -- command.setValue( -- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" -- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) -- command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) -- } -- command.build() -- } -- -- override def offerRescinded(d: SchedulerDriver, o: OfferID) {} -- -- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { -- logInfo("Registered as framework ID " + frameworkId.getValue) -- registeredLock.synchronized { -- isRegistered = true -- registeredLock.notifyAll() -- } -- } -- -- def waitForRegister() { -- registeredLock.synchronized { -- while (!isRegistered) { -- registeredLock.wait() -- } -- } -- } -- -- override def disconnected(d: SchedulerDriver) {} -- -- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} -- -- /** -- * Method called by Mesos to offer resources on slaves. We respond by launching an executor, -- * unless we've already launched more than we wanted to. 
-- */ -- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { -- synchronized { -- val filters = Filters.newBuilder().setRefuseSeconds(-1).build() -- -- for (offer <- offers) { -- val slaveId = offer.getSlaveId.toString -- val mem = getResource(offer.getResourcesList, "mem") -- val cpus = getResource(offer.getResourcesList, "cpus").toInt -- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 && -- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && -- !slaveIdsWithExecutors.contains(slaveId)) { -- // Launch an executor on the slave -- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -- totalCoresAcquired += cpusToUse -- val taskId = newMesosTaskId() -- taskIdToSlaveId(taskId) = slaveId -- slaveIdsWithExecutors += slaveId -- coresByTaskId(taskId) = cpusToUse -- val task = MesosTaskInfo.newBuilder() -- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) -- .setSlaveId(offer.getSlaveId) -- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) -- .setName("Task " + taskId) -- .addResources(createResource("cpus", cpusToUse)) -- .addResources(createResource("mem", sc.executorMemory)) -- .build() -- d.launchTasks(offer.getId, Collections.singletonList(task), filters) -- } else { -- // Filter it out -- d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) -- } -- } -- } -- } -- -- /** Helper function to pull out a resource from a Mesos Resources protobuf */ -- private def getResource(res: JList[Resource], name: String): Double = { -- for (r <- res if r.getName == name) { -- return r.getScalar.getValue -- } -- // If we reached here, no resource with the required name was present -- throw new IllegalArgumentException("No resource called " + name + " in " + res) -- } -- -- /** Build a Mesos resource protobuf object */ -- private def createResource(resourceName: String, quantity: Double): Protos.Resource = { -- Resource.newBuilder() -- .setName(resourceName) -- .setType(Value.Type.SCALAR) -- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) -- .build() -- } -- -- /** Check whether a Mesos task state represents a finished task */ -- private def isFinished(state: MesosTaskState) = { -- state == MesosTaskState.TASK_FINISHED || -- state == MesosTaskState.TASK_FAILED || -- state == MesosTaskState.TASK_KILLED || -- state == MesosTaskState.TASK_LOST -- } -- -- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { -- val taskId = status.getTaskId.getValue.toInt -- val state = status.getState -- logInfo("Mesos task " + taskId + " is now " + state) -- synchronized { -- if (isFinished(state)) { -- val slaveId = taskIdToSlaveId(taskId) -- slaveIdsWithExecutors -= slaveId -- taskIdToSlaveId -= taskId -- // Remove the cores we have remembered for this task, if it's in the hashmap -- for (cores <- coresByTaskId.get(taskId)) { -- totalCoresAcquired -= cores -- coresByTaskId -= taskId -- } -- // If it was a failure, mark the slave as failed for blacklisting purposes -- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { -- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 -- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { -- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + -- "is Spark installed on it?") -- } -- } -- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node -- } -- } -- } -- -- override def error(d: SchedulerDriver, message: String) { -- 
logError("Mesos error: " + message) -- scheduler.error(message) -- } -- -- override def stop() { -- super.stop() -- if (driver != null) { -- driver.stop() -- } -- } -- -- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} -- -- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { -- logInfo("Mesos slave lost: " + slaveId.getValue) -- synchronized { -- if (slaveIdsWithExecutors.contains(slaveId.getValue)) { -- // Note that the slave ID corresponds to the executor ID on that slave -- slaveIdsWithExecutors -= slaveId.getValue -- removeExecutor(slaveId.getValue, "Mesos slave lost") -- } -- } -- } -- -- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { -- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) -- slaveLost(d, s) -- } --} -diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala -deleted file mode 100644 -index 4978148..0000000 ---- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala -+++ /dev/null -@@ -1,344 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.scheduler.cluster.mesos -- --import java.io.File --import java.util.{ArrayList => JArrayList, List => JList} --import java.util.Collections -- --import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} --import scala.collection.JavaConversions._ -- --import com.google.protobuf.ByteString --import org.apache.mesos.{Scheduler => MScheduler} --import org.apache.mesos._ --import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -- --import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} --import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, -- TaskDescription, TaskSchedulerImpl, WorkerOffer} --import org.apache.spark.util.Utils -- --/** -- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a -- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks -- * from multiple apps can run on different cores) and in time (a core can switch ownership). 
-- */ --private[spark] class MesosSchedulerBackend( -- scheduler: TaskSchedulerImpl, -- sc: SparkContext, -- master: String, -- appName: String) -- extends SchedulerBackend -- with MScheduler -- with Logging { -- -- // Lock used to wait for scheduler to be registered -- var isRegistered = false -- val registeredLock = new Object() -- -- // Driver for talking to Mesos -- var driver: SchedulerDriver = null -- -- // Which slave IDs we have executors on -- val slaveIdsWithExecutors = new HashSet[String] -- val taskIdToSlaveId = new HashMap[Long, String] -- -- // An ExecutorInfo for our tasks -- var execArgs: Array[Byte] = null -- -- var classLoader: ClassLoader = null -- -- override def start() { -- synchronized { -- classLoader = Thread.currentThread.getContextClassLoader -- -- new Thread("MesosSchedulerBackend driver") { -- setDaemon(true) -- override def run() { -- val scheduler = MesosSchedulerBackend.this -- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() -- driver = new MesosSchedulerDriver(scheduler, fwInfo, master) -- try { -- val ret = driver.run() -- logInfo("driver.run() returned with code " + ret) -- } catch { -- case e: Exception => logError("driver.run() failed", e) -- } -- } -- }.start() -- -- waitForRegister() -- } -- } -- -- def createExecutorInfo(execId: String): ExecutorInfo = { -- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( -- "Spark home is not set; set it through the spark.home system " + -- "property, the SPARK_HOME environment variable or the SparkContext constructor")) -- val environment = Environment.newBuilder() -- sc.executorEnvs.foreach { case (key, value) => -- environment.addVariables(Environment.Variable.newBuilder() -- .setName(key) -- .setValue(value) -- .build()) -- } -- val command = CommandInfo.newBuilder() -- .setEnvironment(environment) -- val uri = sc.conf.get("spark.executor.uri", null) -- if (uri == null) { -- command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath) -- } else { -- // Grab everything to the first '.'. We'll use that and '*' to -- // glob the directory "correctly". -- val basename = uri.split('/').last.split('.').head -- command.setValue("cd %s*; ./sbin/spark-executor".format(basename)) -- command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) -- } -- val memory = Resource.newBuilder() -- .setName("mem") -- .setType(Value.Type.SCALAR) -- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build()) -- .build() -- ExecutorInfo.newBuilder() -- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) -- .setCommand(command) -- .setData(ByteString.copyFrom(createExecArg())) -- .addResources(memory) -- .build() -- } -- -- /** -- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array -- * containing all the spark.* system properties in the form of (String, String) pairs. 
-- */ -- private def createExecArg(): Array[Byte] = { -- if (execArgs == null) { -- val props = new HashMap[String, String] -- val iterator = System.getProperties.entrySet.iterator -- while (iterator.hasNext) { -- val entry = iterator.next -- val (key, value) = (entry.getKey.toString, entry.getValue.toString) -- if (key.startsWith("spark.")) { -- props(key) = value -- } -- } -- // Serialize the map as an array of (String, String) pairs -- execArgs = Utils.serialize(props.toArray) -- } -- execArgs -- } -- -- private def setClassLoader(): ClassLoader = { -- val oldClassLoader = Thread.currentThread.getContextClassLoader -- Thread.currentThread.setContextClassLoader(classLoader) -- oldClassLoader -- } -- -- private def restoreClassLoader(oldClassLoader: ClassLoader) { -- Thread.currentThread.setContextClassLoader(oldClassLoader) -- } -- -- override def offerRescinded(d: SchedulerDriver, o: OfferID) {} -- -- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { -- val oldClassLoader = setClassLoader() -- try { -- logInfo("Registered as framework ID " + frameworkId.getValue) -- registeredLock.synchronized { -- isRegistered = true -- registeredLock.notifyAll() -- } -- } finally { -- restoreClassLoader(oldClassLoader) -- } -- } -- -- def waitForRegister() { -- registeredLock.synchronized { -- while (!isRegistered) { -- registeredLock.wait() -- } -- } -- } -- -- override def disconnected(d: SchedulerDriver) {} -- -- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} -- -- /** -- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets -- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that -- * tasks are balanced across the cluster. -- */ -- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { -- val oldClassLoader = setClassLoader() -- try { -- synchronized { -- // Build a big list of the offerable workers, and remember their indices so that we can -- // figure out which Offer to reply to for each worker -- val offerableIndices = new ArrayBuffer[Int] -- val offerableWorkers = new ArrayBuffer[WorkerOffer] -- -- def enoughMemory(o: Offer) = { -- val mem = getResource(o.getResourcesList, "mem") -- val slaveId = o.getSlaveId.getValue -- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) -- } -- -- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { -- offerableIndices += index -- offerableWorkers += new WorkerOffer( -- offer.getSlaveId.getValue, -- offer.getHostname, -- getResource(offer.getResourcesList, "cpus").toInt) -- } -- -- // Call into the ClusterScheduler -- val taskLists = scheduler.resourceOffers(offerableWorkers) -- -- // Build a list of Mesos tasks for each slave -- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) -- for ((taskList, index) <- taskLists.zipWithIndex) { -- if (!taskList.isEmpty) { -- val offerNum = offerableIndices(index) -- val slaveId = offers(offerNum).getSlaveId.getValue -- slaveIdsWithExecutors += slaveId -- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) -- for (taskDesc <- taskList) { -- taskIdToSlaveId(taskDesc.taskId) = slaveId -- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) -- } -- } -- } -- -- // Reply to the offers -- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
-- for (i <- 0 until offers.size) { -- d.launchTasks(offers(i).getId, mesosTasks(i), filters) -- } -- } -- } finally { -- restoreClassLoader(oldClassLoader) -- } -- } -- -- /** Helper function to pull out a resource from a Mesos Resources protobuf */ -- def getResource(res: JList[Resource], name: String): Double = { -- for (r <- res if r.getName == name) { -- return r.getScalar.getValue -- } -- // If we reached here, no resource with the required name was present -- throw new IllegalArgumentException("No resource called " + name + " in " + res) -- } -- -- /** Turn a Spark TaskDescription into a Mesos task */ -- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { -- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() -- val cpuResource = Resource.newBuilder() -- .setName("cpus") -- .setType(Value.Type.SCALAR) -- .setScalar(Value.Scalar.newBuilder().setValue(1).build()) -- .build() -- MesosTaskInfo.newBuilder() -- .setTaskId(taskId) -- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) -- .setExecutor(createExecutorInfo(slaveId)) -- .setName(task.name) -- .addResources(cpuResource) -- .setData(ByteString.copyFrom(task.serializedTask)) -- .build() -- } -- -- /** Check whether a Mesos task state represents a finished task */ -- def isFinished(state: MesosTaskState) = { -- state == MesosTaskState.TASK_FINISHED || -- state == MesosTaskState.TASK_FAILED || -- state == MesosTaskState.TASK_KILLED || -- state == MesosTaskState.TASK_LOST -- } -- -- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { -- val oldClassLoader = setClassLoader() -- try { -- val tid = status.getTaskId.getValue.toLong -- val state = TaskState.fromMesos(status.getState) -- synchronized { -- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { -- // We lost the executor on this slave, so remember that it's gone -- slaveIdsWithExecutors -= taskIdToSlaveId(tid) -- } -- if (isFinished(status.getState)) { -- taskIdToSlaveId.remove(tid) -- } -- } -- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) -- } finally { -- restoreClassLoader(oldClassLoader) -- } -- } -- -- override def error(d: SchedulerDriver, message: String) { -- val oldClassLoader = setClassLoader() -- try { -- logError("Mesos error: " + message) -- scheduler.error(message) -- } finally { -- restoreClassLoader(oldClassLoader) -- } -- } -- -- override def stop() { -- if (driver != null) { -- driver.stop() -- } -- } -- -- override def reviveOffers() { -- driver.reviveOffers() -- } -- -- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} -- -- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { -- val oldClassLoader = setClassLoader() -- try { -- logInfo("Mesos slave lost: " + slaveId.getValue) -- synchronized { -- slaveIdsWithExecutors -= slaveId.getValue -- } -- scheduler.executorLost(slaveId.getValue, reason) -- } finally { -- restoreClassLoader(oldClassLoader) -- } -- } -- -- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { -- recordSlaveLost(d, slaveId, SlaveLost()) -- } -- -- override def executorLost(d: SchedulerDriver, executorId: ExecutorID, -- slaveId: SlaveID, status: Int) { -- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, -- slaveId.getValue)) -- recordSlaveLost(d, slaveId, ExecutorExited(status)) -- } -- -- // TODO: query Mesos for number of cores -- override def defaultParallelism() = 
sc.conf.getInt("spark.default.parallelism", 8) --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch b/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch deleted file mode 100644 index c6a1533..0000000 --- a/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch +++ /dev/null @@ -1,34 +0,0 @@ ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -183,13 +183,7 @@ object SparkBuild extends Build { - "io.netty" % "netty-all" % "4.0.13.Final", - "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", - /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ -- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), -- "org.scalatest" %% "scalatest" % "1.9.1" % "test", -- "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", -- "com.novocode" % "junit-interface" % "0.9" % "test", -- "org.easymock" % "easymock" % "3.1" % "test", -- "org.mockito" % "mockito-all" % "1.8.5" % "test", -- "commons-io" % "commons-io" % "2.4" % "test" -+ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") - ), - - parallelExecution := true, -@@ -233,13 +227,11 @@ object SparkBuild extends Build { - "org.ow2.asm" % "asm" % "4.0", - "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -- "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", - "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.13.0", - "net.java.dev.jets3t" % "jets3t" % "0.7.1", -- "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), - "org.apache.avro" % "avro" % "1.7.4", - "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0009-use-Jetty-8.patch b/spark-v0.9.0-0009-use-Jetty-8.patch deleted file mode 100644 index 6f79761..0000000 --- a/spark-v0.9.0-0009-use-Jetty-8.patch +++ /dev/null @@ -1,27 +0,0 @@ -From cbc6b801cbf717ecab12f6cbad48e8bfd60c5e82 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 15:16:45 -0600 -Subject: [PATCH 1/2] use Jetty 8 - ---- - project/SparkBuild.scala | 4 +--- - 1 file changed, 1 insertion(+), 3 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 956312f..63f5297 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -181,9 +181,7 @@ object SparkBuild extends Build { - - libraryDependencies ++= Seq( - "io.netty" % "netty-all" % "4.0.13.Final", -- "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", -- /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. 
*/ -- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") -+ "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031" - ), - - parallelExecution := true, --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch b/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch deleted file mode 100644 index d0cd68f..0000000 --- a/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch +++ /dev/null @@ -1,83 +0,0 @@ -From 474037191b66da67aef3d2038e85448294d848c1 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 15:31:52 -0600 -Subject: [PATCH 2/2] use Akka 2.3.0-RC2 - ---- - core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- - core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- - .../src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala | 2 +- - .../main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 2 +- - project/SparkBuild.scala | 4 ++-- - 5 files changed, 6 insertions(+), 6 deletions(-) - -diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala -index 9987e23..7fda886 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/Client.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala -@@ -116,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - System.exit(-1) - -- case AssociationErrorEvent(cause, _, remoteAddress, _) => -+ case AssociationErrorEvent(cause, _, remoteAddress, _, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - println(s"Cause was: $cause") - System.exit(-1) -diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -index 1415e2f..8d732db 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -@@ -153,7 +153,7 @@ private[spark] class AppClient( - logWarning(s"Connection to $address failed; waiting for master to reconnect...") - markDisconnected() - -- case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => -+ case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) => - logWarning(s"Could not connect to $address: $cause") - - case StopAppClient => -diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -index 1dc39c4..732a1d7 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -@@ -52,7 +52,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor - case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => - logInfo(s"Successfully connected to $workerUrl") - -- case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) -+ case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) - if isWorker(remoteAddress) => - // These logs may not be seen if the worker (and associated pipe) has died - logError(s"Could not initialize connection to worker $workerUrl. 
Exiting.") -diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -index bf71882..08d703e 100644 ---- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -@@ -39,7 +39,7 @@ private[akka] class IndestructibleActorSystemImpl( - override val name: String, - applicationConfig: Config, - classLoader: ClassLoader) -- extends ActorSystemImpl(name, applicationConfig, classLoader) { -+ extends ActorSystemImpl(name, applicationConfig, classLoader, None) { - - protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { - val fallbackHandler = super.uncaughtExceptionHandler -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 63f5297..2bfa6b5 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -223,8 +223,8 @@ object SparkBuild extends Build { - "com.ning" % "compress-lzf" % "1.0.0", - "org.xerial.snappy" % "snappy-java" % "1.0.5", - "org.ow2.asm" % "asm" % "4.0", -- "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -- "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -+ "com.typesafe.akka" %% "akka-remote" % "2.3.0-RC2" excludeAll(excludeNetty), -+ "com.typesafe.akka" %% "akka-slf4j" % "2.3.0-RC2" excludeAll(excludeNetty), - "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.0-0011-xmvn.patch b/spark-v0.9.0-0011-xmvn.patch deleted file mode 100644 index 1426527..0000000 --- a/spark-v0.9.0-0011-xmvn.patch +++ /dev/null @@ -1,53 +0,0 @@ -From eab118d8acf17f64cb76a966715bbe8f15397da5 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 16:39:51 -0600 -Subject: [PATCH] fedora-only resolver changes - ---- - project/SparkBuild.scala | 15 ++++----------- - 1 file changed, 4 insertions(+), 11 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 2bfa6b5..895c4bb 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -93,7 +93,11 @@ object SparkBuild extends Build { - - lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) - -+ val ivyLocal = Resolver.file("local", file("IVY_LOCAL"))(Resolver.ivyStylePatterns) -+ - def sharedSettings = Defaults.defaultSettings ++ Seq( -+ externalResolvers := Seq(new sbt.RawRepository(new org.fedoraproject.maven.connector.ivy.IvyResolver), ivyLocal), -+ - organization := "org.apache.spark", - version := "0.9.0-incubating", - scalaVersion := "2.10.3", -@@ -123,13 +127,6 @@ object SparkBuild extends Build { - // Only allow one test at a time, even across projects, since they run in the same JVM - concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), - -- // also check the local Maven repository ~/.m2 -- resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), -- -- // For Sonatype publishing -- resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", -- "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), -- - publishMavenStyle := true, - - //useGpg in Global := true, -@@ -208,10 +205,6 @@ object SparkBuild extends Build { - - def coreSettings = sharedSettings ++ Seq( - 
name := "spark-core", -- resolvers ++= Seq( -- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", -- "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" -- ), - - libraryDependencies ++= Seq( - "com.google.guava" % "guava" % "14.0.1", --- -1.8.3.4 (Apple Git-47) -