diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 17:51:14 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 17:51:14 -0800 |
commit | e93b391d75a1c2e17ad93caff39e8fc34d640935 (patch) | |
tree | 925eb68b33ddbabc4482aae4b6ead51668a22653 /streaming/src/main | |
parent | b80ec05635132f96772545803a10a1bbfa1250e7 (diff) | |
parent | 5ea187277c2b11e5db813f7ff9f214d7b85190f6 (diff) | |
download | spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.gz spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.bz2 spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.zip |
Merge branch 'apache-master' into scheduler-update
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
Diffstat (limited to 'streaming/src/main')
37 files changed, 352 insertions, 315 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 8001c49a76..a78d3965ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -26,13 +26,14 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.MetadataCleaner import scala.collection.mutable.HashMap +import scala.reflect.ClassTag import java.io.{ObjectInputStream, IOException, ObjectOutputStream} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -51,7 +52,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} * - A function that is used to generate an RDD after each time interval */ -abstract class DStream[T: ClassManifest] ( +abstract class DStream[T: ClassTag] ( @transient protected[streaming] var ssc: StreamingContext ) extends Serializable with Logging { @@ -77,7 +78,7 @@ abstract class DStream[T: ClassManifest] ( // RDDs generated, marked as protected[streaming] so that testsuites can access it @transient protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () - + // Time zero for the DStream protected[streaming] var zeroTime: Time = null @@ -269,16 +270,16 @@ abstract class DStream[T: ClassManifest] ( /** * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal * method that should not be called directly. - */ + */ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { // If this DStream was not initialized (i.e., zeroTime not set), then do it // If RDD was already generated, then retrieve it from HashMap generatedRDDs.get(time) match { - - // If an RDD was already generated and is being reused, then + + // If an RDD was already generated and is being reused, then // probably all RDDs in this DStream will be reused and hence should be cached case Some(oldRDD) => Some(oldRDD) - + // if RDD was not generated, and if the time is valid // (based on sliding time of this DStream), then generate the RDD case None => { @@ -295,7 +296,7 @@ abstract class DStream[T: ClassManifest] ( } generatedRDDs.put(time, newRDD) Some(newRDD) - case None => + case None => None } } else { @@ -339,7 +340,7 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.clearOldMetadata(time)) } - /* Adds metadata to the Stream while it is running. + /* Adds metadata to the Stream while it is running. * This methd should be overwritten by sublcasses of InputDStream. */ protected[streaming] def addMetadata(metadata: Any) { @@ -411,7 +412,7 @@ abstract class DStream[T: ClassManifest] ( // ======================================================================= /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { + def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } @@ -419,7 +420,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } @@ -445,7 +446,7 @@ abstract class DStream[T: ClassManifest] ( * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ - def mapPartitions[U: ClassManifest]( + def mapPartitions[U: ClassTag]( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false ): DStream[U] = { @@ -492,16 +493,14 @@ abstract class DStream[T: ClassManifest] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) - newStream + ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) } /** * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } @@ -509,7 +508,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) val cleanedF = context.sparkContext.clean(transformFunc) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { @@ -523,7 +522,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[U: ClassManifest, V: ClassManifest]( + def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { val cleanedF = ssc.sparkContext.clean(transformFunc) @@ -534,7 +533,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[U: ClassManifest, V: ClassManifest]( + def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { val cleanedF = ssc.sparkContext.clean(transformFunc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 58a0da2870..3fd5d52403 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -20,13 +20,16 @@ package org.apache.spark.streaming import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.conf.Configuration + import collection.mutable.HashMap import org.apache.spark.Logging +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag private[streaming] -class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) +class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) extends Serializable with Logging { protected val data = new HashMap[Time, AnyRef]() @@ -107,4 +110,3 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" } } - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 8c12fd11ef..80af96c060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -18,16 +18,15 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} -import org.apache.spark.streaming.dstream.{ShuffledDStream} -import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} +import org.apache.spark.streaming.dstream._ import org.apache.spark.{Partitioner, HashPartitioner} import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{Manifests, RDD, PairRDDFunctions} +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -35,7 +34,7 @@ import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration -class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { private[streaming] def ssc = self.ssc @@ -105,7 +104,7 @@ extends Serializable { * combineByKey for RDDs. Please refer to combineByKey in * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ - def combineByKey[C: ClassManifest]( + def combineByKey[C: ClassTag]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, @@ -205,7 +204,7 @@ extends Serializable { * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, + reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration ): DStream[(K, V)] = { @@ -336,7 +335,7 @@ extends Serializable { * corresponding state key-value pair will be eliminated. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) @@ -351,7 +350,7 @@ extends Serializable { * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int ): DStream[(K, S)] = { @@ -367,7 +366,7 @@ extends Serializable { * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner ): DStream[(K, S)] = { @@ -390,7 +389,7 @@ extends Serializable { * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean @@ -402,7 +401,7 @@ extends Serializable { * Return a new DStream by applying a map function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { new MapValuedDStream[K, V, U](self, mapValuesFunc) } @@ -410,7 +409,7 @@ extends Serializable { * Return a new DStream by applying a flatmap function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def flatMapValues[U: ClassManifest]( + def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) @@ -421,7 +420,7 @@ extends Serializable { * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner()) } @@ -429,7 +428,7 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -437,7 +436,7 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. */ - def cogroup[W: ClassManifest]( + def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Seq[V], Seq[W]))] = { @@ -451,7 +450,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ - def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } @@ -459,7 +458,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner(numPartitions)) } @@ -467,7 +466,7 @@ extends Serializable { * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W: ClassManifest]( + def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, W))] = { @@ -482,7 +481,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { leftOuterJoin[W](other, defaultPartitioner()) } @@ -491,7 +490,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` * partitions. */ - def leftOuterJoin[W: ClassManifest]( + def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int ): DStream[(K, (V, Option[W]))] = { @@ -503,7 +502,7 @@ extends Serializable { * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ - def leftOuterJoin[W: ClassManifest]( + def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, Option[W]))] = { @@ -518,7 +517,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { rightOuterJoin[W](other, defaultPartitioner()) } @@ -527,7 +526,7 @@ extends Serializable { * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` * partitions. */ - def rightOuterJoin[W: ClassManifest]( + def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int ): DStream[(K, (Option[V], W))] = { @@ -539,7 +538,7 @@ extends Serializable { * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ - def rightOuterJoin[W: ClassManifest]( + def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Option[V], W))] = { @@ -556,8 +555,8 @@ extends Serializable { def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassManifest[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + )(implicit fm: ClassTag[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -586,8 +585,8 @@ extends Serializable { def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassManifest[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + )(implicit fm: ClassTag[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -609,9 +608,7 @@ extends Serializable { self.foreach(saveFunc) } - private def getKeyClass() = implicitly[ClassManifest[K]].erasure + private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - private def getValueClass() = implicitly[ClassManifest[V]].erasure + private def getValueClass() = implicitly[ClassTag[V]].runtimeClass } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 83f1cadb48..fedbbde80c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -34,6 +34,7 @@ import org.apache.spark.streaming.receivers.ActorReceiver import scala.collection.mutable.Queue import scala.collection.Map +import scala.reflect.ClassTag import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger @@ -47,7 +48,7 @@ import org.apache.hadoop.fs.Path import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.streaming.scheduler._ - +import akka.util.ByteString /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -193,7 +194,7 @@ class StreamingContext private ( * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of NetworkReceiver */ - def networkStream[T: ClassManifest]( + def networkStream[T: ClassTag]( receiver: NetworkReceiver[T]): DStream[T] = { val inputStream = new PluggableInputDStream[T](this, receiver) @@ -213,7 +214,7 @@ class StreamingContext private ( * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ - def actorStream[T: ClassManifest]( + def actorStream[T: ClassTag]( props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, @@ -233,14 +234,14 @@ class StreamingContext private ( * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ - def zeroMQStream[T: ClassManifest]( + def zeroMQStream[T: ClassTag]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } @@ -277,8 +278,8 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects */ def kafkaStream[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: kafka.serializer.Decoder[_]: Manifest, T <: kafka.serializer.Decoder[_]: Manifest]( kafkaParams: Map[String, String], @@ -317,7 +318,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def socketStream[T: ClassManifest]( + def socketStream[T: ClassTag]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], @@ -339,7 +340,7 @@ class StreamingContext private ( port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) + val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) registerInputStream(inputStream) inputStream } @@ -354,7 +355,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects in the received blocks */ - def rawSocketStream[T: ClassManifest]( + def rawSocketStream[T: ClassTag]( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 @@ -374,9 +375,9 @@ class StreamingContext private ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassManifest, - V: ClassManifest, - F <: NewInputFormat[K, V]: ClassManifest + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag ] (directory: String): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory) registerInputStream(inputStream) @@ -394,9 +395,9 @@ class StreamingContext private ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassManifest, - V: ClassManifest, - F <: NewInputFormat[K, V]: ClassManifest + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) registerInputStream(inputStream) @@ -438,7 +439,7 @@ class StreamingContext private ( * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval * @tparam T Type of objects in the RDD */ - def queueStream[T: ClassManifest]( + def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean = true ): DStream[T] = { @@ -454,7 +455,7 @@ class StreamingContext private ( * Set as null if no RDD should be returned when empty * @tparam T Type of objects in the RDD */ - def queueStream[T: ClassManifest]( + def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] @@ -482,7 +483,7 @@ class StreamingContext private ( /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { + def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } @@ -490,7 +491,7 @@ class StreamingContext private ( * Create a new DStream in which each RDD is generated by applying a function on RDDs of * the DStreams. */ - def transform[T: ClassManifest]( + def transform[T: ClassTag]( dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] ): DStream[T] = { @@ -569,7 +570,7 @@ class StreamingContext private ( object StreamingContext { - implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { + implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index 1a2aeaa879..d29033df32 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -23,9 +23,11 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag + /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -41,7 +43,7 @@ import org.apache.spark.rdd.RDD * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval */ -class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) +class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]) extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] { override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) @@ -103,6 +105,6 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM } object JavaDStream { - implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = + implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] = new JavaDStream[T](dstream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 09189eadd8..64f38ce1c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -21,6 +21,7 @@ import java.util.{List => JList} import java.lang.{Long => JLong} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} @@ -32,7 +33,7 @@ import JavaDStream._ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] extends Serializable { - implicit val classManifest: ClassManifest[T] + implicit val classTag: ClassTag[T] def dstream: DStream[T] @@ -136,7 +137,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** Return a new DStream by applying a function to all elements of this DStream. */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } @@ -157,7 +158,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } @@ -260,8 +261,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] def scalaTransform (in: RDD[T]): RDD[U] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -272,8 +273,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] def scalaTransform (in: RDD[T], time: Time): RDD[U] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -285,10 +286,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmk: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -300,10 +301,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmk: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -317,10 +318,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaDStream[U], transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmu: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - implicit val cmv: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmv: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) @@ -334,12 +335,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaDStream[U], transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] ): JavaPairDStream[K2, V2] = { - implicit val cmu: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) @@ -353,12 +354,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - implicit val cmw: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmw: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) @@ -372,14 +373,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] ): JavaPairDStream[K3, V3] = { - implicit val cmk2: ClassManifest[K2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] - implicit val cmv2: ClassManifest[V2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - implicit val cmk3: ClassManifest[K3] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]] - implicit val cmv3: ClassManifest[V3] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmk3: ClassTag[K3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]] + implicit val cmv3: ClassTag[V3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]] def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c6cd635afa..dfd6e27c3e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -21,6 +21,7 @@ import java.util.{List => JList} import java.lang.{Long => JLong} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ @@ -36,8 +37,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kManifest: ClassManifest[K], - implicit val vManifest: ClassManifest[V]) + implicit val kManifest: ClassTag[K], + implicit val vManifest: ClassTag[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) @@ -162,8 +163,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner ): JavaPairDStream[K, C] = { - implicit val cm: ClassManifest[C] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + implicit val cm: ClassTag[C] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } @@ -428,8 +429,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( */ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) : JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) } @@ -446,8 +447,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } @@ -464,8 +465,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } @@ -475,8 +476,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * 'this' DStream without changing the key. */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.mapValues(f) } @@ -487,8 +488,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.flatMapValues(fn) } @@ -498,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -511,8 +512,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, numPartitions) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -525,8 +526,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, partitioner) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -536,8 +537,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream) } @@ -546,8 +547,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, numPartitions) } @@ -559,8 +560,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, partitioner) } @@ -570,8 +571,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -585,8 +586,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -599,8 +600,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -611,8 +612,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -626,8 +627,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -641,8 +642,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -722,24 +723,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( new JavaDStream[(K, V)](dstream) } - override val classManifest: ClassManifest[(K, V)] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream { - implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = { + implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] new JavaPairDStream[K, V](dstream.dstream) } - def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long]) : JavaPairDStream[K, JLong] = { StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 7f9dab0ef9..80dcf87491 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -22,12 +22,15 @@ import java.io.InputStream import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe +import akka.util.ByteString + import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD @@ -141,10 +144,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { groupId: String, topics: JMap[String, JInt]) : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), StorageLevel.MEMORY_ONLY_SER_2) + } /** @@ -162,8 +166,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@ -189,10 +193,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val valueCmt: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] @@ -245,8 +249,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { storageLevel: StorageLevel) : JavaDStream[T] = { def fn = (x: InputStream) => converter.apply(x).toIterator - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.socketStream(hostname, port, fn, storageLevel) } @@ -274,8 +278,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { hostname: String, port: Int, storageLevel: StorageLevel): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel)) } @@ -289,8 +293,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of the objects in the received blocks */ def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port)) } @@ -304,12 +308,12 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam F Input format for reading HDFS file */ def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] - implicit val cmf: ClassManifest[F] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val cmf: ClassTag[F] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]] ssc.fileStream[K, V, F](directory) } @@ -404,7 +408,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def twitterStream(): JavaDStream[Status] = { ssc.twitterStream() } - + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor @@ -422,8 +426,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) } @@ -443,8 +447,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { name: String, storageLevel: StorageLevel ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel) } @@ -462,8 +466,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { props: Props, name: String ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name) } @@ -480,12 +484,12 @@ class JavaStreamingContext(val ssc: StreamingContext) { def zeroMQStream[T]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) } @@ -505,9 +509,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], storageLevel: StorageLevel ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) } @@ -525,9 +529,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] ): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn) } @@ -547,8 +551,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of objects in the RDD */ def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue) @@ -564,8 +568,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @tparam T Type of objects in the RDD */ def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue, oneAtATime) @@ -585,8 +589,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean, defaultRDD: JavaRDD[T]): JavaDStream[T] = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] sQueue.enqueue(queue.map(_.rdd).toSeq: _*) ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) @@ -597,7 +601,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[T] = first.classManifest + implicit val cm: ClassTag[T] = first.classTag ssc.union(dstreams)(cm) } @@ -609,9 +613,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { rest: JList[JavaPairDStream[K, V]] ): JavaPairDStream[K, V] = { val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[(K, V)] = first.classManifest - implicit val kcm: ClassManifest[K] = first.kManifest - implicit val vcm: ClassManifest[V] = first.vManifest + implicit val cm: ClassTag[(K, V)] = first.classTag + implicit val kcm: ClassTag[K] = first.kManifest + implicit val vcm: ClassTag[V] = first.vManifest new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) } @@ -628,8 +632,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] ): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val scalaDStreams = dstreams.map(_.dstream).toSeq val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList @@ -651,10 +655,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] ): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val scalaDStreams = dstreams.map(_.dstream).toSeq val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala index a9a05c9981..f396c34758 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala @@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag /** * An input stream that always returns the same RDD on each timestep. Useful for testing. */ -class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) +class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T]) extends InputDStream[T](ssc_) { override def start() {} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fea0573b77..39e25239bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,14 +26,16 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.ClassTag + import java.io.{ObjectInputStream, IOException} private[streaming] -class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( +class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @transient ssc_ : StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, - newFilesOnly: Boolean = true) + newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData @@ -54,7 +56,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly) } - + override def stop() { } /** @@ -100,7 +102,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K latestModTimeFiles += path.toString logDebug("Accepted " + path) return true - } + } } } logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime) @@ -195,5 +197,3 @@ private[streaming] object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index 91ee2c1a36..db2e0a4cee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FilteredDStream[T: ClassManifest]( +class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index ca7d7ca49e..244dc3ee4f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], flatMapValueFunc: V => TraversableOnce[U] ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index b37966f9a7..336c4b7a92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( +class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => Traversable[U] ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala index a0189eca04..60d79175f1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -22,6 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.nio.ByteBuffer import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.flume.source.avro.AvroSourceProtocol import org.apache.flume.source.avro.AvroFlumeEvent @@ -34,7 +35,7 @@ import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel private[streaming] -class FlumeInputDStream[T: ClassManifest]( +class FlumeInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 0072248b7d..364abcde68 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.streaming.scheduler.Job +import scala.reflect.ClassTag private[streaming] -class ForEachDStream[T: ClassManifest] ( +class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 4294b07d91..23136f44fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class GlommedDStream[T: ClassManifest](parent: DStream[T]) +class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { override def dependencies = List(parent) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 674b27118c..f01e67fe13 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -19,6 +19,8 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} +import scala.reflect.ClassTag + /** * This is the abstract base class for all input streams. This class provides to methods * start() and stop() which called by the scheduler to start and stop receiving data/ @@ -30,7 +32,7 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} * that requires running a receiver on the worker nodes, use NetworkInputDStream * as the parent class. */ -abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) +abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { var lastValidTime: Time = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index a5de5e1fb5..526f5564c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -31,11 +31,11 @@ import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ import scala.collection.Map - +import scala.reflect.ClassTag /** * Input stream that pulls messages from a Kafka Broker. - * + * * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -43,8 +43,8 @@ import scala.collection.Map */ private[streaming] class KafkaInputDStream[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, @@ -61,8 +61,8 @@ class KafkaInputDStream[ private[streaming] class KafkaReceiver[ - K: ClassManifest, - V: ClassManifest, + K: ClassTag, + V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( kafkaParams: Map[String, String], @@ -104,17 +104,18 @@ class KafkaReceiver[ tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } - // Create Threads for each Topic/Message Stream we are listening - val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties]) + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties]) + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[V]] + // Create Threads for each Topic/Message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) + // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } @@ -122,7 +123,7 @@ class KafkaReceiver[ } // Handles Kafka Messages - private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V]) + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) extends Runnable { def run() { logInfo("Starting MessageHandler.") @@ -146,7 +147,7 @@ class KafkaReceiver[ zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case _ : Throwable => // swallow } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index ac0528213d..ef4a737568 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -37,6 +37,7 @@ import org.eclipse.paho.client.mqttv3.MqttTopic import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ +import scala.reflect.ClassTag /** * Input stream that subscribe messages from a Mqtt Broker. @@ -47,7 +48,7 @@ import scala.collection.JavaConversions._ */ private[streaming] -class MQTTInputDStream[T: ClassManifest]( +class MQTTInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 5329601a6f..8a04060e5b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( +class MapPartitionedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 8290df90a2..0ce364fd46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], mapValueFunc: V => U ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index b1682afea3..c0b7491d09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MappedDStream[T: ClassManifest, U: ClassManifest] ( +class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 1df7f547c9..5add20871e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -21,11 +21,12 @@ import java.util.concurrent.ArrayBlockingQueue import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.reflect.ClassTag import akka.actor.{Props, Actor} import akka.pattern.ask -import akka.dispatch.Await -import akka.util.duration._ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} import org.apache.spark.streaming._ @@ -43,7 +44,7 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ -abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) +abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { // This is an unique identifier that is used to match the network receiver with the @@ -85,7 +86,7 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe * Abstract class of a receiver that can be run on worker nodes to receive external data. See * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation. */ -abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging { +abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { initLogging() @@ -177,8 +178,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log logInfo("Attempting to register with tracker") val ip = System.getProperty("spark.driver.host", "localhost") val port = System.getProperty("spark.driver.port", "7077").toInt - val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) - val tracker = env.actorSystem.actorFor(url) + val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorSelection(url) val timeout = 5.seconds override def preStart() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala index 15782f5c11..6f9477020a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -18,9 +18,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag private[streaming] -class PluggableInputDStream[T: ClassManifest]( +class PluggableInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 7d9f3521b1..97325f8ea3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD - import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag private[streaming] -class QueueInputDStream[T: ClassManifest]( +class QueueInputDStream[T: ClassTag]( @transient ssc: StreamingContext, val queue: Queue[RDD[T]], oneAtATime: Boolean, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index 10ed4ef78d..dea0f26f90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -21,6 +21,8 @@ import org.apache.spark.Logging import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag + import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.{ReadableByteChannel, SocketChannel} @@ -35,7 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue * in the format that the system is configured with. */ private[streaming] -class RawInputDStream[T: ClassManifest]( +class RawInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index b88a4db959..db56345ca8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -28,8 +28,11 @@ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + private[streaming] -class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( +class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, @@ -49,7 +52,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - // Reduce each batch of data using reduceByKey which will be further reduced by window + // Reduce each batch of data using reduceByKey which will be further reduced by window // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) @@ -170,5 +173,3 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( } } } - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index a95e66d761..e6e0022097 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -21,9 +21,10 @@ import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( +class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( parent: DStream[(K,V)], createCombiner: V => C, mergeValue: (C, V) => C, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index e2539c7396..2cdd13f205 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -21,11 +21,13 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import org.apache.spark.util.NextIterator +import scala.reflect.ClassTag + import java.io._ import java.net.Socket private[streaming] -class SocketInputDStream[T: ClassManifest]( +class SocketInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, host: String, port: Int, @@ -39,7 +41,7 @@ class SocketInputDStream[T: ClassManifest]( } private[streaming] -class SocketReceiver[T: ClassManifest]( +class SocketReceiver[T: ClassTag]( host: String, port: Int, bytesToObjects: InputStream => Iterator[T], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 362a6bf4cc..e0ff3ccba4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -23,8 +23,10 @@ import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Duration, Time, DStream} +import scala.reflect.ClassTag + private[streaming] -class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( +class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 71bcb2b390..aeea060df7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class TransformedDStream[U: ClassManifest] ( +class TransformedDStream[U: ClassTag] ( parents: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[U] ) extends DStream[U](parents.head.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index c696bb70a8..0d84ec84f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -22,8 +22,11 @@ import org.apache.spark.rdd.RDD import collection.mutable.ArrayBuffer import org.apache.spark.rdd.UnionRDD +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + private[streaming] -class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) +class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) extends DStream[T](parents.head.ssc) { if (parents.length == 0) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 3c57294269..73d959331a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -22,8 +22,10 @@ import org.apache.spark.rdd.UnionRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import scala.reflect.ClassTag + private[streaming] -class WindowedDStream[T: ClassManifest]( +class WindowedDStream[T: ClassTag]( parent: DStream[T], _windowDuration: Duration, _slideDuration: Duration) @@ -52,6 +54,3 @@ class WindowedDStream[T: ClassManifest]( Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) } } - - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index ef0f85a717..fdf5371a89 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -20,6 +20,10 @@ package org.apache.spark.streaming.receivers import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } +import akka.actor.SupervisorStrategy._ + +import scala.concurrent.duration._ +import scala.reflect.ClassTag import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver @@ -28,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer -/** A helper with set of defaults for supervisor strategy **/ +/** A helper with set of defaults for supervisor strategy */ object ReceiverSupervisorStrategy { - import akka.util.duration._ - import akka.actor.SupervisorStrategy._ - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException ⇒ Restart @@ -48,10 +49,10 @@ object ReceiverSupervisorStrategy { * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * * @example {{{ - * class MyActor extends Actor with Receiver{ - * def receive { - * case anything :String ⇒ pushBlock(anything) - * } + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String => pushBlock(anything) + * } * } * //Can be plugged in actorStream as follows * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") @@ -65,11 +66,11 @@ object ReceiverSupervisorStrategy { * */ trait Receiver { self: Actor ⇒ - def pushBlock[T: ClassManifest](iter: Iterator[T]) { + def pushBlock[T: ClassTag](iter: Iterator[T]) { context.parent ! Data(iter) } - def pushBlock[T: ClassManifest](data: T) { + def pushBlock[T: ClassTag](data: T) { context.parent ! Data(data) } @@ -83,8 +84,8 @@ case class Statistics(numberOfMsgs: Int, numberOfHiccups: Int, otherInfo: String) -/** Case class to receive data sent by child actors **/ -private[streaming] case class Data[T: ClassManifest](data: T) +/** Case class to receive data sent by child actors */ +private[streaming] case class Data[T: ClassTag](data: T) /** * Provides Actors as receivers for receiving stream. @@ -95,19 +96,19 @@ private[streaming] case class Data[T: ClassManifest](data: T) * his own Actor to run as receiver for Spark Streaming input source. * * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. - * + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * * Here's a way to start more supervisor/workers as its children. * * @example {{{ - * context.parent ! Props(new Supervisor) + * context.parent ! Props(new Supervisor) * }}} OR {{{ * context.parent ! Props(new Worker,"Worker") * }}} * * */ -private[streaming] class ActorReceiver[T: ClassManifest]( +private[streaming] class ActorReceiver[T: ClassTag]( props: Props, name: String, storageLevel: StorageLevel, @@ -120,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassManifest]( protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), "Supervisor" + streamId) - private class Supervisor extends Actor { + class Supervisor extends Actor { override val supervisorStrategy = receiverSupervisorStrategy val worker = context.actorOf(props, name) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala index 043bb8c8bf..f164d516b0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala @@ -17,7 +17,10 @@ package org.apache.spark.streaming.receivers +import scala.reflect.ClassTag + import akka.actor.Actor +import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging @@ -25,12 +28,12 @@ import org.apache.spark.Logging /** * A receiver to subscribe to ZeroMQ stream. */ -private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, +private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) extends Actor with Receiver with Logging { - override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe) def receive: Receive = { @@ -38,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, case Connecting ⇒ logInfo("connecting ...") case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.firstFrameAsString) + logDebug("Received message for:" + m.frame(0)) //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail.map(_.payload) + val bytes = m.frames.tail pushBlock(bytesToObjects(bytes)) case Closed ⇒ logInfo("received closed ") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index c759302a61..abff55d77c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue +import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import akka.util.duration._ import akka.dispatch._ import org.apache.spark.storage.BlockId import org.apache.spark.streaming.{Time, StreamingContext} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 6977957126..4a3993e3e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -25,6 +25,7 @@ import StreamingContext._ import scala.util.Random import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag import java.io.{File, ObjectInputStream, IOException} import java.util.UUID @@ -120,7 +121,7 @@ object MasterFailureTest extends Logging { * Tests stream operation with multiple master failures, and verifies whether the * final set of output values is as expected or not. */ - def testOperation[T: ClassManifest]( + def testOperation[T: ClassTag]( directory: String, batchDuration: Duration, input: Seq[String], @@ -158,7 +159,7 @@ object MasterFailureTest extends Logging { * and batch duration. Returns the streaming context and the directory to which * files should be written for testing. */ - private def setupStreams[T: ClassManifest]( + private def setupStreams[T: ClassTag]( directory: String, batchDuration: Duration, operation: DStream[String] => DStream[T] @@ -192,7 +193,7 @@ object MasterFailureTest extends Logging { * Repeatedly starts and kills the streaming context until timed out or * the last expected output is generated. Finally, return */ - private def runStreams[T: ClassManifest]( + private def runStreams[T: ClassTag]( ssc_ : StreamingContext, lastExpectedOutput: T, maxTimeToRun: Long @@ -274,7 +275,7 @@ object MasterFailureTest extends Logging { * duplicate batch outputs of values from the `output`. As a result, the * expected output should not have consecutive batches with the same values as output. */ - private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) { + private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) { // Verify whether expected outputs do not consecutive batches with same output for (i <- 0 until expectedOutput.size - 1) { assert(expectedOutput(i) != expectedOutput(i+1), @@ -305,7 +306,7 @@ object MasterFailureTest extends Logging { * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. */ private[streaming] -class TestOutputStream[T: ClassManifest]( +class TestOutputStream[T: ClassTag]( parent: DStream[T], val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] ) extends ForEachDStream[T]( @@ -380,24 +381,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 - var done = false - while (!done && tries < maxTries) { - tries += 1 - try { - // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) - fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) - fs.rename(tempHadoopFile, hadoopFile) - done = true - } catch { - case ioe: IOException => { - fs = testDir.getFileSystem(new Configuration()) - logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) - } - } + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } } - if (!done) + if (!done) logError("Could not generate file " + hadoopFile) - else + else logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) Thread.sleep(interval) localFile.delete() @@ -411,5 +412,3 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } } - - |