author     Tathagata Das <tathagata.das1565@gmail.com>   2013-12-19 11:20:48 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-12-19 11:20:48 -0800
commit     984c5824876e0daceb8a74af57593926faf727ce (patch)
tree       48be0648167683de8dd064b17513c81956bbffb5 /streaming
parent     5e9ce83d682d6198cda4631faf11cb53fcccf07f (diff)
parent     ec71b445ad0440e84c4b4909e4faf75aba0f13d7 (diff)
download   spark-984c5824876e0daceb8a74af57593926faf727ce.tar.gz
           spark-984c5824876e0daceb8a74af57593926faf727ce.tar.bz2
           spark-984c5824876e0daceb8a74af57593926faf727ce.zip
Merge branch 'scheduler-update' into filestream-fix
Conflicts:
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
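
Note: the merged scheduler-update branch removes the old JobManager (deleted in the diff below) in favor of a JobScheduler owned by StreamingContext, and exposes listener registration via addStreamingListener. A minimal usage sketch, assuming the StreamingListener trait's callbacks have empty default implementations (a real listener would override them); only addStreamingListener and its delegation to scheduler.listenerBus appear in this diff:

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.scheduler.StreamingListener

    // Register a no-op listener with the new JobScheduler's listener bus.
    // The empty anonymous implementation is an assumption for illustration only.
    def attachNoOpListener(ssc: StreamingContext): StreamingListener = {
      val listener = new StreamingListener {}
      ssc.addStreamingListener(listener)   // forwards to scheduler.listenerBus.addListener
      listener
    }
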
Diffstat (limited to 'streaming')
57 files changed, 1009 insertions, 620 deletions
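
Most of the per-file churn below is the Scala 2.10 migration: Maven artifact names gain the _${scala.binary.version} suffix, ClassManifest context bounds become ClassTag, and .erasure calls become .runtimeClass. A small illustrative sketch of that idiom (this helper is not part of the patch):

    import scala.reflect.{ClassTag, classTag}

    // Scala 2.10 style: capture the element class with a ClassTag context bound.
    // Pre-2.10 code would have written implicitly[ClassManifest[T]].erasure instead.
    def elementClass[T: ClassTag](xs: Seq[T]): Class[_] =
      classTag[T].runtimeClass

    // elementClass(Seq(1, 2, 3)) == classOf[Int]
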
diff --git a/streaming/pom.xml b/streaming/pom.xml index 40892937b8..e3b6fee9b2 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -26,7 +26,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.9.3</artifactId> + <artifactId>spark-streaming_2.10</artifactId> <packaging>jar</packaging> <name>Spark Project Streaming</name> <url>http://spark.incubator.apache.org/</url> @@ -48,7 +48,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.9.3</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -61,8 +61,8 @@ <version>1.9.11</version> </dependency> <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.9.2</artifactId> + <groupId>com.sksamuel.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> <version>0.8.0-beta1</version> <exclusions> <exclusion> @@ -111,16 +111,16 @@ </dependency> <dependency> <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq</artifactId> + <artifactId>akka-zeromq_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.9.3</artifactId> + <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_2.9.3</artifactId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> @@ -134,14 +134,18 @@ <scope>test</scope> </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> <version>0.4.0</version> </dependency> </dependencies> <build> - <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> <plugins> <plugin> <groupId>org.scalatest</groupId> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index bcf5e6b1e6..05760bfcd4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -41,7 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration - val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() + val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds def validate() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 9ceff754c4..a78d3965ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -17,27 +17,23 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream._ import StreamingContext._ -import org.apache.spark.util.MetadataCleaner - -//import Time._ - +import org.apache.spark.streaming.dstream._ +import 
org.apache.spark.streaming.scheduler.Job import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner -import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import scala.reflect.ClassTag import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.conf.Configuration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -56,7 +52,7 @@ import org.apache.hadoop.conf.Configuration * - A function that is used to generate an RDD after each time interval */ -abstract class DStream[T: ClassManifest] ( +abstract class DStream[T: ClassTag] ( @transient protected[streaming] var ssc: StreamingContext ) extends Serializable with Logging { @@ -82,7 +78,7 @@ abstract class DStream[T: ClassManifest] ( // RDDs generated, marked as protected[streaming] so that testsuites can access it @transient protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () - + // Time zero for the DStream protected[streaming] var zeroTime: Time = null @@ -274,16 +270,16 @@ abstract class DStream[T: ClassManifest] ( /** * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal * method that should not be called directly. - */ + */ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { // If this DStream was not initialized (i.e., zeroTime not set), then do it // If RDD was already generated, then retrieve it from HashMap generatedRDDs.get(time) match { - - // If an RDD was already generated and is being reused, then + + // If an RDD was already generated and is being reused, then // probably all RDDs in this DStream will be reused and hence should be cached case Some(oldRDD) => Some(oldRDD) - + // if RDD was not generated, and if the time is valid // (based on sliding time of this DStream), then generate the RDD case None => { @@ -300,7 +296,7 @@ abstract class DStream[T: ClassManifest] ( } generatedRDDs.put(time, newRDD) Some(newRDD) - case None => + case None => None } } else { @@ -344,7 +340,7 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.clearOldMetadata(time)) } - /* Adds metadata to the Stream while it is running. + /* Adds metadata to the Stream while it is running. * This methd should be overwritten by sublcasses of InputDStream. */ protected[streaming] def addMetadata(metadata: Any) { @@ -416,7 +412,7 @@ abstract class DStream[T: ClassManifest] ( // ======================================================================= /** Return a new DStream by applying a function to all elements of this DStream. 
*/ - def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { + def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } @@ -424,7 +420,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } @@ -450,7 +446,7 @@ abstract class DStream[T: ClassManifest] ( * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ - def mapPartitions[U: ClassManifest]( + def mapPartitions[U: ClassTag]( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false ): DStream[U] = { @@ -497,16 +493,14 @@ abstract class DStream[T: ClassManifest] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) - newStream + ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) } /** * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } @@ -514,7 +508,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) val cleanedF = context.sparkContext.clean(transformFunc) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { @@ -528,7 +522,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[U: ClassManifest, V: ClassManifest]( + def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { val cleanedF = ssc.sparkContext.clean(transformFunc) @@ -539,7 +533,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. 
*/ - def transformWith[U: ClassManifest, V: ClassManifest]( + def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { val cleanedF = ssc.sparkContext.clean(transformFunc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 58a0da2870..3fd5d52403 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -20,13 +20,16 @@ package org.apache.spark.streaming import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.conf.Configuration + import collection.mutable.HashMap import org.apache.spark.Logging +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag private[streaming] -class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) +class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) extends Serializable with Logging { protected val data = new HashMap[Time, AnyRef]() @@ -107,4 +110,3 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" } } - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index b9a58fded6..daed7ff7c3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -21,6 +21,7 @@ import dstream.InputDStream import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import collection.mutable.ArrayBuffer import org.apache.spark.Logging +import org.apache.spark.streaming.scheduler.Job final private[streaming] class DStreamGraph extends Serializable with Logging { initLogging() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala deleted file mode 100644 index 5233129506..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -import org.apache.spark.Logging -import org.apache.spark.SparkEnv -import java.util.concurrent.Executors -import collection.mutable.HashMap -import collection.mutable.ArrayBuffer - - -private[streaming] -class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { - - class JobHandler(ssc: StreamingContext, job: Job) extends Runnable { - def run() { - SparkEnv.set(ssc.env) - try { - val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format( - (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0)) - } catch { - case e: Exception => - logError("Running " + job + " failed", e) - } - clearJob(job) - } - } - - initLogging() - - val jobExecutor = Executors.newFixedThreadPool(numThreads) - val jobs = new HashMap[Time, ArrayBuffer[Job]] - - def runJob(job: Job) { - jobs.synchronized { - jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job - } - jobExecutor.execute(new JobHandler(ssc, job)) - logInfo("Added " + job + " to queue") - } - - def stop() { - jobExecutor.shutdown() - } - - private def clearJob(job: Job) { - var timeCleared = false - val time = job.time - jobs.synchronized { - val jobsOfTime = jobs.get(time) - if (jobsOfTime.isDefined) { - jobsOfTime.get -= job - if (jobsOfTime.get.isEmpty) { - jobs -= time - timeCleared = true - } - } else { - throw new Exception("Job finished for time " + job.time + - " but time does not exist in jobs") - } - } - if (timeCleared) { - ssc.scheduler.clearOldMetadata(time) - } - } - - def getPendingTimes(): Array[Time] = { - jobs.synchronized { - jobs.keySet.toArray - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 8c12fd11ef..80af96c060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -18,16 +18,15 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} -import org.apache.spark.streaming.dstream.{ShuffledDStream} -import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} +import org.apache.spark.streaming.dstream._ import org.apache.spark.{Partitioner, HashPartitioner} import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{Manifests, RDD, PairRDDFunctions} +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -35,7 +34,7 @@ import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration -class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { private[streaming] def ssc = self.ssc @@ -105,7 +104,7 @@ extends Serializable { * combineByKey for RDDs. Please refer to combineByKey in * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. 
*/ - def combineByKey[C: ClassManifest]( + def combineByKey[C: ClassTag]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, @@ -205,7 +204,7 @@ extends Serializable { * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, + reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration ): DStream[(K, V)] = { @@ -336,7 +335,7 @@ extends Serializable { * corresponding state key-value pair will be eliminated. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) @@ -351,7 +350,7 @@ extends Serializable { * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int ): DStream[(K, S)] = { @@ -367,7 +366,7 @@ extends Serializable { * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner ): DStream[(K, S)] = { @@ -390,7 +389,7 @@ extends Serializable { * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean @@ -402,7 +401,7 @@ extends Serializable { * Return a new DStream by applying a map function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { new MapValuedDStream[K, V, U](self, mapValuesFunc) } @@ -410,7 +409,7 @@ extends Serializable { * Return a new DStream by applying a flatmap function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def flatMapValues[U: ClassManifest]( + def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) @@ -421,7 +420,7 @@ extends Serializable { * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner()) } @@ -429,7 +428,7 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -437,7 +436,7 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. 
* The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. */ - def cogroup[W: ClassManifest]( + def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Seq[V], Seq[W]))] = { @@ -451,7 +450,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ - def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } @@ -459,7 +458,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner(numPartitions)) } @@ -467,7 +466,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W: ClassManifest]( + def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, W))] = { @@ -482,7 +481,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { leftOuterJoin[W](other, defaultPartitioner()) } @@ -491,7 +490,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` * partitions. */ - def leftOuterJoin[W: ClassManifest]( + def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int ): DStream[(K, (V, Option[W]))] = { @@ -503,7 +502,7 @@ extends Serializable { * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ - def leftOuterJoin[W: ClassManifest]( + def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, Option[W]))] = { @@ -518,7 +517,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { rightOuterJoin[W](other, defaultPartitioner()) } @@ -527,7 +526,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` * partitions. */ - def rightOuterJoin[W: ClassManifest]( + def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int ): DStream[(K, (Option[V], W))] = { @@ -539,7 +538,7 @@ extends Serializable { * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. 
*/ - def rightOuterJoin[W: ClassManifest]( + def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Option[V], W))] = { @@ -556,8 +555,8 @@ extends Serializable { def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassManifest[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + )(implicit fm: ClassTag[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -586,8 +585,8 @@ extends Serializable { def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassManifest[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + )(implicit fm: ClassTag[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -609,9 +608,7 @@ extends Serializable { self.foreach(saveFunc) } - private def getKeyClass() = implicitly[ClassManifest[K]].erasure + private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - private def getValueClass() = implicitly[ClassManifest[V]].erasure + private def getValueClass() = implicitly[ClassTag[V]].runtimeClass } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index d6fc2a19f4..71065f98fc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -20,33 +20,36 @@ package org.apache.spark.streaming import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe +import akka.util.ByteString import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ActorReceiver import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy import org.apache.spark.streaming.receivers.ZeroMQReceiver import org.apache.spark.storage.StorageLevel import org.apache.spark.util.MetadataCleaner import org.apache.spark.streaming.receivers.ActorReceiver +import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker} import scala.collection.mutable.Queue import scala.collection.Map +import scala.reflect.ClassTag import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger -import java.util.UUID import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path + import twitter4j.Status import twitter4j.auth.Authorization -import org.apache.spark.deploy.SparkHadoopUtil + /** @@ -146,9 +149,10 @@ class StreamingContext private ( } } - protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null - protected[streaming] var receiverJobThread: Thread = null - protected[streaming] var scheduler: Scheduler = null + protected[streaming] val checkpointDuration: Duration = { + if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration + } + protected[streaming] val scheduler = new JobScheduler(this) /** * Return the associated Spark context @@ -195,7 +199,7 @@ class 
StreamingContext private ( * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of NetworkReceiver */ - def networkStream[T: ClassManifest]( + def networkStream[T: ClassTag]( receiver: NetworkReceiver[T]): DStream[T] = { val inputStream = new PluggableInputDStream[T](this, receiver) @@ -215,7 +219,7 @@ class StreamingContext private ( * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ - def actorStream[T: ClassManifest]( + def actorStream[T: ClassTag]( props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, @@ -235,14 +239,14 @@ class StreamingContext private ( * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ - def zeroMQStream[T: ClassManifest]( + def zeroMQStream[T: ClassTag]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } @@ -279,8 +283,8 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects */ def kafkaStream[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: kafka.serializer.Decoder[_]: Manifest, T <: kafka.serializer.Decoder[_]: Manifest]( kafkaParams: Map[String, String], @@ -319,7 +323,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def socketStream[T: ClassManifest]( + def socketStream[T: ClassTag]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], @@ -341,7 +345,7 @@ class StreamingContext private ( port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) + val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) registerInputStream(inputStream) inputStream } @@ -356,7 +360,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects in the received blocks */ - def rawSocketStream[T: ClassManifest]( + def rawSocketStream[T: ClassTag]( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 @@ -376,9 +380,9 @@ class StreamingContext private ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassManifest, - V: ClassManifest, - F <: NewInputFormat[K, V]: ClassManifest + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag ] (directory: String): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory) registerInputStream(inputStream) @@ -396,9 +400,9 @@ class StreamingContext private ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassManifest, - V: ClassManifest, - F <: NewInputFormat[K, V]: ClassManifest + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag ] 
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) registerInputStream(inputStream) @@ -440,7 +444,7 @@ class StreamingContext private ( * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval * @tparam T Type of objects in the RDD */ - def queueStream[T: ClassManifest]( + def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean = true ): DStream[T] = { @@ -456,7 +460,7 @@ class StreamingContext private ( * Set as null if no RDD should be returned when empty * @tparam T Type of objects in the RDD */ - def queueStream[T: ClassManifest]( + def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] @@ -484,7 +488,7 @@ class StreamingContext private ( /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { + def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } @@ -492,7 +496,7 @@ class StreamingContext private ( * Create a new DStream in which each RDD is generated by applying a function on RDDs of * the DStreams. */ - def transform[T: ClassManifest]( + def transform[T: ClassTag]( dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] ): DStream[T] = { @@ -514,6 +518,13 @@ class StreamingContext private ( graph.addOutputStream(outputStream) } + /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ + def addStreamingListener(streamingListener: StreamingListener) { + scheduler.listenerBus.addListener(streamingListener) + } + protected def validate() { assert(graph != null, "Graph is null") graph.validate() @@ -529,27 +540,22 @@ class StreamingContext private ( * Start the execution of the streams. */ def start() { - if (checkpointDir != null && checkpointDuration == null && graph != null) { - checkpointDuration = graph.batchDuration - } - validate() + // Get the network input streams val networkInputStreams = graph.getInputStreams().filter(s => s match { case n: NetworkInputDStream[_] => true case _ => false }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray + // Start the network input tracker (must start before receivers) if (networkInputStreams.length > 0) { - // Start the network input tracker (must start before receivers) networkInputTracker = new NetworkInputTracker(this, networkInputStreams) networkInputTracker.start() } - Thread.sleep(1000) // Start the scheduler - scheduler = new Scheduler(this) scheduler.start() } @@ -560,7 +566,6 @@ class StreamingContext private ( try { if (scheduler != null) scheduler.stop() if (networkInputTracker != null) networkInputTracker.stop() - if (receiverJobThread != null) receiverJobThread.interrupt() sc.stop() logInfo("StreamingContext stopped successfully") } catch { @@ -572,7 +577,7 @@ class StreamingContext private ( object StreamingContext { - implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { + implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } @@ -599,9 +604,4 @@ object StreamingContext { prefix + "-" + time.milliseconds + "." 
+ suffix } } - /* - protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = { - new Path(sscCheckpointDir, UUID.randomUUID.toString).toString - } - */ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index 1a2aeaa879..d29033df32 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -23,9 +23,11 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag + /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -41,7 +43,7 @@ import org.apache.spark.rdd.RDD * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval */ -class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) +class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]) extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] { override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) @@ -103,6 +105,6 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM } object JavaDStream { - implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = + implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] = new JavaDStream[T](dstream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 09189eadd8..64f38ce1c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -21,6 +21,7 @@ import java.util.{List => JList} import java.lang.{Long => JLong} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} @@ -32,7 +33,7 @@ import JavaDStream._ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] extends Serializable { - implicit val classManifest: ClassManifest[T] + implicit val classTag: ClassTag[T] def dstream: DStream[T] @@ -136,7 +137,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** Return a new DStream by applying a function to all elements of this DStream. 
*/ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } @@ -157,7 +158,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } @@ -260,8 +261,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] def scalaTransform (in: RDD[T]): RDD[U] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -272,8 +273,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] def scalaTransform (in: RDD[T], time: Time): RDD[U] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -285,10 +286,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmk: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -300,10 +301,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmk: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -317,10 +318,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaDStream[U], transformFunc: JFunction3[R, 
JavaRDD[U], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmu: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - implicit val cmv: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmv: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) @@ -334,12 +335,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaDStream[U], transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] ): JavaPairDStream[K2, V2] = { - implicit val cmu: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) @@ -353,12 +354,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - implicit val cmw: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmw: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) @@ -372,14 +373,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] ): JavaPairDStream[K3, V3] = { - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - implicit val cmk3: ClassManifest[K3] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]] - implicit val cmv3: ClassManifest[V3] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val 
cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmk3: ClassTag[K3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]] + implicit val cmv3: ClassTag[V3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]] def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c6cd635afa..dfd6e27c3e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -21,6 +21,7 @@ import java.util.{List => JList} import java.lang.{Long => JLong} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ @@ -36,8 +37,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kManifest: ClassManifest[K], - implicit val vManifest: ClassManifest[V]) + implicit val kManifest: ClassTag[K], + implicit val vManifest: ClassTag[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) @@ -162,8 +163,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner ): JavaPairDStream[K, C] = { - implicit val cm: ClassManifest[C] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + implicit val cm: ClassTag[C] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } @@ -428,8 +429,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( */ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) : JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) } @@ -446,8 +447,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } @@ -464,8 +465,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } @@ -475,8 +476,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * 'this' 
DStream without changing the key. */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.mapValues(f) } @@ -487,8 +488,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.flatMapValues(fn) } @@ -498,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -511,8 +512,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, numPartitions) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -525,8 +526,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, partitioner) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -536,8 +537,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream) } @@ -546,8 +547,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
*/ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, numPartitions) } @@ -559,8 +560,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, partitioner) } @@ -570,8 +571,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -585,8 +586,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -599,8 +600,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -611,8 +612,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. 
*/ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -626,8 +627,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -641,8 +642,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -722,24 +723,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( new JavaDStream[(K, V)](dstream) } - override val classManifest: ClassManifest[(K, V)] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream { - implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = { + implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] new JavaPairDStream[K, V](dstream.dstream) } - def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long]) : JavaPairDStream[K, JLong] = { StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 7f9dab0ef9..78d318cf27 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -22,12 +22,15 @@ import java.io.InputStream import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import twitter4j.Status import akka.actor.Props import 
akka.actor.SupervisorStrategy import akka.zeromq.Subscribe +import akka.util.ByteString + import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD @@ -36,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.StreamingListener /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -141,10 +145,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { groupId: String, topics: JMap[String, JInt]) : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), StorageLevel.MEMORY_ONLY_SER_2) + } /** @@ -162,8 +167,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@ -189,10 +194,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val valueCmt: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] @@ -245,8 +250,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { storageLevel: StorageLevel) : JavaDStream[T] = { def fn = (x: InputStream) => converter.apply(x).toIterator - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.socketStream(hostname, port, fn, storageLevel) } @@ -274,8 +279,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { hostname: String, port: Int, storageLevel: StorageLevel): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel)) } @@ -289,8 +294,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of the objects in the received blocks */ def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + 
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port)) } @@ -304,12 +309,12 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam F Input format for reading HDFS file */ def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] - implicit val cmf: ClassManifest[F] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val cmf: ClassTag[F] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]] ssc.fileStream[K, V, F](directory) } @@ -404,7 +409,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def twitterStream(): JavaDStream[Status] = { ssc.twitterStream() } - + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor @@ -422,8 +427,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) } @@ -443,8 +448,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { name: String, storageLevel: StorageLevel ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel) } @@ -462,8 +467,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { props: Props, name: String ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name) } @@ -480,12 +485,12 @@ class JavaStreamingContext(val ssc: StreamingContext) { def zeroMQStream[T]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) } @@ -505,9 +510,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], storageLevel: StorageLevel ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = 
bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) } @@ -525,9 +530,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn) } @@ -547,8 +552,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of objects in the RDD */ def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue) @@ -564,8 +569,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of objects in the RDD */ def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue, oneAtATime) @@ -585,8 +590,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean, defaultRDD: JavaRDD[T]): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) @@ -597,7 +602,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[T] = first.classManifest + implicit val cm: ClassTag[T] = first.classTag ssc.union(dstreams)(cm) } @@ -609,9 +614,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { rest: JList[JavaPairDStream[K, V]] ): JavaPairDStream[K, V] = { val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[(K, V)] = first.classManifest - implicit val kcm: ClassManifest[K] = first.kManifest - implicit val vcm: ClassManifest[V] = first.vManifest + implicit val cm: ClassTag[(K, V)] = first.classTag + implicit val kcm: ClassTag[K] = first.kManifest + implicit val vcm: ClassTag[V] = first.vManifest new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) } @@ -628,8 +633,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] ): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - 
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val scalaDStreams = dstreams.map(_.dstream).toSeq val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList @@ -651,10 +656,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] ): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val scalaDStreams = dstreams.map(_.dstream).toSeq val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList @@ -683,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.remember(duration) } + /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ + def addStreamingListener(streamingListener: StreamingListener) { + ssc.addStreamingListener(streamingListener) + } + /** * Starts the execution of the streams. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala index a9a05c9981..f396c34758 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala @@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag /** * An input stream that always returns the same RDD on each timestep. Useful for testing. 
*/ -class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) +class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T]) extends InputDStream[T](ssc_) { override def start() {} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1a8db3ab59..79513548d2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,14 +26,16 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.ClassTag + import java.io.{ObjectInputStream, IOException} private[streaming] -class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( +class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @transient ssc_ : StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, - newFilesOnly: Boolean = true) + newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData @@ -54,7 +56,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly) } - + override def stop() { } /** @@ -225,5 +227,3 @@ object FileInputDStream { val MAX_ATTEMPTS = 10 def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index 91ee2c1a36..db2e0a4cee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FilteredDStream[T: ClassManifest]( +class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index ca7d7ca49e..244dc3ee4f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], flatMapValueFunc: V => TraversableOnce[U] ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index b37966f9a7..336c4b7a92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( +class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => Traversable[U] ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala index a0189eca04..60d79175f1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -22,6 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.nio.ByteBuffer import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.flume.source.avro.AvroSourceProtocol import org.apache.flume.source.avro.AvroFlumeEvent @@ -34,7 +35,7 @@ import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel private[streaming] -class FlumeInputDStream[T: ClassManifest]( +class FlumeInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index e21bac4602..364abcde68 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -18,10 +18,12 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Job, Time} +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.scheduler.Job +import scala.reflect.ClassTag private[streaming] -class ForEachDStream[T: ClassManifest] ( +class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 4294b07d91..23136f44fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class GlommedDStream[T: ClassManifest](parent: DStream[T]) +class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { override def dependencies = List(parent) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 674b27118c..f01e67fe13 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -19,6 +19,8 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} +import scala.reflect.ClassTag + /** * This is the abstract base class for all input streams. This class provides to methods * start() and stop() which called by the scheduler to start and stop receiving data/ @@ -30,7 +32,7 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} * that requires running a receiver on the worker nodes, use NetworkInputDStream * as the parent class. */ -abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) +abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { var lastValidTime: Time = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index a5de5e1fb5..526f5564c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -31,11 +31,11 @@ import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ import scala.collection.Map - +import scala.reflect.ClassTag /** * Input stream that pulls messages from a Kafka Broker. - * + * * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -43,8 +43,8 @@ import scala.collection.Map */ private[streaming] class KafkaInputDStream[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, @@ -61,8 +61,8 @@ class KafkaInputDStream[ private[streaming] class KafkaReceiver[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( kafkaParams: Map[String, String], @@ -104,17 +104,18 @@ class KafkaReceiver[ tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } - // Create Threads for each Topic/Message Stream we are listening - val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties]) + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties]) + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[V]] + // Create Threads for each Topic/Message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) + // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } @@ -122,7 +123,7 @@ class KafkaReceiver[ } // Handles Kafka Messages - private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V]) + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) extends 
Runnable { def run() { logInfo("Starting MessageHandler.") @@ -146,7 +147,7 @@ class KafkaReceiver[ zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case _ : Throwable => // swallow } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index ac0528213d..ef4a737568 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -37,6 +37,7 @@ import org.eclipse.paho.client.mqttv3.MqttTopic import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ +import scala.reflect.ClassTag /** * Input stream that subscribe messages from a Mqtt Broker. @@ -47,7 +48,7 @@ import scala.collection.JavaConversions._ */ private[streaming] -class MQTTInputDStream[T: ClassManifest]( +class MQTTInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 5329601a6f..8a04060e5b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( +class MapPartitionedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 8290df90a2..0ce364fd46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], mapValueFunc: V => U ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index b1682afea3..c0b7491d09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MappedDStream[T: ClassManifest, U: ClassManifest] ( +class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends 
DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index a82862c802..5add20871e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -21,17 +21,19 @@ import java.util.concurrent.ArrayBlockingQueue import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.reflect.ClassTag import akka.actor.{Props, Actor} import akka.pattern.ask -import akka.dispatch.Await -import akka.util.duration._ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.rdd.{RDD, BlockRDD} import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} /** * Abstract class for defining any InputDStream that has to start a receiver on worker @@ -42,7 +44,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ -abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) +abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { // This is an unique identifier that is used to match the network receiver with the @@ -84,7 +86,7 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe * Abstract class of a receiver that can be run on worker nodes to receive external data. See * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation. 
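Every DStream subclass in this patch receives the same context-bound rewrite from ClassManifest to ClassTag. A hypothetical subclass written against the updated API (modelled loosely on MappedDStream above; the class name and behaviour are invented):

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{DStream, Duration, Time}

// Passes each batch through unchanged; only here to show the T: ClassTag bound.
class PassThroughDStream[T: ClassTag](parent: DStream[T]) extends DStream[T](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] =
    parent.getOrCompute(validTime)
}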
*/ -abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging { +abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { initLogging() @@ -176,8 +178,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log logInfo("Attempting to register with tracker") val ip = System.getProperty("spark.driver.host", "localhost") val port = System.getProperty("spark.driver.port", "7077").toInt - val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) - val tracker = env.actorSystem.actorFor(url) + val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorSelection(url) val timeout = 5.seconds override def preStart() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala index 15782f5c11..6f9477020a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -18,9 +18,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag private[streaming] -class PluggableInputDStream[T: ClassManifest]( +class PluggableInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 7d9f3521b1..97325f8ea3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD - import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag private[streaming] -class QueueInputDStream[T: ClassManifest]( +class QueueInputDStream[T: ClassTag]( @transient ssc: StreamingContext, val queue: Queue[RDD[T]], oneAtATime: Boolean, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index 10ed4ef78d..dea0f26f90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -21,6 +21,8 @@ import org.apache.spark.Logging import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag + import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.{ReadableByteChannel, SocketChannel} @@ -35,7 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue * in the format that the system is configured with. 
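The NetworkReceiver hunk above also moves the tracker lookup from actorFor to actorSelection and prefixes the path with the akka.tcp protocol required by Akka 2.2 remoting. A minimal, self-contained sketch of that lookup (the actor system name is a placeholder and no remoting is actually configured here):

import akka.actor.ActorSystem

object TrackerLookupSketch {
  def main(args: Array[String]) {
    val system = ActorSystem("sketch")
    val ip = System.getProperty("spark.driver.host", "localhost")
    val port = System.getProperty("spark.driver.port", "7077").toInt
    val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
    val tracker = system.actorSelection(url)   // resolved lazily, per message
    println(tracker)
    system.shutdown()
  }
}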
*/ private[streaming] -class RawInputDStream[T: ClassManifest]( +class RawInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index b88a4db959..db56345ca8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -28,8 +28,11 @@ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + private[streaming] -class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( +class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, @@ -49,7 +52,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - // Reduce each batch of data using reduceByKey which will be further reduced by window + // Reduce each batch of data using reduceByKey which will be further reduced by window // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) @@ -170,5 +173,3 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( } } } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index a95e66d761..e6e0022097 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -21,9 +21,10 @@ import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( +class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( parent: DStream[(K,V)], createCombiner: V => C, mergeValue: (C, V) => C, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index e2539c7396..2cdd13f205 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -21,11 +21,13 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import org.apache.spark.util.NextIterator +import scala.reflect.ClassTag + import java.io._ import java.net.Socket private[streaming] -class SocketInputDStream[T: ClassManifest]( +class SocketInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, @@ -39,7 +41,7 @@ class SocketInputDStream[T: ClassManifest]( } private[streaming] -class SocketReceiver[T: ClassManifest]( +class SocketReceiver[T: ClassTag]( host: String, port: Int, bytesToObjects: InputStream => Iterator[T], diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 362a6bf4cc..e0ff3ccba4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -23,8 +23,10 @@ import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Duration, Time, DStream} +import scala.reflect.ClassTag + private[streaming] -class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( +class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 71bcb2b390..aeea060df7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class TransformedDStream[U: ClassManifest] ( +class TransformedDStream[U: ClassTag] ( parents: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[U] ) extends DStream[U](parents.head.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index c696bb70a8..0d84ec84f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -22,8 +22,11 @@ import org.apache.spark.rdd.RDD import collection.mutable.ArrayBuffer import org.apache.spark.rdd.UnionRDD +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + private[streaming] -class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) +class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) extends DStream[T](parents.head.ssc) { if (parents.length == 0) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 3c57294269..73d959331a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -22,8 +22,10 @@ import org.apache.spark.rdd.UnionRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import scala.reflect.ClassTag + private[streaming] -class WindowedDStream[T: ClassManifest]( +class WindowedDStream[T: ClassTag]( parent: DStream[T], _windowDuration: Duration, _slideDuration: Duration) @@ -52,6 +54,3 @@ class WindowedDStream[T: ClassManifest]( Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) } } - - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index ef0f85a717..fdf5371a89 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -20,6 +20,10 @@ package org.apache.spark.streaming.receivers import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } +import akka.actor.SupervisorStrategy._ + +import scala.concurrent.duration._ +import scala.reflect.ClassTag import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver @@ -28,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer -/** A helper with set of defaults for supervisor strategy **/ +/** A helper with set of defaults for supervisor strategy */ object ReceiverSupervisorStrategy { - import akka.util.duration._ - import akka.actor.SupervisorStrategy._ - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException ⇒ Restart @@ -48,10 +49,10 @@ object ReceiverSupervisorStrategy { * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * * @example {{{ - * class MyActor extends Actor with Receiver{ - * def receive { - * case anything :String ⇒ pushBlock(anything) - * } + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String => pushBlock(anything) + * } * } * //Can be plugged in actorStream as follows * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") @@ -65,11 +66,11 @@ object ReceiverSupervisorStrategy { * */ trait Receiver { self: Actor ⇒ - def pushBlock[T: ClassManifest](iter: Iterator[T]) { + def pushBlock[T: ClassTag](iter: Iterator[T]) { context.parent ! Data(iter) } - def pushBlock[T: ClassManifest](data: T) { + def pushBlock[T: ClassTag](data: T) { context.parent ! Data(data) } @@ -83,8 +84,8 @@ case class Statistics(numberOfMsgs: Int, numberOfHiccups: Int, otherInfo: String) -/** Case class to receive data sent by child actors **/ -private[streaming] case class Data[T: ClassManifest](data: T) +/** Case class to receive data sent by child actors */ +private[streaming] case class Data[T: ClassTag](data: T) /** * Provides Actors as receivers for receiving stream. @@ -95,19 +96,19 @@ private[streaming] case class Data[T: ClassManifest](data: T) * his own Actor to run as receiver for Spark Streaming input source. * * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. - * + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * * Here's a way to start more supervisor/workers as its children. * * @example {{{ - * context.parent ! Props(new Supervisor) + * context.parent ! Props(new Supervisor) * }}} OR {{{ * context.parent ! 
Props(new Worker,"Worker") * }}} * * */ -private[streaming] class ActorReceiver[T: ClassManifest]( +private[streaming] class ActorReceiver[T: ClassTag]( props: Props, name: String, storageLevel: StorageLevel, @@ -120,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassManifest]( protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), "Supervisor" + streamId) - private class Supervisor extends Actor { + class Supervisor extends Actor { override val supervisorStrategy = receiverSupervisorStrategy val worker = context.actorOf(props, name) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala index 043bb8c8bf..f164d516b0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala @@ -17,7 +17,10 @@ package org.apache.spark.streaming.receivers +import scala.reflect.ClassTag + import akka.actor.Actor +import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging @@ -25,12 +28,12 @@ import org.apache.spark.Logging /** * A receiver to subscribe to ZeroMQ stream. */ -private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, +private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) extends Actor with Receiver with Logging { - override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe) def receive: Receive = { @@ -38,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, case Connecting ⇒ logInfo("connecting ...") case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.firstFrameAsString) + logDebug("Received message for:" + m.frame(0)) //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail.map(_.payload) + val bytes = m.frames.tail pushBlock(bytesToObjects(bytes)) case Closed ⇒ logInfo("received closed ") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala new file mode 100644 index 0000000000..88e4af59b7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
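The ZeroMQReceiver hunks above change the converter signature from Seq[Seq[Byte]] to Seq[ByteString], since the Akka 2.2 zeromq module delivers frames as ByteStrings and the receiver now drops the topic frame before conversion. A hedged example of a converter with the new signature (the UTF-8 decoding is just one possible choice):

import akka.util.ByteString

object ZeroMQConverters {
  // One String per remaining frame; the topic frame has already been stripped.
  val bytesToStrings: Seq[ByteString] => Iterator[String] =
    frames => frames.map(_.utf8String).iterator
}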
+ */ + +package org.apache.spark.streaming.scheduler + +import org.apache.spark.streaming.Time + +/** + * Class having information on completed batches. + */ +case class BatchInfo( + batchTime: Time, + submissionTime: Long, + processingStartTime: Option[Long], + processingEndTime: Option[Long] + ) { + + def schedulingDelay = processingStartTime.map(_ - submissionTime) + + def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption + + def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index 2128b7c7a6..7341bfbc99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -15,13 +15,17 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.scheduler -import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.streaming.Time +/** + * Class representing a Spark computation. It may contain multiple Spark jobs. + */ private[streaming] class Job(val time: Time, func: () => _) { - val id = Job.getNewId() + var id: String = _ + def run(): Long = { val startTime = System.currentTimeMillis func() @@ -29,13 +33,9 @@ class Job(val time: Time, func: () => _) { (stopTime - startTime) } - override def toString = "streaming job " + id + " @ " + time -} - -private[streaming] -object Job { - val id = new AtomicLong(0) - - def getNewId() = id.getAndIncrement() -} + def setId(number: Int) { + id = "streaming job " + time + "." + number + } + override def toString = id +}
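BatchInfo above is the unit of information handed to the new listener API; its timestamps are epoch milliseconds and the delay helpers return Options. A small worked example (all values invented):

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

object BatchInfoSketch {
  def main(args: Array[String]) {
    val info = BatchInfo(
      batchTime = Time(1000L),
      submissionTime = 1005L,
      processingStartTime = Some(1020L),
      processingEndTime = Some(1100L))

    println(info.schedulingDelay)   // Some(15): processing start - submission
    println(info.processingDelay)   // Some(80): processing end - processing start
    println(info.totalDelay)        // Some(95): scheduling delay + processing delay
  }
}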
\ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 4cd8695df5..cf3fc82a2a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -15,31 +15,35 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.scheduler -import util.{ManualClock, RecurringTimer, Clock} import org.apache.spark.SparkEnv import org.apache.spark.Logging +import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} +import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} +/** + * This class generates jobs from DStreams as well as drives checkpointing and cleaning + * up DStream metadata. + */ private[streaming] -class Scheduler(ssc: StreamingContext) extends Logging { +class JobGenerator(jobScheduler: JobScheduler) extends Logging { initLogging() - val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt - val jobManager = new JobManager(ssc, concurrentJobs) - val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) - } else { - null - } - + val ssc = jobScheduler.ssc val clockClass = System.getProperty( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => generateJobs(new Time(longTime))) val graph = ssc.graph + lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + } else { + null + } + var latestTime: Time = null def start() = synchronized { @@ -48,26 +52,24 @@ class Scheduler(ssc: StreamingContext) extends Logging { } else { startFirstTime() } - logInfo("Scheduler started") + logInfo("JobGenerator started") } def stop() = synchronized { timer.stop() - jobManager.stop() if (checkpointWriter != null) checkpointWriter.stop() ssc.graph.stop() - logInfo("Scheduler stopped") + logInfo("JobGenerator stopped") } private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) - logInfo("Scheduler's timer started at " + startTime) + logInfo("JobGenerator's timer started at " + startTime) } private def restart() { - // If manual clock is being used for testing, then // either set the manual clock to the last checkpointed time, // or if the property is defined set it to that time @@ -93,35 +95,34 @@ class Scheduler(ssc: StreamingContext) extends Logging { val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - graph.generateJobs(time).foreach(jobManager.runJob) + jobScheduler.runJobs(time, graph.generateJobs(time)) ) // Restart the timer timer.start(restartTime.milliseconds) - logInfo("Scheduler's timer restarted at " + restartTime) + logInfo("JobGenerator's timer restarted at " + restartTime) } /** Generate jobs and perform checkpoint for the given `time`. 
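The renamed JobGenerator keeps the reflective clock selection, which is what lets tests substitute a manual clock through a system property. A stripped-down sketch of that mechanism (the standalone object is invented; only the property name and default come from the code above):

object ClockSelectionSketch {
  def main(args: Array[String]) {
    // Tests can set this before the streaming context starts, e.g. to
    // "org.apache.spark.streaming.util.ManualClock".
    val clockClass = System.getProperty(
      "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    val clock = Class.forName(clockClass).newInstance()
    println("Using clock implementation: " + clock.getClass.getName)
  }
}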
*/ - def generateJobs(time: Time) { + private def generateJobs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") - graph.generateJobs(time).foreach(jobManager.runJob) + jobScheduler.runJobs(time, graph.generateJobs(time)) latestTime = time doCheckpoint(time) } /** - * Clear old metadata assuming jobs of `time` have finished processing. - * And also perform checkpoint. + * On batch completion, clear old metadata and checkpoint computation. */ - def clearOldMetadata(time: Time) { + private[streaming] def onBatchCompletion(time: Time) { ssc.graph.clearOldMetadata(time) doCheckpoint(time) } /** Perform checkpoint for the give `time`. */ - def doCheckpoint(time: Time) = synchronized { - if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { + private def doCheckpoint(time: Time) = synchronized { + if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala new file mode 100644 index 0000000000..33c5322358 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import org.apache.spark.Logging +import org.apache.spark.SparkEnv +import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} +import scala.collection.mutable.HashSet +import org.apache.spark.streaming._ + +/** + * This class drives the generation of Spark jobs from the DStreams. 
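doCheckpoint above only writes a checkpoint when the batch time is an exact multiple of the checkpoint interval past the graph's zero time. The Duration arithmetic involved, shown in isolation (values arbitrary):

import org.apache.spark.streaming.{Seconds, Time}

object CheckpointTimingSketch {
  def main(args: Array[String]) {
    val zeroTime = Time(0L)
    val checkpointDuration = Seconds(10)

    val onInterval  = Time(30000L)   // 30 s after zeroTime
    val offInterval = Time(35000L)   // 35 s after zeroTime

    println((onInterval  - zeroTime).isMultipleOf(checkpointDuration))   // true
    println((offInterval - zeroTime).isMultipleOf(checkpointDuration))   // false
  }
}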
+ */ +private[streaming] +class JobScheduler(val ssc: StreamingContext) extends Logging { + + initLogging() + + val jobSets = new ConcurrentHashMap[Time, JobSet] + val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt + val executor = Executors.newFixedThreadPool(numConcurrentJobs) + val generator = new JobGenerator(this) + val listenerBus = new StreamingListenerBus() + + def clock = generator.clock + + def start() { + generator.start() + } + + def stop() { + generator.stop() + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + } + + def runJobs(time: Time, jobs: Seq[Job]) { + if (jobs.isEmpty) { + logInfo("No jobs added for time " + time) + } else { + val jobSet = new JobSet(time, jobs) + jobSets.put(time, jobSet) + jobSet.jobs.foreach(job => executor.execute(new JobHandler(job))) + logInfo("Added jobs for time " + time) + } + } + + def getPendingTimes(): Array[Time] = { + jobSets.keySet.toArray(new Array[Time](0)) + } + + private def beforeJobStart(job: Job) { + val jobSet = jobSets.get(job.time) + if (!jobSet.hasStarted) { + listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo())) + } + jobSet.beforeJobStart(job) + logInfo("Starting job " + job.id + " from job set of time " + jobSet.time) + SparkEnv.set(generator.ssc.env) + } + + private def afterJobEnd(job: Job) { + val jobSet = jobSets.get(job.time) + jobSet.afterJobStop(job) + logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) + if (jobSet.hasCompleted) { + jobSets.remove(jobSet.time) + generator.onBatchCompletion(jobSet.time) + logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( + jobSet.totalDelay / 1000.0, jobSet.time.toString, + jobSet.processingDelay / 1000.0 + )) + listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo())) + } + } + + class JobHandler(job: Job) extends Runnable { + def run() { + beforeJobStart(job) + try { + job.run() + } catch { + case e: Exception => + logError("Running " + job + " failed", e) + } + afterJobEnd(job) + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala new file mode 100644 index 0000000000..05233d095b --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
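JobScheduler above sizes a fixed thread pool from spark.streaming.concurrentJobs (default 1) and wraps each Job in a Runnable. A stripped-down, hypothetical sketch of that execution pattern, detached from the streaming classes:

import java.util.concurrent.{Executors, TimeUnit}

object ConcurrentJobsSketch {
  def main(args: Array[String]) {
    val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
    val executor = Executors.newFixedThreadPool(numConcurrentJobs)

    // Stand-ins for the generated streaming jobs of one batch.
    val jobs = Seq(() => println("job 0 ran"), () => println("job 1 ran"))
    jobs.foreach { job =>
      executor.execute(new Runnable { def run() { job() } })
    }

    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
  }
}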
+ */ + +package org.apache.spark.streaming.scheduler + +import scala.collection.mutable.HashSet +import org.apache.spark.streaming.Time + +private[streaming] +case class JobSet(time: Time, jobs: Seq[Job]) { + + private val incompleteJobs = new HashSet[Job]() + var submissionTime = System.currentTimeMillis() + var processingStartTime = -1L + var processingEndTime = -1L + + jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) } + incompleteJobs ++= jobs + + def beforeJobStart(job: Job) { + if (processingStartTime < 0) processingStartTime = System.currentTimeMillis() + } + + def afterJobStop(job: Job) { + incompleteJobs -= job + if (hasCompleted) processingEndTime = System.currentTimeMillis() + } + + def hasStarted() = (processingStartTime > 0) + + def hasCompleted() = incompleteJobs.isEmpty + + def processingDelay = processingEndTime - processingStartTime + + def totalDelay = { + processingEndTime - time.milliseconds + } + + def toBatchInfo(): BatchInfo = { + new BatchInfo( + time, + submissionTime, + if (processingStartTime >= 0 ) Some(processingStartTime) else None, + if (processingEndTime >= 0 ) Some(processingEndTime) else None + ) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index b97fb7e6e3..abff55d77c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} @@ -25,12 +25,13 @@ import org.apache.spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue +import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import akka.util.duration._ import akka.dispatch._ import org.apache.spark.storage.BlockId +import org.apache.spark.streaming.{Time, StreamingContext} private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala new file mode 100644 index 0000000000..36225e190c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import scala.collection.mutable.Queue +import org.apache.spark.util.Distribution + +/** Base trait for events related to StreamingListener */ +sealed trait StreamingListenerEvent + +case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent + +case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent + + +/** + * A listener interface for receiving information about an ongoing streaming + * computation. + */ +trait StreamingListener { + /** + * Called when processing of a batch has completed + */ + def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } + + /** + * Called when processing of a batch has started + */ + def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } +} + + +/** + * A simple StreamingListener that logs summary statistics across Spark Streaming batches + * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) + */ +class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { + // Queue containing latest completed batches + val batchInfos = new Queue[BatchInfo]() + + override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { + batchInfos.enqueue(batchStarted.batchInfo) + if (batchInfos.size > numBatchInfos) batchInfos.dequeue() + printStats() + } + + def printStats() { + showMillisDistribution("Total delay: ", _.totalDelay) + showMillisDistribution("Processing time: ", _.processingDelay) + } + + def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) { + org.apache.spark.scheduler.StatsReportListener.showMillisDistribution( + heading, extractDistribution(getMetric)) + } + + def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala new file mode 100644 index 0000000000..110a20f282 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import org.apache.spark.Logging +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.concurrent.LinkedBlockingQueue + +/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. 
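// --- Illustrative aside (not part of the commit) ---------------------------
// The StreamingListener trait introduced below is the application-facing hook
// for batch events. A small sketch of a user-defined listener built on it;
// the class name is hypothetical, and wiring it in assumes a StreamingContext
// `ssc` created elsewhere, registered via the addStreamingListener call that
// StreamingListenerSuite in this same change uses.
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayLoggingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("Batch submitted at " + info.submissionTime +
      " finished with total delay " + info.totalDelay.getOrElse(-1L) + " ms")
  }
}

// ssc.addStreamingListener(new DelayLoggingListener)   // ssc: existing StreamingContext (assumed)
// ---------------------------------------------------------------------------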
*/ +private[spark] class StreamingListenerBus() extends Logging { + private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener] + + /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than + * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ + private val EVENT_QUEUE_CAPACITY = 10000 + private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY) + private var queueFullErrorMessageLogged = false + + new Thread("StreamingListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + event match { + case batchStarted: StreamingListenerBatchStarted => + listeners.foreach(_.onBatchStarted(batchStarted)) + case batchCompleted: StreamingListenerBatchCompleted => + listeners.foreach(_.onBatchCompleted(batchCompleted)) + case _ => + } + } + } + }.start() + + def addListener(listener: StreamingListener) { + listeners += listener + } + + def post(event: StreamingListenerEvent) { + val eventAdded = eventQueue.offer(event) + if (!eventAdded && !queueFullErrorMessageLogged) { + logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with the " + + "rate at which tasks are being started by the scheduler.") + queueFullErrorMessageLogged = true + } + } + + /** + * Waits until there are no more events in the queue, or until the specified time has elapsed. + * Used for testing only. Returns true if the queue has emptied and false is the specified time + * elapsed before the queue emptied. + */ + def waitUntilEmpty(timeoutMillis: Int): Boolean = { + val finishTime = System.currentTimeMillis + timeoutMillis + while (!eventQueue.isEmpty()) { + if (System.currentTimeMillis > finishTime) { + return false + } + /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify + * add overhead in the general case. */ + Thread.sleep(10) + } + return true + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 6977957126..4a3993e3e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -25,6 +25,7 @@ import StreamingContext._ import scala.util.Random import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag import java.io.{File, ObjectInputStream, IOException} import java.util.UUID @@ -120,7 +121,7 @@ object MasterFailureTest extends Logging { * Tests stream operation with multiple master failures, and verifies whether the * final set of output values is as expected or not. */ - def testOperation[T: ClassManifest]( + def testOperation[T: ClassTag]( directory: String, batchDuration: Duration, input: Seq[String], @@ -158,7 +159,7 @@ object MasterFailureTest extends Logging { * and batch duration. Returns the streaming context and the directory to which * files should be written for testing. 
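// --- Illustrative aside (not part of the commit) ---------------------------
// The MasterFailureTest hunks here, and many below, are part of the Scala 2.10
// migration from ClassManifest to ClassTag: only the context bound changes,
// call sites stay the same. A self-contained sketch of the pattern; the method
// name is illustrative.
import scala.reflect.ClassTag

object ClassTagSketch extends App {
  // 2.9.x:  def toArraySketch[T: ClassManifest](xs: Seq[T]): Array[T] = xs.toArray
  // 2.10:   the bound becomes ClassTag, nothing else moves.
  def toArraySketch[T: ClassTag](xs: Seq[T]): Array[T] = xs.toArray

  println(toArraySketch(Seq(1, 2, 3)).mkString(","))
}
// ---------------------------------------------------------------------------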
*/ - private def setupStreams[T: ClassManifest]( + private def setupStreams[T: ClassTag]( directory: String, batchDuration: Duration, operation: DStream[String] => DStream[T] @@ -192,7 +193,7 @@ object MasterFailureTest extends Logging { * Repeatedly starts and kills the streaming context until timed out or * the last expected output is generated. Finally, return */ - private def runStreams[T: ClassManifest]( + private def runStreams[T: ClassTag]( ssc_ : StreamingContext, lastExpectedOutput: T, maxTimeToRun: Long @@ -274,7 +275,7 @@ object MasterFailureTest extends Logging { * duplicate batch outputs of values from the `output`. As a result, the * expected output should not have consecutive batches with the same values as output. */ - private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) { + private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) { // Verify whether expected outputs do not consecutive batches with same output for (i <- 0 until expectedOutput.size - 1) { assert(expectedOutput(i) != expectedOutput(i+1), @@ -305,7 +306,7 @@ object MasterFailureTest extends Logging { * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. */ private[streaming] -class TestOutputStream[T: ClassManifest]( +class TestOutputStream[T: ClassTag]( parent: DStream[T], val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] ) extends ForEachDStream[T]( @@ -380,24 +381,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 - var done = false - while (!done && tries < maxTries) { - tries += 1 - try { - // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) - fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) - fs.rename(tempHadoopFile, hadoopFile) - done = true - } catch { - case ioe: IOException => { - fs = testDir.getFileSystem(new Configuration()) - logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) - } - } + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } } - if (!done) + if (!done) logError("Could not generate file " + hadoopFile) - else + else logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) Thread.sleep(interval) localFile.delete() @@ -411,5 +412,3 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } } - - diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index ad4a8b9535..daeb99f5b7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -21,28 +21,31 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; + import kafka.serializer.StringDecoder; + import 
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import scala.Tuple2; +import twitter4j.Status; + import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.SparkFlumeEvent; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.JavaCheckpointTestUtils; -import org.apache.spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; @@ -51,7 +54,6 @@ import akka.actor.Props; import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. @@ -86,8 +88,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3L), Arrays.asList(1L)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Long> count = stream.count(); JavaTestUtils.attachTestOutputStream(count); List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); @@ -103,8 +105,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(5,5), Arrays.asList(9,4)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -129,8 +131,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,4,5,6), Arrays.asList(7,8,9)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -153,8 +155,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), Arrays.asList(13,14,15,16,17,18)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); 
@@ -171,8 +173,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("giants"), Arrays.asList("yankees")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return s.contains("a"); @@ -227,8 +229,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(Arrays.asList("giants", "dodgers")), Arrays.asList(Arrays.asList("yankees", "red socks"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream glommed = stream.glom(); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<List<String>> glommed = stream.glom(); JavaTestUtils.attachTestOutputStream(glommed); List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -245,8 +247,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("GIANTSDODGERS"), Arrays.asList("YANKEESRED SOCKS")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { String out = ""; @@ -288,8 +290,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(15), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reduced = stream.reduce(new IntegerSum()); JavaTestUtils.attachTestOutputStream(reduced); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -309,8 +311,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(39), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -695,8 +697,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), Arrays.asList("a","t","h","l","e","t","i","c","s")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(x.split("(?!^)")); @@ -742,8 +744,8 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, String>(9, "c"), new Tuple2<Integer, String>(9, 
"s"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { List<Tuple2<Integer, String>> out = Lists.newArrayList(); @@ -776,10 +778,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(2,2,5,5), Arrays.asList(3,3,6,6)); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - JavaDStream unioned = stream1.union(stream2); + JavaDStream<Integer> unioned = stream1.union(stream2); JavaTestUtils.attachTestOutputStream(unioned); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -790,7 +792,7 @@ public class JavaAPISuite implements Serializable { * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. */ - public static <T extends Comparable> void assertOrderInvariantEquals( + public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { for (List<T> list: expected) { Collections.sort(list); @@ -813,11 +815,11 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("giants", 6)), Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2 call(String in) throws Exception { + public Tuple2<String, Integer> call(String in) throws Exception { return new Tuple2<String, Integer>(in, in.length()); } }); @@ -1540,8 +1542,8 @@ public class JavaAPISuite implements Serializable { File tempDir = Files.createTempDir(); ssc.checkpoint(tempDir.getAbsolutePath()); - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -1616,7 +1618,7 @@ public class JavaAPISuite implements Serializable { @Test public void testSocketTextStream() { - JavaDStream test = ssc.socketTextStream("localhost", 12345); + JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1636,7 +1638,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = ssc.socketStream( + JavaDStream<String> test = ssc.socketStream( "localhost", 12345, new Converter(), @@ -1645,39 +1647,39 @@ public class JavaAPISuite implements Serializable { @Test public void testTextFileStream() { - JavaDStream test = 
ssc.textFileStream("/tmp/foo"); + JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); } @Test public void testRawSocketStream() { - JavaDStream test = ssc.rawSocketStream("localhost", 12345); + JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); + JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); } @Test public void testFileStream() { JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); } @Test public void testTwitterStream() { String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); + JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); } @Test public void testActorStream() { - JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); + JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); } @Test public void testZeroMQStream() { - JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { + JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { @Override public Iterable<String> call(byte[][] b) throws Exception { return null; diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 5e384eeee4..42ab9590d6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -17,7 +17,9 @@ package org.apache.spark.streaming -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag + import java.util.{List => JList} import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} import org.apache.spark.streaming._ @@ -31,15 +33,15 @@ trait JavaTestBase extends TestSuiteBase { /** * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context. * The stream will be derived from the supplied lists of Java objects. 
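// --- Illustrative aside (not part of the commit) ---------------------------
// The JavaTestUtils hunks below fabricate a ClassTag[T] by casting the tag for
// AnyRef, because Java callers cannot supply one. That is acceptable when the
// tag is only used for erased, boxed values. A minimal sketch of the trick;
// names are illustrative.
import scala.reflect.ClassTag

object FakeClassTagSketch extends App {
  def fabricatedTag[T]: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

  val tag = fabricatedTag[String]
  println(tag.runtimeClass)  // prints class java.lang.Object: fine while T is only used erased
}
// ---------------------------------------------------------------------------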
- **/ + */ def attachTestInputStream[T]( ssc: JavaStreamingContext, data: JList[JList[T]], numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) @@ -52,8 +54,8 @@ trait JavaTestBase extends TestSuiteBase { def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( dstream: JavaDStreamLike[T, This, R]) = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } @@ -67,8 +69,8 @@ trait JavaTestBase extends TestSuiteBase { */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) @@ -85,8 +87,8 @@ trait JavaTestBase extends TestSuiteBase { */ def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[JList[V]]]() res.map{entry => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 259ef1608c..b35ca00b53 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -26,18 +26,6 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { - override def framework() = "BasicOperationsSuite" - - before { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - } - - after { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - } - test("map") { val input = Seq(1 to 4, 5 to 8, 9 to 12) testOperation( diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index e51754977c..4e25c9566c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -17,50 +17,42 @@ package org.apache.spark.streaming -import dstream.FileInputDStream -import org.apache.spark.streaming.StreamingContext._ import java.io.File -import runtime.RichInt -import org.scalatest.BeforeAndAfter + +import 
scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.apache.commons.io.FileUtils -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import util.{Clock, ManualClock} import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration - - +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.util.ManualClock /** * This test suites tests the checkpointing functionality of DStreams - * the checkpointing of a DStream's RDDs as well as the checkpointing of * the whole DStream graph. */ -class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { +class CheckpointSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + var ssc: StreamingContext = null + + override def batchDuration = Milliseconds(500) - before { + override def actuallyWait = true // to allow checkpoints to be written + + override def beforeFunction() { + super.beforeFunction() FileUtils.deleteDirectory(new File(checkpointDir)) } - after { + override def afterFunction() { + super.afterFunction() if (ssc != null) ssc.stop() - //FileUtils.deleteDirectory(new File(checkpointDir)) - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") + FileUtils.deleteDirectory(new File(checkpointDir)) } - var ssc: StreamingContext = null - - override def framework = "CheckpointSuite" - - override def batchDuration = Milliseconds(500) - - override def actuallyWait = true - test("basic rdd checkpoints + dstream graph checkpoint recovery") { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") @@ -76,13 +68,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Setup the streams val input = (1 to 10).map(_ => Seq("a")).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(stateStreamCheckpointInterval) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } var ssc = setupStreams(input, operation) var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head @@ -187,13 +179,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val input = (1 to 10).map(_ => Seq("a")).toSeq val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(batchDuration * 2) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } testCheckpointedOperation(input, operation, output, 7) } @@ -320,7 +312,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { * NOTE: This takes into consideration that the last 
batch processed before * master failure will be re-processed after restart/recovery. */ - def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( + def testCheckpointedOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -364,7 +356,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { * Advances the manual clock on the streaming scheduler by given number of batches. * It also waits for the expected amount of time for each batch. */ - def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { + def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) for (i <- 1 to numBatches.toInt) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala index 6337c5359c..da9b04de1a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -32,17 +32,22 @@ import collection.mutable.ArrayBuffer * This testsuite tests master failures at random times while the stream is running using * the real clock. */ -class FailureSuite extends FunSuite with BeforeAndAfter with Logging { +class FailureSuite extends TestSuiteBase with Logging { var directory = "FailureSuite" val numBatches = 30 - val batchDuration = Milliseconds(1000) - before { + override def batchDuration = Milliseconds(1000) + + override def useManualClock = false + + override def beforeFunction() { + super.beforeFunction() FileUtils.deleteDirectory(new File(directory)) } - after { + override def afterFunction() { + super.afterFunction() FileUtils.deleteDirectory(new File(directory)) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 7dc82decef..62a9f120b4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -50,18 +50,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testPort = 9999 - override def checkpointDir = "checkpoint" - - before { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - } - - after { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - } - test("socket input stream") { // Start the server val testServer = new TestServer() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala new file mode 100644 index 0000000000..16410a21e3 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
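// --- Illustrative aside (not part of the commit) ---------------------------
// Several suites above select the clock through the spark.streaming.clock
// system property: a manually advanced clock when batches must be driven
// deterministically, the wall clock otherwise (as FailureSuite now does via
// useManualClock = false). A self-contained sketch of that swap; Spark's own
// Clock/ManualClock classes are not used, all names are hypothetical.
trait SketchClock { def currentTime: Long }
class SketchSystemClock extends SketchClock { def currentTime = System.currentTimeMillis }
class SketchManualClock extends SketchClock {
  private var now = 0L
  def currentTime = now
  def advance(ms: Long): Long = { now += ms; now }
}

object ClockSketch extends App {
  System.setProperty("sketch.clock", "manual")   // mirrors setting spark.streaming.clock in beforeFunction
  val clock: SketchClock =
    if (System.getProperty("sketch.clock") == "manual") new SketchManualClock
    else new SketchSystemClock
  clock match {
    case m: SketchManualClock => (1 to 3).foreach(_ => println("advanced to " + m.advance(500)))
    case _                    => println("wall clock: " + clock.currentTime)
  }
}
// ---------------------------------------------------------------------------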
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.scheduler._ +import scala.collection.mutable.ArrayBuffer +import org.scalatest.matchers.ShouldMatchers + +class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{ + + val input = (1 to 4).map(Seq(_)).toSeq + val operation = (d: DStream[Int]) => d.map(x => x) + + // To make sure that the processing start and end times in collected + // information are different for successive batches + override def batchDuration = Milliseconds(100) + override def actuallyWait = true + + test("basic BatchInfo generation") { + val ssc = setupStreams(input, operation) + val collector = new BatchInfoCollector + ssc.addStreamingListener(collector) + runStreams(ssc, input.size, input.size) + val batchInfos = collector.batchInfos + batchInfos should have size 4 + + batchInfos.foreach(info => { + info.schedulingDelay should not be None + info.processingDelay should not be None + info.totalDelay should not be None + info.schedulingDelay.get should be >= 0L + info.processingDelay.get should be >= 0L + info.totalDelay.get should be >= 0L + }) + + isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) + isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) + isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true) + } + + /** Check if a sequence of numbers is in increasing order */ + def isInIncreasingOrder(seq: Seq[Long]): Boolean = { + for(i <- 1 until seq.size) { + if (seq(i - 1) > seq(i)) return false + } + true + } + + /** Listener that collects information on processed batches */ + class BatchInfoCollector extends StreamingListener { + val batchInfos = new ArrayBuffer[BatchInfo] + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { + batchInfos += batchCompleted.batchInfo + } + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 8c8c359e6e..e969e91d13 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -20,8 +20,9 @@ package org.apache.spark.streaming import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock -import collection.mutable.ArrayBuffer -import collection.mutable.SynchronizedBuffer +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.SynchronizedBuffer +import scala.reflect.ClassTag import java.io.{ObjectInputStream, IOException} @@ -35,7 +36,7 @@ import org.apache.spark.rdd.RDD * replayable, reliable message queue like Kafka. It requires a sequence as input, and * returns the i_th element at the i_th batch unde manual clock. 
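// --- Illustrative aside (not part of the commit) ---------------------------
// TestInputStream, described just below, serves input(i) as the data for the
// i-th batch while a manual clock drives the batches. A plain-Scala sketch of
// that indexing idea (no Spark types, names illustrative).
object TestInputSketch extends App {
  val input = Seq(Seq("a"), Seq("b", "c"), Seq("d"))
  def batchData(i: Int): Seq[String] = if (i < input.size) input(i) else Seq.empty
  (0 until 5).foreach(i => println(s"batch $i -> ${batchData(i)}"))
}
// ---------------------------------------------------------------------------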
*/ -class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) +class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) extends InputDStream[T](ssc_) { def start() {} @@ -63,7 +64,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassManifest](parent: DStream[T], +class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() @@ -85,7 +86,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], +class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) @@ -109,7 +110,7 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Name of the framework for Spark context - def framework = "TestSuiteBase" + def framework = this.getClass.getSimpleName // Master for Spark context def master = "local[2]" @@ -126,14 +127,44 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Maximum time to wait before the test times out def maxWaitTimeMillis = 10000 + // Whether to use manual clock or not + def useManualClock = true + // Whether to actually wait in real time before changing manual clock def actuallyWait = false + // Default before function for any streaming test suite. Override this + // if you want to add your stuff to "before" (i.e., don't call before { } ) + def beforeFunction() { + if (useManualClock) { + System.setProperty( + "spark.streaming.clock", + "org.apache.spark.streaming.util.ManualClock" + ) + } else { + System.clearProperty("spark.streaming.clock") + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + } + + // Default after function for any streaming test suite. Override this + // if you want to add your stuff to "after" (i.e., don't call after { } ) + def afterFunction() { + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + } + + before(beforeFunction) + after(afterFunction) + /** * Set up required DStreams to test the DStream operation using the two sequences * of input collections. */ - def setupStreams[U: ClassManifest, V: ClassManifest]( + def setupStreams[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], numPartitions: Int = numInputPartitions @@ -159,7 +190,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Set up required DStreams to test the binary operation using the sequence * of input collections. 
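// --- Illustrative aside (not part of the commit) ---------------------------
// With the new beforeFunction/afterFunction hooks and the useManualClock and
// batchDuration knobs above, a concrete suite becomes a few overrides plus a
// testOperation call. A sketch under assumptions: the spark-streaming test
// sources are on the classpath, the suite name and operation are invented,
// and testOperation is called with its three-argument form.
import org.apache.spark.streaming.{DStream, Milliseconds, TestSuiteBase}

class DoubleValuesSuite extends TestSuiteBase {
  override def batchDuration = Milliseconds(500)
  override def useManualClock = true   // the default; stated here for emphasis

  test("map doubles each element") {
    val input    = Seq(Seq(1, 2), Seq(3, 4))
    val expected = Seq(Seq(2, 4), Seq(6, 8))
    testOperation(input, (d: DStream[Int]) => d.map(_ * 2), expected)
  }
}
// ---------------------------------------------------------------------------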
*/ - def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W] @@ -190,7 +221,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * * Returns a sequence of items for each RDD. */ - def runStreams[V: ClassManifest]( + def runStreams[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int @@ -207,7 +238,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. */ - def runStreamsWithPartitions[V: ClassManifest]( + def runStreamsWithPartitions[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int @@ -263,7 +294,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * is same as the expected output values, by comparing the output * collections either as lists (order matters) or sets (order does not matter) */ - def verifyOutput[V: ClassManifest]( + def verifyOutput[V: ClassTag]( output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], useSet: Boolean @@ -293,7 +324,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Test unary DStream operation with a list of inputs, with number of * batches to run same as the number of expected output values */ - def testOperation[U: ClassManifest, V: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -311,7 +342,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * @param useSet Compare the output values with the expected output values * as sets (order matters) or as lists (order does not matter) */ - def testOperation[U: ClassManifest, V: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -328,7 +359,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Test binary DStream operation with two lists of inputs, with number of * batches to run same as the number of expected output values */ - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W], @@ -348,7 +379,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * @param useSet Compare the output values with the expected output values * as sets (order matters) or as lists (order does not matter) */ - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W], diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index f50e05c0d8..6b4aaefcdf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -22,19 +22,9 @@ import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock") + override def maxWaitTimeMillis = 20000 // large window tests can sometimes take longer - override def framework = "WindowOperationsSuite" - - override def maxWaitTimeMillis = 20000 - - override def batchDuration = Seconds(1) - - after { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - } + override def batchDuration = Seconds(1) // making sure its visible in this class val largerSlideInput = Seq( Seq(("a", 1)), |