diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-26 11:27:16 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-26 11:40:02 +0530 |
commit | 604dc409969923d1e175e2d77fb0ddcac09c9d31 (patch) | |
tree | 4c8e6bd221f333e2ccffe8ad7b85c79996e3fe7a | |
parent | 7ff4c2d399e1497966689cbe13edf2cd2a9a29b1 (diff) | |
parent | 834686b108ce31cbee531d89de6c6e80913448f4 (diff) | |
download | spark-604dc409969923d1e175e2d77fb0ddcac09c9d31.tar.gz spark-604dc409969923d1e175e2d77fb0ddcac09c9d31.tar.bz2 spark-604dc409969923d1e175e2d77fb0ddcac09c9d31.zip |
Sync with master and some build fixes
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala | 99 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 10 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/SharedSparkContext.scala | 6 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/ThreadingSuite.scala | 45 | ||||
-rw-r--r-- | docs/_config.yml | 2 | ||||
-rw-r--r-- | docs/building-with-maven.md | 6 | ||||
-rwxr-xr-x | ec2/spark_ec2.py | 8 | ||||
-rw-r--r-- | examples/pom.xml | 2 | ||||
-rw-r--r-- | pom.xml | 3 | ||||
-rw-r--r-- | project/SparkBuild.scala | 16 | ||||
-rwxr-xr-x | spark-class | 12 |
12 files changed, 138 insertions, 78 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 882bc506b4..ef97fa85fa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -258,7 +258,9 @@ class SparkContext( private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack - private val localProperties = new ThreadLocal[Properties] + private val localProperties = new InheritableThreadLocal[Properties] { + override protected def childValue(parent: Properties): Properties = new Properties(parent) + } def initLocalProperties() { localProperties.set(new Properties()) @@ -275,6 +277,9 @@ class SparkContext( } } + def getLocalProperty(key: String): String = + Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) + /** Set a human readable description of the current job. */ def setJobDescription(value: String) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala index f80823317b..114617c51a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala @@ -17,14 +17,12 @@ package org.apache.spark.scheduler.cluster -import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException} -import java.util.Properties - -import scala.xml.XML +import java.io.{FileInputStream, InputStream} +import java.util.{NoSuchElementException, Properties} import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import scala.xml.XML /** * An interface to build Schedulable tree @@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file")) + val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" @@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val DEFAULT_WEIGHT = 1 override def buildPools() { - if (schedulerAllocFile != null) { - val file = new File(schedulerAllocFile) - if (file.exists()) { - val xml = XML.loadFile(file) - for (poolNode <- (xml \\ POOLS_PROPERTY)) { - - val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode") - } - } - - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } - - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } - - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) + var is: Option[InputStream] = None + try { + is = Option { + schedulerAllocFile.map { f => + new FileInputStream(f) + }.getOrElse { + getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) } - } else { - throw new java.io.FileNotFoundException( - "Fair scheduler allocation file not found: " + schedulerAllocFile) } + + is.foreach { i => buildFairSchedulerPool(i) } + } finally { + is.foreach(_.close()) } // finally create "default" pool + buildDefaultPool() + } + + private def buildDefaultPool() { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) @@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) } } + private def buildFairSchedulerPool(is: InputStream) { + val xml = XML.load(is) + for (poolNode <- (xml \\ POOLS_PROPERTY)) { + + val poolName = (poolNode \ POOL_NAME_PROPERTY).text + var schedulingMode = DEFAULT_SCHEDULING_MODE + var minShare = DEFAULT_MINIMUM_SHARE + var weight = DEFAULT_WEIGHT + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + if (xmlSchedulingMode != "") { + try { + schedulingMode = SchedulingMode.withName(xmlSchedulingMode) + } catch { + case e: NoSuchElementException => + logWarning("Error xml schedulingMode, using default schedulingMode") + } + } + + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + if (xmlMinShare != "") { + minShare = xmlMinShare.toInt + } + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + if (xmlWeight != "") { + weight = xmlWeight.toInt + } + + val pool = new Pool(poolName, schedulingMode, minShare, weight) + rootPool.addSchedulable(pool) + logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + poolName, schedulingMode, minShare, weight)) + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index 6ec124da9c..459e257d79 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self } def resetSparkContext() = { - if (sc != null) { - LocalSparkContext.stop(sc) - sc = null - } + LocalSparkContext.stop(sc) + sc = null } } object LocalSparkContext { def stop(sc: SparkContext) { - sc.stop() + if (sc != null) { + sc.stop() + } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala index 97cbca09bf..288aa14eeb 100644 --- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => } override def afterAll() { - if (_sc != null) { - LocalSparkContext.stop(_sc) - _sc = null - } + LocalSparkContext.stop(_sc) + _sc = null super.afterAll() } } diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 69383ddfb8..75d6493e33 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -40,7 +40,7 @@ object ThreadingSuiteState { } class ThreadingSuite extends FunSuite with LocalSparkContext { - + test("accessing SparkContext form a different thread") { sc = new SparkContext("local", "test") val nums = sc.parallelize(1 to 10, 2) @@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext { fail("One or more threads didn't see runningThreads = 4") } } + + test("set local properties in different thread") { + sc = new SparkContext("local", "test") + val sem = new Semaphore(0) + + val threads = (1 to 5).map { i => + new Thread() { + override def run() { + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + sem.release() + } + } + } + + threads.foreach(_.start()) + + sem.acquire(5) + assert(sc.getLocalProperty("test") === null) + } + + test("set and get local properties in parent-children thread") { + sc = new SparkContext("local", "test") + sc.setLocalProperty("test", "parent") + val sem = new Semaphore(0) + + val threads = (1 to 5).map { i => + new Thread() { + override def run() { + assert(sc.getLocalProperty("test") === "parent") + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + sem.release() + } + } + } + + threads.foreach(_.start()) + + sem.acquire(5) + assert(sc.getLocalProperty("test") === "parent") + assert(sc.getLocalProperty("Foo") === null) + } } diff --git a/docs/_config.yml b/docs/_config.yml index e7a96c0c70..ad851673a5 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -6,5 +6,5 @@ markdown: kramdown SPARK_VERSION: 0.8.0-SNAPSHOT SPARK_VERSION_SHORT: 0.8.0 SCALA_VERSION: 2.10 -MESOS_VERSION: 0.9.0-incubating +MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index 7ecb601ddd..19c01e179f 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -37,13 +37,13 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit # Cloudera CDH 4.2.0 with MapReduce v1 $ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package -For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should also enable the "hadoop2-yarn" profile: +For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" profile and set the "yarn.version" property: # Apache Hadoop 2.0.5-alpha - $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -DskipTests clean package + $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package # Cloudera CDH 4.2.0 with MapReduce v2 - $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package + $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package ## Spark Tests in Maven ## diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 419d0fe13f..6b7d202a88 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -364,12 +364,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): slave_nodes = [] for res in reservations: active = [i for i in res.instances if is_active(i)] - if len(active) > 0: - group_names = [g.name for g in res.groups] + for inst in active: + group_names = [g.name for g in inst.groups] if group_names == [cluster_name + "-master"]: - master_nodes += res.instances + master_nodes.append(inst) elif group_names == [cluster_name + "-slaves"]: - slave_nodes += res.instances + slave_nodes.append(inst) if any((master_nodes, slave_nodes)): print ("Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))) diff --git a/examples/pom.xml b/examples/pom.xml index b44a126546..ca06a9ad8d 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -93,7 +93,7 @@ <dependency> <groupId>org.apache.cassandra</groupId> <artifactId>cassandra-all</artifactId> - <version>1.2.5</version> + <version>1.2.6</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> @@ -76,9 +76,10 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.5</java.version> + <scala-short.version>2.10</scala-short.version> <scala.version>2.10.2</scala.version> - <mesos.version>0.12.1</mesos.version> + <mesos.version>0.13.0</mesos.version> <akka.version>2.2.1</akka.version> <slf4j.version>1.7.2</slf4j.version> <log4j.version>1.2.17</log4j.version> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ffa9c93925..b5e65a1856 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -81,7 +81,7 @@ object SparkBuild extends Build { organization := "org.apache.spark", version := "0.8.0-SNAPSHOT", scalaVersion := "2.10.2", -// scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION), + scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION), javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, @@ -195,7 +195,7 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.ning" % "compress-lzf" % "0.8.4", "org.xerial.snappy" % "snappy-java" % "1.0.5", - "commons-daemon" % "commons-daemon" % "1.0.10", + "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 "org.ow2.asm" % "asm" % "4.0", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", @@ -204,7 +204,7 @@ object SparkBuild extends Build { "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.12.1", + "org.apache.mesos" % "mesos" % "0.13.0", "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.derby" % "derby" % "10.4.2.0" % "test", "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), @@ -215,9 +215,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", "com.twitter" %% "chill" % "0.3.1", - "com.twitter" % "chill-java" % "0.3.1", - "org.scala-lang" % "jline" % "2.10.2", - "org.scala-lang" % "scala-reflect" % "2.10.2" + "com.twitter" % "chill-java" % "0.3.1" ) ) @@ -227,7 +225,9 @@ object SparkBuild extends Build { def replSettings = sharedSettings ++ Seq( name := "spark-repl", - libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _) + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v ), + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "jline" % v ), + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) ) @@ -237,7 +237,7 @@ object SparkBuild extends Build { "com.twitter" %% "algebird-core" % "0.1.11", "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), - "org.apache.cassandra" % "cassandra-all" % "1.2.5" + "org.apache.cassandra" % "cassandra-all" % "1.2.6" exclude("com.google.guava", "guava") exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") exclude("com.ning","compress-lzf") diff --git a/spark-class b/spark-class index 1b2388e8c4..5305b3d025 100755 --- a/spark-class +++ b/spark-class @@ -37,7 +37,7 @@ fi # If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable # values for that; it doesn't need a lot -if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then +if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" # Do not overwrite SPARK_JAVA_OPTS environment variable in this script @@ -49,19 +49,19 @@ fi # Add java opts for master, worker, executor. The opts maybe null case "$1" in - 'spark.deploy.master.Master') + 'org.apache.spark.deploy.master.Master') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" ;; - 'spark.deploy.worker.Worker') + 'org.apache.spark.deploy.worker.Worker') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" ;; - 'spark.executor.StandaloneExecutorBackend') + 'org.apache.spark.executor.StandaloneExecutorBackend') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; - 'spark.executor.MesosExecutorBackend') + 'org.apache.spark.executor.MesosExecutorBackend') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; - 'spark.repl.Main') + 'org.apache.spark.repl.Main') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" ;; esac |