22 files changed, 82 insertions, 44 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ed39732f13..ccd9d0364a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
 import spark.rdd.MapPartitionsWithIndexRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
 import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+    if (shuffle) {
+      // include a shuffle step so that our upstream tasks are still distributed
+      new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+    } else {
+      new CoalescedRDD(this, numPartitions)
+    }
+  }
 
   /**
    * Return a sampled subset of this RDD.
@@ -358,7 +366,7 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    */
-  @deprecated("use mapPartitionsWithIndex")
+  @deprecated("use mapPartitionsWithIndex", "0.7.0")
   def mapPartitionsWithSplit[U: ClassManifest](
       f: (Int, Iterator[T]) => Iterator[U],
       preservesPartitioning: Boolean = false): RDD[U] =
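Note: the new `shuffle` flag trades a full shuffle for upstream parallelism. Without it,
coalescing to very few partitions also squeezes all upstream work into that handful of
tasks. A minimal usage sketch (not part of this commit; the master URL and data are
made up):

    import spark.SparkContext
    import spark.SparkContext._

    object CoalesceDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[4]", "CoalesceDemo")
        val data = sc.parallelize(1 to 1000, 100).map(_ * 2)
        // Narrow coalesce: the map above now runs in just 2 tasks.
        val narrow = data.coalesce(2)
        // Shuffled coalesce: the map keeps its 100 tasks; a shuffle then
        // funnels the results into 2 partitions.
        val wide = data.coalesce(2, shuffle = true)
        // glom() yields one array per partition, so both sizes print as 2.
        println(narrow.glom().collect().size + " " + wide.glom().collect().size)
        sc.stop()
      }
    }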
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3..518034e07b 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
-  
+
   private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
     val c = {
-      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { 
+      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
         classManifest[T].erasure
       } else {
         // We get the type of the Writable class by looking at the apply method which converts
         // from T to Writable. Since we have two apply methods we filter out the one which
-        // is of the form "java.lang.Object apply(java.lang.Object)"
+        // is not of the form "java.lang.Object apply(java.lang.Object)"
         implicitly[T => Writable].getClass.getDeclaredMethods().filter(
-          m => m.getReturnType().toString != "java.lang.Object" &&
+          m => m.getReturnType().toString != "class java.lang.Object" &&
                m.getName() == "apply")(0).getReturnType
       }
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     val valueClass = getWritableClass[V]
     val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
     val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-    
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) 
+
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) 
-    } 
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+    }
   }
 }
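Note: the old filter never excluded anything, because `Method.getReturnType` returns a
`Class[_]` whose `toString` is prefixed with the kind of type it is. A standalone sketch
of the behavior (assuming Scala 2.9.x, where anonymous functions compile to classes
carrying an erased bridge method):

    object ClassToStringDemo {
      def main(args: Array[String]) {
        // Prints "class java.lang.Object", not "java.lang.Object", so the old
        // comparison `!= "java.lang.Object"` was true for every method.
        println(classOf[Object].toString)

        // After erasure a Function1[String, String] declares two `apply`
        // methods: the typed one and the Object-to-Object bridge that
        // getWritableClass means to discard.
        val f = (s: String) => s.toUpperCase
        f.getClass.getDeclaredMethods.filter(_.getName == "apply")
          .foreach(m => println(m.getReturnType.toString))
      }
    }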
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..16692c0440 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
 
   /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+    fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+  /**
    * Return an RDD with the elements from `this` that are not in `other`.
    *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..30084df4e2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
+    fromRDD(rdd.coalesce(numPartitions, shuffle))
 
   /**
    * Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..e29f1e5899 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] {
   def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
 
   /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
+    rdd.coalesce(numPartitions, shuffle)
+
+  /**
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index da3f4f636c..8919d1261c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -81,7 +81,7 @@ private[spark] class Worker(
   }
 
   def startWebUi() {
-    val webUi = new WorkerWebUI(context.system, self)
+    val webUi = new WorkerWebUI(context.system, self, workDir)
     try {
       AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
     } catch {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 135cc2e86c..c834f87d50 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -12,12 +12,13 @@ import cc.spray.typeconversion.SprayJsonSupport._
 
 import spark.deploy.{WorkerState, RequestWorkerState}
 import spark.deploy.JsonProtocol._
+import java.io.File
 
 /**
  * Web UI server for the standalone worker.
 */
 private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
@@ -43,7 +44,7 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
         path("log") {
           parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
             respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
-              getFromFileName("work/" + appId + "/" + executorId + "/" + logType)
+              getFromFileName(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
             }
           }
         } ~
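Note: the log route previously resolved the relative path "work/" against whatever
directory the worker process happened to be launched from, so log viewing broke
whenever the worker used a custom work directory. Passing the worker's actual workDir
fixes that. For illustration only (the paths are made up), the same composition via
java.io.File:

    import java.io.File

    object LogPathDemo {
      def main(args: Array[String]) {
        val workDir = new File("/var/spark/work")  // hypothetical work directory
        val log = new File(workDir, "app-20130401-0001/0/stdout")
        println(log.getPath)  // /var/spark/work/app-20130401-0001/0/stdout
      }
    }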
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 53635b1de6..7fbdd44340 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
 
 class RDDSuite extends FunSuite with LocalSparkContext {
 
@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(coalesced4.collect().toList === (1 to 10).toList)
     assert(coalesced4.glom().collect().map(_.toList).toList ===
       (1 to 10).map(x => List(x)).toList)
+
+    // we can optionally shuffle to keep the upstream parallel
+    val coalesced5 = data.coalesce(1, shuffle = true)
+    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+      null)
   }
 
   test("zipped RDDs") {
diff --git a/docs/_config.yml b/docs/_config.yml
index f99d5bb376..055ba77c5b 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -5,6 +5,6 @@ markdown: kramdown
 # of Spark, Scala, and Mesos.
 SPARK_VERSION: 0.7.1-SNAPSHOT
 SPARK_VERSION_SHORT: 0.7.1
-SCALA_VERSION: 2.9.2
+SCALA_VERSION: 2.9.3
 MESOS_VERSION: 0.9.0-incubating
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index c2eeafd07a..04cd79d039 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -42,10 +42,10 @@ To run a specific test suite:
 
 You might run into the following errors if you're using a vanilla installation of Maven:
 
-    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
     [ERROR] PermGen space -> [Help 1]
 
-    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
     [ERROR] Java heap space -> [Help 1]
 
 To fix these, you can do the following:
diff --git a/docs/index.md b/docs/index.md
index 51d505e1fa..0c4add45dc 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point
 to where you've installed Scala. Scala must also be accessible through one
 of these methods on slave nodes on your cluster.
 
-Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run
+Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
 
     sbt/sbt package
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 216f7c9cc5..5c80d2ed3a 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -111,14 +111,16 @@ We'll create a very simple Spark job in Scala. So simple, in fact, that it's nam
 import spark.SparkContext
 import SparkContext._
 
-object SimpleJob extends Application {
-  val logFile = "/var/log/syslog" // Should be some file on your system
-  val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
-    List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
-  val logData = sc.textFile(logFile, 2).cache()
-  val numAs = logData.filter(line => line.contains("a")).count()
-  val numBs = logData.filter(line => line.contains("b")).count()
-  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+object SimpleJob {
+  def main(args: Array[String]) {
+    val logFile = "/var/log/syslog" // Should be some file on your system
+    val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+      List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
+    val logData = sc.textFile(logFile, 2).cache()
+    val numAs = logData.filter(line => line.contains("a")).count()
+    val numBs = logData.filter(line => line.contains("b")).count()
+    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+  }
 }
 {% endhighlight %}
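Note: the quick-start change is behavioral, not cosmetic. `extends Application` runs
the whole program body in the singleton's initializer, so its fields are only assigned
as that initializer executes; code reached from other threads, or from closures Spark
ships to executors, can observe uninitialized defaults. An explicit main method avoids
this. A sketch of the failure mode (hypothetical names, assuming Scala 2.9 semantics):

    object Fragile extends Application {
      // Assigned while the object initializer runs; another thread touching
      // Fragile too early can still read 0 here.
      val answer = 42
      println(answer)
    }

    object Robust {
      def main(args: Array[String]) {
        val answer = 42  // a plain local, fully initialized before any use
        println(answer)
      }
    }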
diff --git a/docs/tuning.md b/docs/tuning.md
index 843380b9a2..32c7ab86e9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -49,7 +49,7 @@ Finally, to register your classes with Kryo, create a public class that extends
 {% highlight scala %}
 import com.esotericsoftware.kryo.Kryo
 
-class MyRegistrator extends KryoRegistrator {
+class MyRegistrator extends spark.KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[MyClass1])
     kryo.register(classOf[MyClass2])
diff --git a/examples/pom.xml b/examples/pom.xml
index 39cc47c709..0537404040 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>algebird-core_2.9.2</artifactId>
-      <version>0.1.8</version>
+      <version>0.1.11</version>
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483aae452b..a9642100e3 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -49,7 +49,7 @@ object TwitterAlgebirdCMS {
 
     val users = stream.map(status => status.getUser.getId)
 
-    val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
     var globalCMS = cms.zero
     val mm = new MapMonoid[Long, Int]()
     var globalExact = Map[Long, Int]()
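Note: the argument swap in TwitterAlgebirdCMS is easy to miss because `eps` and `delta`
are both Doubles: the reversed call compiled fine but sized the sketch for the wrong
error/confidence trade-off. A hedged sketch of the corrected construction (parameter
order as in Algebird 0.1.x; verify `create` and `frequency` against the release you
actually depend on):

    import com.twitter.algebird.CountMinSketchMonoid

    object CmsDemo {
      def main(args: Array[String]) {
        val EPS = 0.01    // additive error bound, as a fraction of total count
        val DELTA = 1E-3  // probability of exceeding that bound
        val SEED = 1
        val PERC = 0.001  // heavy-hitters threshold
        val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)  // eps first
        val sketch = cms.create(Seq(1L, 1L, 2L))
        println(sketch.frequency(1L).estimate)  // approximately 2
      }
    }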
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ea146b7b0b..08a6c1866e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -36,7 +36,7 @@ object SparkBuild extends Build {
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.spark-project",
     version := "0.7.1-SNAPSHOT",
-    scalaVersion := "2.9.2",
+    scalaVersion := "2.9.3",
     scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
@@ -140,7 +140,7 @@ object SparkBuild extends Build {
       "colt" % "colt" % "1.2.0",
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
-      "cc.spray" %% "spray-json" % "1.1.1",
+      "cc.spray" % "spray-json_2.9.2" % "1.1.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
     ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
     unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
@@ -157,7 +157,7 @@
 
   def examplesSettings = sharedSettings ++ Seq(
     name := "spark-examples",
-    libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8")
+    libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11")
   )
 
   def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
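Note: the two build changes above interact. `%%` appends the build's own scalaVersion
to the artifact name, so with scalaVersion now at 2.9.3 it would request a
spray-json_2.9.3 binary that was presumably never published; pinning the _2.9.2
artifact by hand keeps resolution working across the compiler bump. As sbt fragments:

    // With scalaVersion := "2.9.3", these request different artifacts:
    "cc.spray" %% "spray-json" % "1.1.1"       // -> spray-json_2.9.3 (unavailable)
    "cc.spray" % "spray-json_2.9.2" % "1.1.1"  // -> spray-json_2.9.2, pinned by hand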
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 73f7f8fbaf..7f85a1008e 100644
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -24,7 +24,7 @@ if __name__ == "__main__":
             "Usage: PythonTC <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonTC")
-    slices = sys.argv[2] if len(sys.argv) > 2 else 2
+    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
     tc = sc.parallelize(generateGraph(), slices).cache()
 
     # Linear transitive closure: each round grows paths by one edge,
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 172ed85fab..a9fec17a9d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -399,7 +399,7 @@ class RDD(object):
         >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
         >>> from fileinput import input
         >>> from glob import glob
-        >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
+        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
         '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
         """
         def func(split, iterator):
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-SCALA_VERSION=2.9.2
+SCALA_VERSION=2.9.3
 
 # Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`; pwd)"
@@ -1,6 +1,6 @@
 @echo off
 
-set SCALA_VERSION=2.9.2
+set SCALA_VERSION=2.9.3
 
 rem Figure out where the Spark framework is installed
 set FWDIR=%~dp0
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..cf2ed8b1d4 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,10 +6,12 @@ import util.ManualClock
 
 class BasicOperationsSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
   override def framework() = "BasicOperationsSuite"
 
+  before {
+    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+  }
+
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 4d33857b25..67dca2ac31 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -30,12 +30,14 @@ import com.google.common.io.Files
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
-  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
   val testPort = 9999
 
   override def checkpointDir = "checkpoint"
 
+  before {
+    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+  }
+
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
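Note: both streaming suites move the ManualClock property from the suite constructor
into a `before` block. The constructor runs once, so a property cleared or overwritten
by another suite's `after` block in the same JVM could leak into later tests; `before`
re-establishes it for every test. A minimal sketch of the pattern (hypothetical suite,
using ScalaTest's BeforeAndAfter):

    import org.scalatest.{BeforeAndAfter, FunSuite}

    class ClockSuite extends FunSuite with BeforeAndAfter {
      before {
        // Reset per test; a suite sharing this JVM may have cleared it.
        System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
      }

      after {
        System.clearProperty("spark.driver.port")
      }

      test("manual clock is configured") {
        assert(System.getProperty("spark.streaming.clock").endsWith("ManualClock"))
      }
    }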