-rw-r--r-- | README.md                                                                |  9
-rw-r--r-- | core/src/main/scala/spark/CacheManager.scala                             |  2
-rw-r--r-- | core/src/main/scala/spark/RDD.scala                                      | 76
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaDoubleRDD.scala                   |  6
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaPairRDD.scala                     |  8
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDD.scala                         |  6
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala                        | 87
-rw-r--r-- | core/src/main/scala/spark/executor/MesosExecutorBackend.scala            | 16
-rw-r--r-- | core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala       | 13
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala                                 | 67
-rw-r--r-- | docs/quick-start.md                                                      | 18
-rw-r--r-- | docs/tuning.md                                                           |  2
-rw-r--r-- | project/SparkBuild.scala                                                 |  2
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala |  2
-rw-r--r-- | streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala      |  6
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala         |  6
16 files changed, 247 insertions, 79 deletions
@@ -12,12 +12,15 @@ This README file only contains basic setup instructions.
 
 ## Building
 
-Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
-which is packaged with it. To build Spark and its example programs, run:
+Spark requires Scala 2.9.2 (Scala 2.10 is not yet supported). The project is
+built using Simple Build Tool (SBT), which is packaged with it. To build
+Spark and its example programs, run:
 
     sbt/sbt package
 
-Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html) in the spark documentation..
+Spark also supports building using Maven. If you would like to build using Maven,
+see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
+in the spark documentation..
 
 To run Spark, you will need to have Scala's bin directory in your `PATH`, or
 you will need to set the `SCALA_HOME` environment variable to point to where
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
index c7b379a3fb..f7a2b7e802 100644
--- a/core/src/main/scala/spark/CacheManager.scala
+++ b/core/src/main/scala/spark/CacheManager.scala
@@ -27,7 +27,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     if (loading.contains(key)) {
       logInfo("Loading contains " + key + ", waiting...")
       while (loading.contains(key)) {
-        try {loading.wait()} catch {case _ =>}
+        try {loading.wait()} catch {case _ : Throwable =>}
       }
       logInfo("Loading no longer contains " + key + ", so returning cached result")
       // See whether someone else has successfully loaded it. The main way this would fail
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9bd8a0f98d..33dc7627a3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
 import spark.rdd.MapPartitionsWithIndexRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
 import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+    if (shuffle) {
+      // include a shuffle step so that our upstream tasks are still distributed
+      new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+    } else {
+      new CoalescedRDD(this, numPartitions)
+    }
+  }
 
   /**
    * Return a sampled subset of this RDD.
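With the new `shuffle` flag, `coalesce` can either fold the upstream computation into the smaller number of partitions (the default) or insert a shuffle so the map side keeps its original parallelism and only the results are consolidated. A minimal sketch of the difference, assuming a local SparkContext and an illustrative `expensive` function (neither is part of the patch):

    import spark.SparkContext

    val sc = new SparkContext("local[4]", "coalesce-sketch")
    def expensive(i: Int): Int = { Thread.sleep(1); i * 2 }  // stand-in for real work

    val data = sc.parallelize(1 to 1000, 100)  // 100 upstream partitions

    // default: no shuffle, so the expensive map itself runs in only 2 tasks
    val narrow = data.map(expensive).coalesce(2)

    // shuffle = true: the map still runs with 100-way parallelism; only its
    // output is brought down to 2 partitions afterwards
    val wide = data.map(expensive).coalesce(2, shuffle = true)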
@@ -365,6 +373,62 @@ abstract class RDD[T: ClassManifest](
     new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
 
   /**
+   * Maps f over this RDD, where f takes an additional parameter of type A. This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => U): RDD[U] = {
+    def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+      val a = constructA(index)
+      iter.map(t => f(t, a))
+    }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => Seq[U]): RDD[U] = {
+    def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+      val a = constructA(index)
+      iter.flatMap(t => f(t, a))
+    }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+   * This additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def foreachWith[A: ClassManifest](constructA: Int => A)
+    (f:(T, A) => Unit) {
+    def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+      val a = constructA(index)
+      iter.map(t => {f(t, a); t})
+    }
+    (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+  }
+
+  /**
+   * Filters this RDD with p, where p takes an additional parameter of type A. This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def filterWith[A: ClassManifest](constructA: Int => A)
+    (p:(T, A) => Boolean): RDD[T] = {
+    def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+      val a = constructA(index)
+      iter.filter(t => p(t, a))
+    }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+  }
+
+  /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
    * partitions* and the *same number of elements in each partition* (e.g. one was made through
   * a map on the other).
@@ -383,6 +447,14 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: Iterator[T] => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+  }
+
+  /**
    * Return an array that contains all of the elements in this RDD.
    */
   def collect(): Array[T] = {
@@ -404,7 +476,7 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
-   * 
+   *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
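The `mapWith`, `flatMapWith`, `foreachWith` and `filterWith` operators above all share one shape: `constructA` runs once per partition with that partition's index, and the value it produces is handed to every element of the partition. A rough usage sketch (context and variable names here are illustrative, not taken from the patch), together with the new `foreachPartition`:

    import java.util.Random
    import spark.SparkContext

    val sc = new SparkContext("local[2]", "with-operators-sketch")
    val ints = sc.parallelize(1 to 100, 4)

    // one Random per partition, seeded from the partition index and shared by
    // every element in that partition
    val jittered = ints.mapWith((index: Int) => new Random(index)) {
      (x: Int, rng: Random) => x + rng.nextDouble()
    }

    // keep roughly a third of the elements, again with a per-partition generator
    val sampled = ints.filterWith((index: Int) => new Random(index)) {
      (x: Int, rng: Random) => rng.nextInt(3) == 0
    }

    // foreachPartition hands over the whole iterator, e.g. to batch up output
    jittered.foreachPartition(iter => println("partition sum: " + iter.sum))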
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..16692c0440 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
 
   /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+    fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+  /**
    * Return an RDD with the elements from `this` that are not in `other`.
    *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..30084df4e2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
+    fromRDD(rdd.coalesce(numPartitions, shuffle))
 
   /**
    * Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..e29f1e5899 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] {
   def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
 
   /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
+    rdd.coalesce(numPartitions, shuffle)
+
+  /**
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 4474ef4593..3e7407b58d 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor extends Logging {
-  var urlClassLoader : ExecutorURLClassLoader = null
-  var threadPool: ExecutorService = null
-  var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
-  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
 
-  val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
   initLogging()
 
-  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
-    // Make sure the local hostname we report matches the cluster scheduler's name for this host
-    Utils.setCustomHostname(slaveHostname)
+  // Make sure the local hostname we report matches the cluster scheduler's name for this host
+  Utils.setCustomHostname(slaveHostname)
 
-    // Set spark.* system properties from executor arg
-    for ((key, value) <- properties) {
-      System.setProperty(key, value)
-    }
+  // Set spark.* system properties from executor arg
+  for ((key, value) <- properties) {
+    System.setProperty(key, value)
+  }
+
+  // Create our ClassLoader and set it on this thread
+  private val urlClassLoader = createClassLoader()
+  Thread.currentThread.setContextClassLoader(urlClassLoader)
 
-    // Create our ClassLoader and set it on this thread
-    urlClassLoader = createClassLoader()
-    Thread.currentThread.setContextClassLoader(urlClassLoader)
-
-    // Make any thread terminations due to uncaught exceptions kill the entire
-    // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
+  // Make any thread terminations due to uncaught exceptions kill the entire
+  // executor process to avoid surprising stalls.
+  Thread.setDefaultUncaughtExceptionHandler(
+    new Thread.UncaughtExceptionHandler {
+      override def uncaughtException(thread: Thread, exception: Throwable) {
+        try {
+          logError("Uncaught exception in thread " + thread, exception)
+
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
             }
+          }
+        } catch {
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
-      } catch {
-        case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-        case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
       }
     }
-    )
+    }
+  )
 
-  // Initialize Spark environment (using system properties read above)
-  env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-  SparkEnv.set(env)
+  // Initialize Spark environment (using system properties read above)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  SparkEnv.set(env)
 
-  // Start worker thread pool
-  threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-  }
+  // Start worker thread pool
+  val threadPool = new ThreadPoolExecutor(
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 818d6d1dda..10f3531df0 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
   extends MesosExecutor
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ExecutorDriver = null
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(
+    executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
-      properties
-    )
+      properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
   }
 
   override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend(new Executor)
+    val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
   }
 }
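Both backends now create the `Executor` only after a successful registration, so a task that somehow arrives first hits an explicit null check rather than a half-initialized executor. A stripped-down illustration of that lifecycle, using stand-in classes rather than the real `spark.executor` types:

    // stand-ins only: the real Executor and backends live in spark.executor
    class Executor(executorId: String, host: String) {
      // in the real class, all of the old initialize() work (class loader,
      // uncaught-exception handler, SparkEnv, thread pool) now runs in the constructor
      def launchTask(taskId: Long): Unit = println("running task " + taskId + " on " + host)
    }

    class Backend {
      @volatile private var executor: Executor = null

      def registered(executorId: String, host: String): Unit = {
        executor = new Executor(executorId, host)
      }

      def launchTask(taskId: Long): Unit = {
        if (executor == null) {
          // the real backends log an error here (and the standalone one exits)
          println("Received launchTask but executor was null")
        } else {
          executor.launchTask(taskId)
        }
      }
    }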
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 9a82c3054c..1047f71c6a 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
 import spark.scheduler.cluster.RegisterExecutor
 
 private[spark] class StandaloneExecutorBackend(
-    executor: Executor,
     driverUrl: String,
     executorId: String,
     hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ActorRef = null
 
   override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(executorId, hostname, sparkProperties)
+      executor = new Executor(executorId, hostname, sparkProperties)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
     case LaunchTask(taskDesc) =>
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 0ff4a47998..7fbdd44340 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
 
 class RDDSuite extends FunSuite with LocalSparkContext {
 
@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(coalesced4.collect().toList === (1 to 10).toList)
     assert(coalesced4.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList)
+
+    // we can optionally shuffle to keep the upstream parallel
+    val coalesced5 = data.coalesce(1, shuffle = true)
+    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+      null)
   }
 
   test("zipped RDDs") {
@@ -208,4 +213,64 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(prunedData.size === 1)
     assert(prunedData(0) === 10)
   }
+
+  test("mapWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+    val randoms = ones.mapWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
+    val prn42_3 = {
+      val prng42 = new Random(42)
+      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+    }
+    val prn43_3 = {
+      val prng43 = new Random(43)
+      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+    }
+    assert(randoms(2) === prn42_3)
+    assert(randoms(5) === prn43_3)
+  }
+
+  test("flatMapWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+    val randoms = ones.flatMapWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) =>
+        val random = prng.nextDouble()
+        Seq(random * t, random * t * 10)}.
+      collect()
+    val prn42_3 = {
+      val prng42 = new Random(42)
+      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+    }
+    val prn43_3 = {
+      val prng43 = new Random(43)
+      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+    }
+    assert(randoms(5) === prn42_3 * 10)
+    assert(randoms(11) === prn43_3 * 10)
+  }
+
+  test("filterWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+    val sample = ints.filterWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
+      collect()
+    val checkSample = {
+      val prng42 = new Random(42)
+      val prng43 = new Random(43)
+      Array(1, 2, 3, 4, 5, 6).filter{i =>
+        if (i < 4) 0 == prng42.nextInt(3)
+        else 0 == prng43.nextInt(3)}
+    }
+    assert(sample.size === checkSample.size)
+    for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
+  }
 }
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 216f7c9cc5..5c80d2ed3a 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -111,14 +111,16 @@ We'll create a very simple Spark job in Scala. So simple, in fact, that it's nam
 import spark.SparkContext
 import SparkContext._
 
-object SimpleJob extends Application {
-  val logFile = "/var/log/syslog" // Should be some file on your system
-  val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
-    List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
-  val logData = sc.textFile(logFile, 2).cache()
-  val numAs = logData.filter(line => line.contains("a")).count()
-  val numBs = logData.filter(line => line.contains("b")).count()
-  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+object SimpleJob {
+  def main(args: Array[String]) {
+    val logFile = "/var/log/syslog" // Should be some file on your system
+    val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+      List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
+    val logData = sc.textFile(logFile, 2).cache()
+    val numAs = logData.filter(line => line.contains("a")).count()
+    val numBs = logData.filter(line => line.contains("b")).count()
+    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+  }
 }
 {% endhighlight %}
diff --git a/docs/tuning.md b/docs/tuning.md
index 843380b9a2..32c7ab86e9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -49,7 +49,7 @@ Finally, to register your classes with Kryo, create a public class that extends
 {% highlight scala %}
 import com.esotericsoftware.kryo.Kryo
 
-class MyRegistrator extends KryoRegistrator {
+class MyRegistrator extends spark.KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[MyClass1])
     kryo.register(classOf[MyClass2])
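The tuning.md fix simply fully qualifies the registrator's parent trait. For context, wiring a Kryo registrator into a job of this era looks roughly like the sketch below; the class and package names are illustrative, while the property names follow the same tuning guide:

    import com.esotericsoftware.kryo.Kryo
    import spark.KryoRegistrator

    class MyClass1
    class MyClass2

    class MyRegistrator extends KryoRegistrator {  // i.e. spark.KryoRegistrator
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyClass1])
        kryo.register(classOf[MyClass2])
      }
    }

    // set before the SparkContext is created
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")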
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d44bf3b5e3..5f378b2398 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -37,7 +37,7 @@ object SparkBuild extends Build {
     organization := "org.spark-project",
     version := "0.7.1-SNAPSHOT",
     scalaVersion := "2.9.2",
-    scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
+    scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
     retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index 6b310bc0b6..da224ad6f7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -28,7 +28,7 @@ class QueueInputDStream[T: ClassManifest](
     }
     if (buffer.size > 0) {
       if (oneAtATime) {
-        Some(buffer.first)
+        Some(buffer.head)
       } else {
         Some(new UnionRDD(ssc.sc, buffer.toSeq))
       }
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..cf2ed8b1d4 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,10 +6,12 @@ import util.ManualClock
 
 class BasicOperationsSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
   override def framework() = "BasicOperationsSuite"
 
+  before {
+    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+  }
+
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 4d33857b25..67dca2ac31 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -30,12 +30,14 @@ import com.google.common.io.Files
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
-  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
   val testPort = 9999
 
   override def checkpointDir = "checkpoint"
 
+  before {
+    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+  }
+
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
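The streaming suites now (re)set the ManualClock property in a `before` block instead of the class body, so every test sees the right clock even when another suite has already run in the same JVM and changed the property. The bare ScalaTest shape of that pattern, sketched independently of the real TestSuiteBase:

    import org.scalatest.{BeforeAndAfter, FunSuite}

    class ClockPropertySuite extends FunSuite with BeforeAndAfter {

      before {
        // reset before every test, not just once at construction time
        System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
      }

      after {
        // avoid Akka rebinding to the same port across suites
        System.clearProperty("spark.driver.port")
      }

      test("manual clock property is set") {
        assert(System.getProperty("spark.streaming.clock").endsWith("ManualClock"))
      }
    }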