From f0164e5047f3a023768cdf0515a7a7d1c75ef7c3 Mon Sep 17 00:00:00 2001 From: kalpit Date: Wed, 6 Mar 2013 17:03:15 -0800 Subject: upgraded sbt version, sbt plugins and some library dependencies to latest stable version --- project/SparkBuild.scala | 8 ++++---- project/build.properties | 2 +- project/plugins.sbt | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..ea146b7b0b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -92,10 +92,10 @@ object SparkBuild extends Build { */ libraryDependencies ++= Seq( - "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", - "org.scalatest" %% "scalatest" % "1.8" % "test", - "org.scalacheck" %% "scalacheck" % "1.9" % "test", - "com.novocode" % "junit-interface" % "0.8" % "test", + "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", + "org.scalatest" %% "scalatest" % "1.9.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "com.novocode" % "junit-interface" % "0.9" % "test", "org.easymock" % "easymock" % "3.1" % "test" ), parallelExecution := false, diff --git a/project/build.properties b/project/build.properties index d4287112c6..66ad72ce2e 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.11.3 +sbt.version=0.12.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 4d0e696a11..d4f2442872 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,13 +4,13 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release resolvers += "Spray Repository" at "http://repo.spray.cc/" -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1") +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1") -addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0") +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0") -addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2") +addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1") // For Sonatype publishing //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) -- cgit v1.2.3 From f08db010d3ba4eff132f5c06a087b0c7c2e58576 Mon Sep 17 00:00:00 2001 From: kalpit Date: Wed, 6 Mar 2013 18:06:32 -0800 Subject: added SPARK_WORKER_INSTANCES : allows spawning multiple worker instances/processes on every slave machine --- bin/spark-daemon.sh | 6 ++++-- bin/spark-daemons.sh | 2 +- bin/start-master.sh | 2 +- bin/start-slave.sh | 2 +- bin/start-slaves.sh | 11 ++++++++++- bin/stop-master.sh | 2 +- bin/stop-slaves.sh | 12 +++++++++++- conf/spark-env.sh.template | 1 + 8 files changed, 30 insertions(+), 8 deletions(-) diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 0c584055c7..d7dc62ab08 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -30,7 +30,7 @@ # SPARK_NICENESS The scheduling priority for daemons. Defaults to 0. 
## -usage="Usage: spark-daemon.sh [--config ] [--hosts hostlistfile] (start|stop) " +usage="Usage: spark-daemon.sh [--config ] [--hosts hostlistfile] (start|stop) " # if no args specified, show usage if [ $# -le 1 ]; then @@ -48,6 +48,8 @@ startStop=$1 shift command=$1 shift +instance=$1 +shift spark_rotate_log () { @@ -95,7 +97,7 @@ fi export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log export SPARK_ROOT_LOGGER="INFO,DRFA" log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out -pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid +pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid # Set default scheduling priority if [ "$SPARK_NICENESS" = "" ]; then diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh index 4f9719ee80..0619097e4d 100755 --- a/bin/spark-daemons.sh +++ b/bin/spark-daemons.sh @@ -2,7 +2,7 @@ # Run a Spark command on all slave hosts. -usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..." +usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..." # if no args specified, show usage if [ $# -le 1 ]; then diff --git a/bin/start-master.sh b/bin/start-master.sh index 87feb261fe..83a3e1f3dc 100755 --- a/bin/start-master.sh +++ b/bin/start-master.sh @@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then fi fi -"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT +"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT diff --git a/bin/start-slave.sh b/bin/start-slave.sh index 45a0cf7a6b..616c76e4ee 100755 --- a/bin/start-slave.sh +++ b/bin/start-slave.sh @@ -11,4 +11,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then fi fi -"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1 +"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@" diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh index 390247ca4a..4e05224190 100755 --- a/bin/start-slaves.sh +++ b/bin/start-slaves.sh @@ -21,4 +21,13 @@ fi echo "Master IP: $SPARK_MASTER_IP" # Launch the slaves -exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT +if [ "$SPARK_WORKER_INSTANCES" = "" ]; then + exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT +else + if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then + SPARK_WORKER_WEBUI_PORT=8081 + fi + for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do + "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i )) + done +fi diff --git a/bin/stop-master.sh b/bin/stop-master.sh index f75167dd2c..172ee5891d 100755 --- a/bin/stop-master.sh +++ b/bin/stop-master.sh @@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" -"$bin"/spark-daemon.sh stop spark.deploy.master.Master \ No newline at end of file +"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1 diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh index 21c9ebf324..fbfc594472 100755 --- a/bin/stop-slaves.sh +++ b/bin/stop-slaves.sh @@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" -"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker \ No newline at end of file +if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then + . 
"${SPARK_CONF_DIR}/spark-env.sh" +fi + +if [ "$SPARK_WORKER_INSTANCES" = "" ]; then + "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1 +else + for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do + "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 )) + done +fi diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 6d71ec5691..37565ca827 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -12,6 +12,7 @@ # - SPARK_WORKER_CORES, to set the number of cores to use on this machine # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g) # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT +# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine # # Finally, Spark also relies on the following variables, but these can be set # on just the *master* (i.e. in your driver program), and will automatically -- cgit v1.2.3 From aa9134f72a13cdc1fd1a662ec9389724e8e0e35f Mon Sep 17 00:00:00 2001 From: kalpit Date: Fri, 8 Mar 2013 18:28:43 -0800 Subject: spark instance number must be present in log filename to prevent multiple workers from overriding each other's logs --- bin/spark-daemon.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index d7dc62ab08..8ee3ec481f 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -94,9 +94,9 @@ if [ "$SPARK_PID_DIR" = "" ]; then fi # some variables -export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log +export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log export SPARK_ROOT_LOGGER="INFO,DRFA" -log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out +log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid # Set default scheduling priority -- cgit v1.2.3 From bc8ba222ff29d46cd2e3331753c9c4ce681eccb4 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 28 Mar 2013 15:42:01 -0700 Subject: Bump development version to 0.8.0 --- bagel/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- pom.xml | 2 +- project/SparkBuild.scala | 2 +- repl-bin/pom.xml | 2 +- repl/pom.xml | 2 +- repl/src/main/scala/spark/repl/SparkILoop.scala | 2 +- streaming/pom.xml | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) diff --git a/bagel/pom.xml b/bagel/pom.xml index 510cff4669..be2e358091 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index fe9c803728..08717860a7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/docs/_config.yml b/docs/_config.yml index f99d5bb376..a6aa38a46d 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. 
-SPARK_VERSION: 0.7.1-SNAPSHOT -SPARK_VERSION_SHORT: 0.7.1 +SPARK_VERSION: 0.8.0-SNAPSHOT +SPARK_VERSION_SHORT: 0.8.0 SCALA_VERSION: 2.9.2 MESOS_VERSION: 0.9.0-incubating SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/examples/pom.xml b/examples/pom.xml index 39cc47c709..d014089fe4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 08d1fc12e0..1174b475d3 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT pom Spark Project Parent POM http://spark-project.org/ diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..250211fb0c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -35,7 +35,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", - version := "0.7.1-SNAPSHOT", + version := "0.8.0-SNAPSHOT", scalaVersion := "2.9.2", scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index dd720e2291..fe526a7616 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/repl/pom.xml b/repl/pom.xml index a3e4606edc..0b5e400c3d 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index cd7b5128b2..39b213851f 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.7.1 + /___/ .__/\_,_/_/ /_/\_\ version 0.8.0-SNAPSHOT /_/ """) import Properties._ diff --git a/streaming/pom.xml b/streaming/pom.xml index ec077e8089..b0d0cd0ff3 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml -- cgit v1.2.3 From 8bcdc6400516d3fc7d221c63d5935fa6fe6af24b Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 30 Mar 2013 22:09:52 -0700 Subject: Fixed broken filter in getWritableClass[T] --- core/src/main/scala/spark/SequenceFileRDDFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 6b4a11d6d3..b68609090f 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -46,7 +46,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla // from T to Writable. 
Since we have two apply methods we filter out the one which // is of the form "java.lang.Object apply(java.lang.Object)" implicitly[T => Writable].getClass.getDeclaredMethods().filter( - m => m.getReturnType().toString != "java.lang.Object" && + m => m.getReturnType().toString != "class java.lang.Object" && m.getName() == "apply")(0).getReturnType } -- cgit v1.2.3 From e215f67923035dcf5515946a4236a6ef36c5b309 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sun, 31 Mar 2013 08:00:13 -0700 Subject: Correct sense of 'filter out' in comment. --- .../main/scala/spark/SequenceFileRDDFunctions.scala | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index b68609090f..518034e07b 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -36,15 +36,15 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla self: RDD[(K, V)]) extends Logging with Serializable { - + private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { classManifest[T].erasure } else { // We get the type of the Writable class by looking at the apply method which converts // from T to Writable. Since we have two apply methods we filter out the one which - // is of the form "java.lang.Object apply(java.lang.Object)" + // is not of the form "java.lang.Object apply(java.lang.Object)" implicitly[T => Writable].getClass.getDeclaredMethods().filter( m => m.getReturnType().toString != "class java.lang.Object" && m.getName() == "apply")(0).getReturnType @@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla val valueClass = getWritableClass[V] val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) - - logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) + + logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyClass, valueClass, format) + self.saveAsHadoopFile(path, keyClass, valueClass, format) } else if (!convertKey && convertValue) { - self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) } else if (convertKey && !convertValue) { - self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) + self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) } else if (convertKey && convertValue) { - self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) - } + self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + } } } -- cgit v1.2.3 From b5e60c3253748cf87a76c713ae3388dce1b03107 Mon Sep 17 00:00:00 2001 From: Erik van oosten Date: Tue, 2 Apr 2013 15:25:22 +0300 Subject: Corrected order of CountMinSketchMonoid 
arguments --- .../src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index 483aae452b..a9642100e3 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -49,7 +49,7 @@ object TwitterAlgebirdCMS { val users = stream.map(status => status.getUser.getId) - val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) var globalCMS = cms.zero val mm = new MapMonoid[Long, Int]() var globalExact = Map[Long, Int]() -- cgit v1.2.3 From 20604001e280a445d7c25bac6eb31b1f0512c20f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sun, 10 Mar 2013 06:16:19 +0000 Subject: Fix argv handling in Python transitive closure example --- python/examples/transitive_closure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py index 73f7f8fbaf..7f85a1008e 100644 --- a/python/examples/transitive_closure.py +++ b/python/examples/transitive_closure.py @@ -24,7 +24,7 @@ if __name__ == "__main__": "Usage: PythonTC []" exit(-1) sc = SparkContext(sys.argv[1], "PythonTC") - slices = sys.argv[2] if len(sys.argv) > 2 else 2 + slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 tc = sc.parallelize(generateGraph(), slices).cache() # Linear transitive closure: each round grows paths by one edge, -- cgit v1.2.3 From 9a731f5a6dd56f5bc9eda3bb3177f7bbbb251c31 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 2 Apr 2013 11:58:40 -0700 Subject: Fix Python saveAsTextFile doctest to not expect order to be preserved --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 172ed85fab..a9fec17a9d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -399,7 +399,7 @@ class RDD(object): >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) >>> from fileinput import input >>> from glob import glob - >>> ''.join(input(glob(tempFile.name + "/part-0000*"))) + >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' """ def func(split, iterator): -- cgit v1.2.3 From 0f54344fd858acb327174cc976a07038a3aadf24 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 3 Apr 2013 13:15:34 +0200 Subject: Bumping Algebird version in examples now that it supports JDK 1.6 --- examples/pom.xml | 2 +- project/SparkBuild.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index 39cc47c709..0537404040 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -22,7 +22,7 @@ com.twitter algebird-core_2.9.2 - 0.1.8 + 0.1.11 org.scalatest diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d44bf3b5e3..082e2b985d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -157,7 +157,7 @@ object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", - libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8") + libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11") ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") -- cgit v1.2.3 From 
9b68ceaa26ffe0cf0692781dd010e69f1d9633e2 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 3 Apr 2013 14:29:46 -0700 Subject: SPARK-724 - Have Akka logging enabled by default for standalone daemons See the JIRA for more details. I was only able to test the bash version (don't have Windows) so maybe check over that the syntax is correct there. --- run | 1 + run2.cmd | 1 + 2 files changed, 2 insertions(+) diff --git a/run b/run index 2c29cc4a66..ceb6e58173 100755 --- a/run +++ b/run @@ -22,6 +22,7 @@ fi # values for that; it doesn't need a lot if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} + SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.logLifecycleEvents=true" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi diff --git a/run2.cmd b/run2.cmd index cb20a4b7a2..cfbb35ddca 100644 --- a/run2.cmd +++ b/run2.cmd @@ -21,6 +21,7 @@ set RUNNING_DAEMON=0 if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1 if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1 if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m +set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY% if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -- cgit v1.2.3 From 5555811bd54ddb84bce11d4ab04b1f818c221a14 Mon Sep 17 00:00:00 2001 From: Andy Konwinski Date: Thu, 4 Apr 2013 13:26:45 -0700 Subject: Update build to Scala 2.9.3 --- project/SparkBuild.scala | 8 ++++---- project/plugins.sbt | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..b54fa1ebf8 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -36,7 +36,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.7.1-SNAPSHOT", - scalaVersion := "2.9.2", + scalaVersion := "2.9.3", scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, @@ -93,8 +93,8 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", - "org.scalatest" %% "scalatest" % "1.8" % "test", - "org.scalacheck" %% "scalacheck" % "1.9" % "test", + "org.scalatest" %% "scalatest" % "1.9.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", "com.novocode" % "junit-interface" % "0.8" % "test", "org.easymock" % "easymock" % "3.1" % "test" ), @@ -140,7 +140,7 @@ object SparkBuild extends Build { "colt" % "colt" % "1.2.0", "cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-server" % "1.0-M2.1", - "cc.spray" %% "spray-json" % "1.1.1", + "cc.spray" % "spray-json_2.9.2" % "1.1.1", "org.apache.mesos" % "mesos" % "0.9.0-incubating" ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } diff --git a/project/plugins.sbt b/project/plugins.sbt index 4d0e696a11..9cea0b5ee4 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -10,7 +10,7 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1") addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0") -addSbtPlugin("cc.spray" %% "sbt-twirl" % 
"0.5.2") +addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1") // For Sonatype publishing //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) -- cgit v1.2.3 From ff2130a0ad17388036b66fcdf2b1848e208fa0f8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 6 Apr 2013 00:35:50 -0700 Subject: Retry failed ssh commands. This is especially useful during system startup when the hosts may not have yet come on-line but can be useful at other points for people with flakey connections --- ec2/retry_decorator.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ ec2/spark_ec2.py | 2 ++ 2 files changed, 46 insertions(+) create mode 100644 ec2/retry_decorator.py diff --git a/ec2/retry_decorator.py b/ec2/retry_decorator.py new file mode 100644 index 0000000000..1a2f79ae84 --- /dev/null +++ b/ec2/retry_decorator.py @@ -0,0 +1,44 @@ +import time +from functools import wraps + +def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None): + """Retry calling the decorated function using an exponential backoff. + + http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ + original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry + + :param ExceptionToCheck: the exception to check. may be a tuple of + exceptions to check + :type ExceptionToCheck: Exception or tuple + :param tries: number of times to try (not retry) before giving up + :type tries: int + :param delay: initial delay between retries in seconds + :type delay: int + :param backoff: backoff multiplier e.g. value of 2 will double the delay + each retry + :type backoff: int + :param logger: logger to use. If None, print + :type logger: logging.Logger instance + """ + def deco_retry(f): + + @wraps(f) + def f_retry(*args, **kwargs): + mtries, mdelay = tries, delay + while mtries > 1: + try: + return f(*args, **kwargs) + except ExceptionToCheck, e: + msg = "%s, Retrying in %d seconds..." % (str(e), mdelay) + if logger: + logger.warning(msg) + else: + print msg + time.sleep(mdelay) + mtries -= 1 + mdelay *= backoff + return f(*args, **kwargs) + + return f_retry # true decorator + + return deco_retry diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 571d27fde6..aa2d360fbb 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -22,6 +22,7 @@ from __future__ import with_statement import logging import os import random +from retry_decorator import retry import shutil import subprocess import sys @@ -541,6 +542,7 @@ def scp(host, opts, local_file, dest_file): # Run a command on a host through ssh, throwing an exception if ssh fails +@retry(subprocess.CalledProcessError, tries=3, delay=30) def ssh(host, opts, command): subprocess.check_call( "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % -- cgit v1.2.3 From 34a47b8bc9c8d3357d42201ec41e5af2675bc766 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Apr 2013 20:27:03 -0400 Subject: Update Scala version in docs --- docs/_config.yml | 2 +- docs/building-with-maven.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/_config.yml b/docs/_config.yml index f99d5bb376..055ba77c5b 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -5,6 +5,6 @@ markdown: kramdown # of Spark, Scala, and Mesos. 
SPARK_VERSION: 0.7.1-SNAPSHOT SPARK_VERSION_SHORT: 0.7.1 -SCALA_VERSION: 2.9.2 +SCALA_VERSION: 2.9.3 MESOS_VERSION: 0.9.0-incubating SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index c2eeafd07a..04cd79d039 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -42,10 +42,10 @@ To run a specific test suite: You might run into the following errors if you're using a vanilla installation of Maven: - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes... + [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes... + [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] To fix these, you can do the following: -- cgit v1.2.3 From 6962d40b44f11651ec7021c8a8d9e0cc64fff970 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Apr 2013 20:27:33 -0400 Subject: Fix deprecated warning --- core/src/main/scala/spark/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 33dc7627a3..ccd9d0364a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest]( * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - @deprecated("use mapPartitionsWithIndex") + @deprecated("use mapPartitionsWithIndex", "0.7.0") def mapPartitionsWithSplit[U: ClassManifest]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = -- cgit v1.2.3 From a1586412d647da1a192259a58b10d50b02f02a9c Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Apr 2013 20:31:19 -0400 Subject: Updated link to SBT --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 51d505e1fa..0c4add45dc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point to where you've installed Scala. Scala must also be accessible through one of these methods on slave nodes on your cluster. -Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run +Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. 
To compile the code, go into the top-level Spark directory and run sbt/sbt package -- cgit v1.2.3 From b496decf0a486a31afe3cac706a6b80d075cd327 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 7 Apr 2013 17:44:48 -0700 Subject: Updating based on code review --- run | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run b/run index ceb6e58173..93e4b0d18f 100755 --- a/run +++ b/run @@ -22,7 +22,7 @@ fi # values for that; it doesn't need a lot if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} - SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.logLifecycleEvents=true" + SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi -- cgit v1.2.3 From b5900d47b1386f5bc21df5db32b09b2d44e9dba7 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Apr 2013 20:55:42 -0400 Subject: Fix compile warning --- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 9e37bdf659..5db77eb142 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -122,6 +122,6 @@ private[spark] object CheckpointRDD extends Logging { val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") - fs.delete(path) + fs.delete(path, true) } } -- cgit v1.2.3 From 054feb6448578de5542f9ef54d4cc88f706c22f5 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Apr 2013 21:15:21 -0400 Subject: Fixed a bug with zip --- core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e80ec17aa5..35b0e06785 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( @transient rdd2: RDD[U] ) extends Partition { - var split1 = rdd1.partitions(idx) - var split2 = rdd1.partitions(idx) + var partition1 = rdd1.partitions(idx) + var partition2 = rdd2.partitions(idx) override val index: Int = idx - def splits = (split1, split2) + def partitions = (partition1, partition2) @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent split at the time of task serialization - split1 = rdd1.partitions(idx) - split2 = rdd2.partitions(idx) + // Update the reference to parent partition at the time of task serialization + partition1 = rdd1.partitions(idx) + partition2 = rdd2.partitions(idx) oos.defaultWriteObject() } } @@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { - val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context)) } override def getPreferredLocations(s: Partition): Seq[String] = { 
- val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2)) } override def clearDependencies() { -- cgit v1.2.3 From adba773fab6294b5764d101d248815a7d3cb3558 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 8 Apr 2013 17:34:38 -0400 Subject: Fix passing of superstep in Bagel to avoid seeing new values of the superstep value upon recomputation, and set the default storage level in Bagel to MEMORY_AND_DISK --- bagel/src/main/scala/spark/bagel/Bagel.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index e10c03f6ba..5ecdd7d004 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer import storage.StorageLevel object Bagel extends Logging { - - val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY + val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK /** * Runs a Bagel program. @@ -63,8 +62,9 @@ object Bagel extends Logging { val combinedMsgs = msgs.combineByKey( combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) + val superstep_ = superstep // Create a read-only copy of superstep for capture in closure val (processed, numMsgs, numActiveVerts) = - comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel) + comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel) val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000)) -- cgit v1.2.3 From 18bd41d1a38bdb9d68a9e6f360a15f9d7b25ebf5 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 9 Apr 2013 14:35:29 -0400 Subject: Don't pull in old versions of Jackson via hadoop-core --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f03efd6c83..029ade2e2e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -129,7 +129,7 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.ning" % "compress-lzf" % "0.8.4", - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", -- cgit v1.2.3 From f1d8871ca14567e58111c1794d2a1b4ad48ce7cd Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 9 Apr 2013 23:35:13 -0400 Subject: Uniform whitespace across scala examples --- .../main/scala/spark/examples/LocalKMeans.scala | 138 ++++++++++----------- .../scala/spark/examples/MultiBroadcastTest.scala | 6 +- .../spark/examples/SimpleSkewedGroupByTest.scala | 4 +- .../scala/spark/examples/SkewedGroupByTest.scala | 4 +- 4 files changed, 76 insertions(+), 76 deletions(-) diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala index b07e799cef..4849f216fb 100644 --- 
a/examples/src/main/scala/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala @@ -10,73 +10,73 @@ import scala.collection.mutable.HashSet * K-means clustering. */ object LocalKMeans { - val N = 1000 - val R = 1000 // Scaling factor - val D = 10 - val K = 10 - val convergeDist = 0.001 - val rand = new Random(42) - - def generateData = { - def generatePoint(i: Int) = { - Vector(D, _ => rand.nextDouble * R) - } - Array.tabulate(N)(generatePoint) - } - - def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = { - var index = 0 - var bestIndex = 0 - var closest = Double.PositiveInfinity - - for (i <- 1 to centers.size) { - val vCurr = centers.get(i).get - val tempDist = p.squaredDist(vCurr) - if (tempDist < closest) { - closest = tempDist - bestIndex = i - } - } - - return bestIndex - } - - def main(args: Array[String]) { - val data = generateData - var points = new HashSet[Vector] - var kPoints = new HashMap[Int, Vector] - var tempDist = 1.0 - - while (points.size < K) { - points.add(data(rand.nextInt(N))) - } - - val iter = points.iterator - for (i <- 1 to points.size) { - kPoints.put(i, iter.next()) - } - - println("Initial centers: " + kPoints) - - while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - - var mappings = closest.groupBy[Int] (x => x._1) - - var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))}) - - var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)} - - tempDist = 0.0 - for (mapping <- newPoints) { - tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2) - } - - for (newP <- newPoints) { - kPoints.put(newP._1, newP._2) - } - } - - println("Final centers: " + kPoints) - } + val N = 1000 + val R = 1000 // Scaling factor + val D = 10 + val K = 10 + val convergeDist = 0.001 + val rand = new Random(42) + + def generateData = { + def generatePoint(i: Int) = { + Vector(D, _ => rand.nextDouble * R) + } + Array.tabulate(N)(generatePoint) + } + + def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = { + var index = 0 + var bestIndex = 0 + var closest = Double.PositiveInfinity + + for (i <- 1 to centers.size) { + val vCurr = centers.get(i).get + val tempDist = p.squaredDist(vCurr) + if (tempDist < closest) { + closest = tempDist + bestIndex = i + } + } + + return bestIndex + } + + def main(args: Array[String]) { + val data = generateData + var points = new HashSet[Vector] + var kPoints = new HashMap[Int, Vector] + var tempDist = 1.0 + + while (points.size < K) { + points.add(data(rand.nextInt(N))) + } + + val iter = points.iterator + for (i <- 1 to points.size) { + kPoints.put(i, iter.next()) + } + + println("Initial centers: " + kPoints) + + while(tempDist > convergeDist) { + var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + + var mappings = closest.groupBy[Int] (x => x._1) + + var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))}) + + var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)} + + tempDist = 0.0 + for (mapping <- newPoints) { + tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2) + } + + for (newP <- newPoints) { + kPoints.put(newP._1, newP._2) + } + } + + println("Final centers: " + kPoints) + } } diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala 
b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala index 92cd81c487..a0aaf60918 100644 --- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala @@ -8,7 +8,7 @@ object MultiBroadcastTest { System.err.println("Usage: BroadcastTest [] [numElem]") System.exit(1) } - + val sc = new SparkContext(args(0), "Broadcast Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) @@ -19,7 +19,7 @@ object MultiBroadcastTest { for (i <- 0 until arr1.length) { arr1(i) = i } - + var arr2 = new Array[Int](num) for (i <- 0 until arr2.length) { arr2(i) = i @@ -30,7 +30,7 @@ object MultiBroadcastTest { sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size + barr2.value.size) } - + System.exit(0) } } diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala index 0d17bda004..461b84a2c6 100644 --- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala @@ -11,7 +11,7 @@ object SimpleSkewedGroupByTest { "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]") System.exit(1) } - + var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 @@ -20,7 +20,7 @@ object SimpleSkewedGroupByTest { val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - + val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random var result = new Array[(Int, Array[Byte])](numKVPairs) diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala index 83be3fc27b..435675f9de 100644 --- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala @@ -10,7 +10,7 @@ object SkewedGroupByTest { System.err.println("Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } - + var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 @@ -18,7 +18,7 @@ object SkewedGroupByTest { val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - + val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random -- cgit v1.2.3 From 9bf24e1d61a629b410183dfc4296bba994f0a79e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 9 Apr 2013 21:37:02 -0700 Subject: Just use a loop for retries --- ec2/retry_decorator.py | 44 -------------------------------------------- ec2/spark_ec2.py | 23 +++++++++++++++++------ 2 files changed, 17 insertions(+), 50 deletions(-) delete mode 100644 ec2/retry_decorator.py diff --git a/ec2/retry_decorator.py b/ec2/retry_decorator.py deleted file mode 100644 index 1a2f79ae84..0000000000 --- a/ec2/retry_decorator.py +++ /dev/null @@ -1,44 +0,0 @@ -import time -from functools import wraps - -def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None): - """Retry calling the decorated function using an exponential backoff. 
- - http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ - original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry - - :param ExceptionToCheck: the exception to check. may be a tuple of - exceptions to check - :type ExceptionToCheck: Exception or tuple - :param tries: number of times to try (not retry) before giving up - :type tries: int - :param delay: initial delay between retries in seconds - :type delay: int - :param backoff: backoff multiplier e.g. value of 2 will double the delay - each retry - :type backoff: int - :param logger: logger to use. If None, print - :type logger: logging.Logger instance - """ - def deco_retry(f): - - @wraps(f) - def f_retry(*args, **kwargs): - mtries, mdelay = tries, delay - while mtries > 1: - try: - return f(*args, **kwargs) - except ExceptionToCheck, e: - msg = "%s, Retrying in %d seconds..." % (str(e), mdelay) - if logger: - logger.warning(msg) - else: - print msg - time.sleep(mdelay) - mtries -= 1 - mdelay *= backoff - return f(*args, **kwargs) - - return f_retry # true decorator - - return deco_retry diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index aa2d360fbb..9f2daad2b6 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -22,7 +22,6 @@ from __future__ import with_statement import logging import os import random -from retry_decorator import retry import shutil import subprocess import sys @@ -541,12 +540,24 @@ def scp(host, opts, local_file, dest_file): (opts.identity_file, local_file, opts.user, host, dest_file), shell=True) -# Run a command on a host through ssh, throwing an exception if ssh fails -@retry(subprocess.CalledProcessError, tries=3, delay=30) +# Run a command on a host through ssh, retrying up to two times +# and then throwing an exception if ssh continues to fail. def ssh(host, opts, command): - subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % - (opts.identity_file, opts.user, host, command), shell=True) + tries = 0 + while True: + try: + return subprocess.check_call( + "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % + (opts.identity_file, opts.user, host, command), shell=True) + except subprocess.CalledProcessError as e: + if (tries > 2): + raise e + print "Error connecting to host {0}, sleeping 30".format(e) + time.sleep(30) + tries = tries + 1 + + + # Gets a list of zones to launch instances in -- cgit v1.2.3 From 6efc8cae8f4497d431e2a861778e2e120e774990 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 10 Apr 2013 13:44:10 -0300 Subject: Typos: cluser -> cluster --- docs/streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index b30699cf3d..f5788dc467 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -83,7 +83,7 @@ DStreams support many of the transformations available on normal Spark RDD's: groupByKey([numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream.
- Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. + Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. @@ -132,7 +132,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo groupByKeyAndWindow(windowDuration, slideDuration, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window.
-Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. +Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. reduceByKeyAndWindow(func, windowDuration, slideDuration, [numTasks]) -- cgit v1.2.3 From 29d3440efbc6e59da388f8d9c7f638a74dd9e9c5 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 11 Apr 2013 01:54:02 -0300 Subject: Add details when BlockManager heartbeats time out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Makes it more clear what the threshold was for tuning spark.storage.blockManagerSlaveTimeoutMs Before: WARN  "Removing BlockManager BlockManagerId(201304022120-1976232532-5050-27464-0, myhostname, 51337) with no recent heart beats After: WARN  "Removing BlockManager BlockManagerId(201304022120-1976232532-5050-27464-0, myhostname, 51337) with no recent heart beats: 19216ms exceeds 15000ms --- core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 2830bc6297..9ab451b46b 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") toRemove += info.blockManagerId } } -- cgit v1.2.3 From ed336e0d44d27e9be66adb0962f82af7d1ac4d87 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 11 Apr 2013 22:29:37 -0400 Subject: Fix tests from different projects running in parallel in SBT 0.12 --- project/SparkBuild.scala | 6 ++++-- project/build.properties | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f03efd6c83..0e55c51204 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -44,7 +44,10 @@ object SparkBuild extends Build { transitiveClassifiers in Scope.GlobalScope := Seq("sources"), testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), - // shared between both core and streaming. + // Only allow one test at a time, even across projects, since they run in the same JVM + concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + + // Shared between both core and streaming. 
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"), // For Sonatype publishing @@ -98,7 +101,6 @@ object SparkBuild extends Build { "com.novocode" % "junit-interface" % "0.9" % "test", "org.easymock" % "easymock" % "3.1" % "test" ), - parallelExecution := false, /* Workaround for issue #206 (fixed after SBT 0.11.0) */ watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task, const(std.TaskExtra.constant(Nil)), aggregate = true, includeRoot = true) apply { _.join.map(_.flatten) }, diff --git a/project/build.properties b/project/build.properties index 66ad72ce2e..9b860e23c5 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.12.2 +sbt.version=0.12.3 -- cgit v1.2.3 From 6f688608915a82e6dcf4a27dc92e4b25a3570fa4 Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 11 Apr 2013 20:52:06 -0700 Subject: Reversed the order of tests to find a scala executable (in the case when SPARK_LAUNCH_WITH_SCALA is defined): instead of checking in the PATH first, and only then (if not found) for SCALA_HOME, now we check for SCALA_HOME first, and only then (if not defined) do we look in the PATH. The advantage is that now if the user has a more recent (non-compatible) version of scala in her PATH, she can use SCALA_HOME to point to the older (compatible) version for use with spark. Suggested by Josh Rosen in this thread: https://groups.google.com/forum/?fromgroups=#!topic/spark-users/NC9JKvP8808 --- run | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/run b/run index 73239097b9..756f8703f2 100755 --- a/run +++ b/run @@ -47,14 +47,15 @@ case "$1" in esac if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then - if [ `command -v scala` ]; then - RUNNER="scala" + if [ "$SCALA_HOME" ]; then + RUNNER="${SCALA_HOME}/bin/scala" else - if [ -z "$SCALA_HOME" ]; then - echo "SCALA_HOME is not set" >&2 + if [ `command -v scala` ]; then + RUNNER="scala" + else + echo "SCALA_HOME is not set and scala is not in PATH" >&2 exit 1 fi - RUNNER="${SCALA_HOME}/bin/scala" fi else if [ `command -v java` ]; then -- cgit v1.2.3 From 60a91b3b59513b00a3afaa5a7621ee17eb6ee425 Mon Sep 17 00:00:00 2001 From: Andy Konwinski Date: Fri, 12 Apr 2013 12:32:58 -0700 Subject: Update quick-start.md heading on Operations (not just Transformations). --- docs/quick-start.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/quick-start.md b/docs/quick-start.md index 5c80d2ed3a..2d961b29cb 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -53,8 +53,8 @@ scala> textFile.filter(line => line.contains("Spark")).count() // How many lines res3: Long = 15 {% endhighlight %} -## Transformations -RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words: +## More On RDD Operations +RDD actions and transformations can be used for more complex computations. 
Let's say we want to find the line with the most words: {% highlight scala %} scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) -- cgit v1.2.3 From c35d530bcfea2e1764863eb9f47a794d8fa001af Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 13 Apr 2013 12:43:12 -0400 Subject: Fix compile error --- core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 9ab451b46b..1a6a6cfd3f 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -121,7 +121,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") toRemove += info.blockManagerId } -- cgit v1.2.3 From a64c10744988f55776e61c305734f9f5a42f9ea5 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 15 Apr 2013 16:41:51 -0400 Subject: Make ShuffledRDD.prev transient --- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 2 +- core/src/main/scala/spark/rdd/SubtractedRDD.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 9213513e80..a6235491ca 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -29,7 +29,7 @@ private[spark] case class NarrowCoGroupSplitDep( private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep private[spark] -class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) +class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) extends Partition with Serializable { override val index: Int = idx override def hashCode(): Int = idx @@ -88,7 +88,7 @@ class CoGroupedRDD[K]( case _ => new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } - }.toList) + }.toArray) } array } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 51f02409b6..4e33b7dd5c 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * @tparam V the value class. 
*/ class ShuffledRDD[K, V]( - prev: RDD[(K, V)], + @transient prev: RDD[(K, V)], part: Partitioner) extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) { diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 0a02561062..481e03b349 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM case _ => new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } - }.toList) + }.toArray) } array } -- cgit v1.2.3 From 6bfe4bf3eb9421b4e4143f384a0012c3694df8c9 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Mon, 15 Apr 2013 23:32:25 -0700 Subject: Increase ReservedCodeCacheSize for sbt --- sbt/sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbt/sbt b/sbt/sbt index 8f426d18e8..850c58e1e9 100755 --- a/sbt/sbt +++ b/sbt/sbt @@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then fi export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd) export SPARK_TESTING=1 # To put test classes on classpath -java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" +java -Xmx1200m -XX:MaxPermSize=250m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" -- cgit v1.2.3 From 17e076de800ea0d4c55f2bd657348641f6f9c55b Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 18 Apr 2013 22:25:21 -0700 Subject: Turn on forking in test JVMs to reduce the pressure on perm gen and code cache sizes due to having 2 instances of the Scala compiler and a bunch of classloaders. --- project/SparkBuild.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 91541a34f9..f0b371b2cf 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -44,6 +44,10 @@ object SparkBuild extends Build { transitiveClassifiers in Scope.GlobalScope := Seq("sources"), testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), + // Fork new JVMs for tests and set Java options for those + fork := true, + javaOptions += "-Xmx1g", + // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), -- cgit v1.2.3 From 3b594a4e3b94de49a09dc679a30d857e3f41df69 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 24 Apr 2013 10:18:25 +0530 Subject: Do not add signature files - results in validation errors when using assembled file --- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 0c2598ab35..b3f410bfa6 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -214,6 +214,7 @@ object SparkBuild extends Build { def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard + case m if m.toLowerCase.matches("meta-inf/.*\\.sf$") => MergeStrategy.discard case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } -- cgit v1.2.3 From 01d9ba503878d4191eaa8080e86c631d3c705cce Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 25 Apr 2013 00:11:27 -0700 Subject: Add back line removed during YARN merge --- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/SparkBuild.scala 
b/project/SparkBuild.scala index b3f410bfa6..44758ad87e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -48,6 +48,7 @@ object SparkBuild extends Build { scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, + retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", transitiveClassifiers in Scope.GlobalScope := Seq("sources"), testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), -- cgit v1.2.3 From eef9ea1993270d5f07e52e807e8d149e54079aad Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 24 Apr 2013 15:08:20 -0700 Subject: Update unit test memory to 2 GB --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 44758ad87e..f32c47e71f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -54,7 +54,7 @@ object SparkBuild extends Build { // Fork new JVMs for tests and set Java options for those fork := true, - javaOptions += "-Xmx1g", + javaOptions += "-Xmx2g", // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), -- cgit v1.2.3 From 6e6b5204ea015fc7cc2c3e16e0032be3074413be Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 24 Apr 2013 18:53:12 -0700 Subject: Create an empty directory when checkpointing a 0-partition RDD (fixes a test failure on Hadoop 2.0) --- core/src/main/scala/spark/RDDCheckpointData.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala index d00092e984..57e0405fb4 100644 --- a/core/src/main/scala/spark/RDDCheckpointData.scala +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -1,6 +1,7 @@ package spark import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration import rdd.{CheckpointRDD, CoalescedRDD} import scheduler.{ResultTask, ShuffleMapTask} @@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) } } + // Create the output path for the checkpoint + val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) + val fs = path.getFileSystem(new Configuration()) + if (!fs.mkdirs(path)) { + throw new SparkException("Failed to create checkpoint path " + path) + } + // Save to file, and reload it as an RDD - val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString - rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _) - val newRDD = new CheckpointRDD[T](rdd.context, path) + rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _) + val newRDD = new CheckpointRDD[T](rdd.context, path.toString) // Change the dependencies and partitions of the RDD RDDCheckpointData.synchronized { - cpFile = Some(path) + cpFile = Some(path.toString) cpRDD = Some(newRDD) rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions cpState = Checkpointed -- cgit v1.2.3 From a72134a6ac04e2e49679bbd5ba1266daf909bec8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 25 Apr 2013 10:39:28 -0700 Subject: SPARK-739 Have quickstart standlone job use README --- docs/quick-start.md | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/docs/quick-start.md b/docs/quick-start.md 
index 2d961b29cb..335643536a 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -113,8 +113,8 @@ import SparkContext._ object SimpleJob { def main(args: Array[String]) { - val logFile = "/var/log/syslog" // Should be some file on your system - val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", + val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system + val sc = new SparkContext("local", "Simple Job", "YOUR_SPARK_HOME", List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar")) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() @@ -124,7 +124,7 @@ object SimpleJob { } {% endhighlight %} -This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. +This job simply counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds two repositories which host Spark dependencies: @@ -156,7 +156,7 @@ $ find . $ sbt package $ sbt run ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. @@ -173,7 +173,7 @@ import spark.api.java.function.Function; public class SimpleJob { public static void main(String[] args) { - String logFile = "/var/log/syslog"; // Should be some file on your system + String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system JavaSparkContext sc = new JavaSparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"}); JavaRDD logData = sc.textFile(logFile).cache(); @@ -191,7 +191,7 @@ public class SimpleJob { } {% endhighlight %} -This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. 
Note that like in the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail. +This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail. To build the job, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version. @@ -239,7 +239,7 @@ Now, we can execute the job using Maven: $ mvn package $ mvn exec:java -Dexec.mainClass="SimpleJob" ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. @@ -253,7 +253,7 @@ As an example, we'll create a simple Spark job, `SimpleJob.py`: """SimpleJob.py""" from pyspark import SparkContext -logFile = "/var/log/syslog" # Should be some file on your system +logFile = "$YOUR_SPARK_HOME/README.md" # Should be some file on your system sc = SparkContext("local", "Simple job") logData = sc.textFile(logFile).cache() @@ -265,7 +265,8 @@ print "Lines with a: %i, lines with b: %i" % (numAs, numBs) This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. -Like in the Scala and Java examples, we use a SparkContext to create RDDs. +Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. +As with the Scala and Java examples, we use a SparkContext to create RDDs. We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html). `SimpleJob` is simple enough that we do not need to specify any code dependencies. @@ -276,7 +277,7 @@ We can run this job using the `pyspark` script: $ cd $SPARK_HOME $ ./pyspark SimpleJob.py ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight python %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. 
-- cgit v1.2.3 From 1b169f190c5c5210d088faced86dee1007295ac8 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 25 Apr 2013 19:52:12 -0700 Subject: Exclude old versions of Netty, which had a different Maven organization --- project/SparkBuild.scala | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f32c47e71f..7bd6c4c235 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -129,6 +129,9 @@ object SparkBuild extends Build { val slf4jVersion = "1.6.1" + val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") + val excludeNetty = ExclusionRule(organization = "org.jboss.netty") + def coreSettings = sharedSettings ++ Seq( name := "spark-core", resolvers ++= Seq( @@ -149,33 +152,33 @@ object SparkBuild extends Build { "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", - "com.typesafe.akka" % "akka-actor" % "2.0.3", - "com.typesafe.akka" % "akka-remote" % "2.0.3", - "com.typesafe.akka" % "akka-slf4j" % "2.0.3", + "com.typesafe.akka" % "akka-actor" % "2.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-remote" % "2.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty), "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", - "cc.spray" % "spray-can" % "1.0-M2.1", - "cc.spray" % "spray-server" % "1.0-M2.1", - "cc.spray" % "spray-json_2.9.2" % "1.1.1", + "cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty), + "cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty), + "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty), "org.apache.mesos" % "mesos" % "0.9.0-incubating" ) ++ ( if (HADOOP_MAJOR_VERSION == "2") { if (HADOOP_YARN) { Seq( // Exclude rule required for all ? 
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) } else { Seq( - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) } } else { - Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) ) + Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) }), unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") { @@ -205,10 +208,10 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile", + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty), "com.github.sgroschupf" % "zkclient" % "0.1", - "org.twitter4j" % "twitter4j-stream" % "3.0.3", - "com.typesafe.akka" % "akka-zeromq" % "2.0.3" + "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-zeromq" % "2.0.3" excludeAll(excludeNetty) ) ) ++ assemblySettings ++ extraAssemblySettings -- cgit v1.2.3 From afee9024430ef79cc0840a5e5788b60c8c53f9d2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 28 Apr 2013 22:26:45 +0530 Subject: Attempt to fix streaming test failures after yarn branch merge --- bagel/src/test/scala/bagel/BagelSuite.scala | 1 + core/src/test/scala/spark/LocalSparkContext.scala | 3 ++- repl/src/test/scala/spark/repl/ReplSuite.scala | 1 + .../main/scala/spark/streaming/Checkpoint.scala | 30 +++++++++++++++++----- .../spark/streaming/util/MasterFailureTest.scala | 8 +++++- .../spark/streaming/BasicOperationsSuite.scala | 1 + .../scala/spark/streaming/CheckpointSuite.scala | 4 ++- .../test/scala/spark/streaming/FailureSuite.scala | 2 ++ .../scala/spark/streaming/InputStreamsSuite.scala | 1 + .../spark/streaming/WindowOperationsSuite.scala | 1 + 10 files changed, 42 insertions(+), 10 deletions(-) diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 25db395c22..a09c978068 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with 
BeforeAndAfter with Timeo } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("halting by voting") { diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index ff00dd05dd..76d5258b02 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -27,6 +27,7 @@ object LocalSparkContext { sc.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ @@ -38,4 +39,4 @@ object LocalSparkContext { } } -} \ No newline at end of file +} diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 43559b96d3..1c64f9b98d 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -32,6 +32,7 @@ class ReplSuite extends FunSuite { interp.sparkContext.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") return out.toString } diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e303e33e5e..7bd104b8d5 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + @volatile private var stopped = false + val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { + if (stopped) { + logInfo("Already stopped, ignore checkpoint attempt for " + file) + return + } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) - } - val fos = fs.create(file) + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. 
+ val fos = fs.create(writeFile) fos.write(bytes) fos.close() - fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + val finishTime = System.currentTimeMillis(); logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") @@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { + stopped = true executor.shutdown() } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index f673e5be15..e7a3f92bc0 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -74,6 +74,7 @@ object MasterFailureTest extends Logging { val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { + logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values) Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) } st.flatMap(_.split(" ")) @@ -159,6 +160,7 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) @@ -205,6 +207,7 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { @@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) // Write the data to a local file and then move it to the target test directory val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false while (!done && tries < maxTries) { tries += 1 try { - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) done = true } catch { case ioe: IOException => { diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index cf2ed8b1d4..e7352deb81 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -} \ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index a5fa7ab92d..4529e774e9 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { val batchDuration = Milliseconds(1000) before { + logInfo("BEFORE ...") FileUtils.deleteDirectory(new File(directory)) } after { + logInfo("AFTER ...") FileUtils.deleteDirectory(new File(directory)) } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 67dca2ac31..0acb6db6f2 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( -- cgit v1.2.3
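
The checkpoint changes in the last patch above replace an in-place overwrite with a write-to-temporary-file-then-rename sequence, so a reader never observes a half-written checkpoint file. The sketch below distills that pattern into a standalone helper; it is an illustration only, and the `AtomicCheckpointWrite` object name, the path layout, and the minimal error handling are assumptions for the example rather than the exact Spark code.

{% highlight scala %}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch (not the actual Spark code) of the write-then-rename
// pattern used by the CheckpointWriter change: write the new bytes to a
// ".next" file, keep the old file as ".bk", then rename ".next" into place.
object AtomicCheckpointWrite {
  def write(checkpointDir: String, bytes: Array[Byte]) {
    val file = new Path(checkpointDir, "graph")
    val writeFile = new Path(file.getParent, file.getName + ".next")
    val bakFile = new Path(file.getParent, file.getName + ".bk")
    val fs = file.getFileSystem(new Configuration())

    // Write the fresh data to a side file first, so no reader ever sees a partial file.
    val out = fs.create(writeFile)
    try { out.write(bytes) } finally { out.close() }

    // Preserve the previous checkpoint, then publish the new one with a rename,
    // which is effectively atomic on HDFS (unlike copy-and-overwrite).
    if (fs.exists(file)) {
      fs.rename(file, bakFile)
    }
    fs.delete(file, false)      // clear any leftover target, mirroring the "paranoia" delete
    fs.rename(writeFile, file)  // the near-atomic publish step
  }
}
{% endhighlight %}

The same idea appears in the MasterFailureTest change, which copies each generated input file to a ".tmp_"-prefixed name and then renames it into place so the text file stream never reads a partially written file.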