aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-18 17:51:14 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-18 17:51:14 -0800
commite93b391d75a1c2e17ad93caff39e8fc34d640935 (patch)
tree925eb68b33ddbabc4482aae4b6ead51668a22653 /streaming
parentb80ec05635132f96772545803a10a1bbfa1250e7 (diff)
parent5ea187277c2b11e5db813f7ff9f214d7b85190f6 (diff)
downloadspark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.gz
spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.bz2
spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.zip
Merge branch 'apache-master' into scheduler-update
Conflicts: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/pom.xml22
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala37
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala63
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala45
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala79
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala93
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala108
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala23
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala35
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala45
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java88
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala22
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala36
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala29
42 files changed, 458 insertions, 406 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 40892937b8..e3b6fee9b2 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.9.3</artifactId>
+ <artifactId>spark-streaming_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Streaming</name>
<url>http://spark.incubator.apache.org/</url>
@@ -48,7 +48,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -61,8 +61,8 @@
<version>1.9.11</version>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <groupId>com.sksamuel.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.0-beta1</version>
<exclusions>
<exclusion>
@@ -111,16 +111,16 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq</artifactId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -134,14 +134,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 8001c49a76..a78d3965ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -26,13 +26,14 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
* HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
@@ -51,7 +52,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
* - A function that is used to generate an RDD after each time interval
*/
-abstract class DStream[T: ClassManifest] (
+abstract class DStream[T: ClassTag] (
@transient protected[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
@@ -77,7 +78,7 @@ abstract class DStream[T: ClassManifest] (
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
-
+
// Time zero for the DStream
protected[streaming] var zeroTime: Time = null
@@ -269,16 +270,16 @@ abstract class DStream[T: ClassManifest] (
/**
* Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
* method that should not be called directly.
- */
+ */
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
-
- // If an RDD was already generated and is being reused, then
+
+ // If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
-
+
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
@@ -295,7 +296,7 @@ abstract class DStream[T: ClassManifest] (
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
- case None =>
+ case None =>
None
}
} else {
@@ -339,7 +340,7 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.clearOldMetadata(time))
}
- /* Adds metadata to the Stream while it is running.
+ /* Adds metadata to the Stream while it is running.
* This methd should be overwritten by sublcasses of InputDStream.
*/
protected[streaming] def addMetadata(metadata: Any) {
@@ -411,7 +412,7 @@ abstract class DStream[T: ClassManifest] (
// =======================================================================
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
+ def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
@@ -419,7 +420,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
@@ -445,7 +446,7 @@ abstract class DStream[T: ClassManifest] (
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[U: ClassManifest](
+ def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
@@ -492,16 +493,14 @@ abstract class DStream[T: ClassManifest] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
+ ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
}
@@ -509,7 +508,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
//new TransformedDStream(this, context.sparkContext.clean(transformFunc))
val cleanedF = context.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
@@ -523,7 +522,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[U: ClassManifest, V: ClassManifest](
+ def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
@@ -534,7 +533,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[U: ClassManifest, V: ClassManifest](
+ def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index 58a0da2870..3fd5d52403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -20,13 +20,16 @@ package org.apache.spark.streaming
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
+
import collection.mutable.HashMap
import org.apache.spark.Logging
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
private[streaming]
-class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@@ -107,4 +110,3 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
"[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 8c12fd11ef..80af96c060 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -18,16 +18,15 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
-import org.apache.spark.streaming.dstream.{ShuffledDStream}
-import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
+import org.apache.spark.streaming.dstream._
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{Manifests, RDD, PairRDDFunctions}
+import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -35,7 +34,7 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
-class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
+class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
private[streaming] def ssc = self.ssc
@@ -105,7 +104,7 @@ extends Serializable {
* combineByKey for RDDs. Please refer to combineByKey in
* [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
- def combineByKey[C: ClassManifest](
+ def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
@@ -205,7 +204,7 @@ extends Serializable {
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)] = {
@@ -336,7 +335,7 @@ extends Serializable {
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
@@ -351,7 +350,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
): DStream[(K, S)] = {
@@ -367,7 +366,7 @@ extends Serializable {
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = {
@@ -390,7 +389,7 @@ extends Serializable {
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
@@ -402,7 +401,7 @@ extends Serializable {
* Return a new DStream by applying a map function to the value of each key-value pairs in
* 'this' DStream without changing the key.
*/
- def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
@@ -410,7 +409,7 @@ extends Serializable {
* Return a new DStream by applying a flatmap function to the value of each key-value pairs in
* 'this' DStream without changing the key.
*/
- def flatMapValues[U: ClassManifest](
+ def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
@@ -421,7 +420,7 @@ extends Serializable {
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
@@ -429,7 +428,7 @@ extends Serializable {
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -437,7 +436,7 @@ extends Serializable {
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
*/
- def cogroup[W: ClassManifest](
+ def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -451,7 +450,7 @@ extends Serializable {
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
- def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
@@ -459,7 +458,7 @@ extends Serializable {
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner(numPartitions))
}
@@ -467,7 +466,7 @@ extends Serializable {
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
- def join[W: ClassManifest](
+ def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
@@ -482,7 +481,7 @@ extends Serializable {
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
leftOuterJoin[W](other, defaultPartitioner())
}
@@ -491,7 +490,7 @@ extends Serializable {
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
- def leftOuterJoin[W: ClassManifest](
+ def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (V, Option[W]))] = {
@@ -503,7 +502,7 @@ extends Serializable {
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
- def leftOuterJoin[W: ClassManifest](
+ def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, Option[W]))] = {
@@ -518,7 +517,7 @@ extends Serializable {
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
rightOuterJoin[W](other, defaultPartitioner())
}
@@ -527,7 +526,7 @@ extends Serializable {
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
- def rightOuterJoin[W: ClassManifest](
+ def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (Option[V], W))] = {
@@ -539,7 +538,7 @@ extends Serializable {
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
- def rightOuterJoin[W: ClassManifest](
+ def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], W))] = {
@@ -556,8 +555,8 @@ extends Serializable {
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassManifest[F]) {
- saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ )(implicit fm: ClassTag[F]) {
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -586,8 +585,8 @@ extends Serializable {
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassManifest[F]) {
- saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ )(implicit fm: ClassTag[F]) {
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -609,9 +608,7 @@ extends Serializable {
self.foreach(saveFunc)
}
- private def getKeyClass() = implicitly[ClassManifest[K]].erasure
+ private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
- private def getValueClass() = implicitly[ClassManifest[V]].erasure
+ private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 83f1cadb48..fedbbde80c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,6 +34,7 @@ import org.apache.spark.streaming.receivers.ActorReceiver
import scala.collection.mutable.Queue
import scala.collection.Map
+import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
@@ -47,7 +48,7 @@ import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.streaming.scheduler._
-
+import akka.util.ByteString
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -193,7 +194,7 @@ class StreamingContext private (
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of NetworkReceiver
*/
- def networkStream[T: ClassManifest](
+ def networkStream[T: ClassTag](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
@@ -213,7 +214,7 @@ class StreamingContext private (
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
- def actorStream[T: ClassManifest](
+ def actorStream[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
@@ -233,14 +234,14 @@ class StreamingContext private (
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
- def zeroMQStream[T: ClassManifest](
+ def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
@@ -277,8 +278,8 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
*/
def kafkaStream[
- K: ClassManifest,
- V: ClassManifest,
+ K: ClassTag,
+ V: ClassTag,
U <: kafka.serializer.Decoder[_]: Manifest,
T <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
@@ -317,7 +318,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def socketStream[T: ClassManifest](
+ def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
@@ -339,7 +340,7 @@ class StreamingContext private (
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
registerInputStream(inputStream)
inputStream
}
@@ -354,7 +355,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawSocketStream[T: ClassManifest](
+ def rawSocketStream[T: ClassTag](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -374,9 +375,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
- K: ClassManifest,
- V: ClassManifest,
- F <: NewInputFormat[K, V]: ClassManifest
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
registerInputStream(inputStream)
@@ -394,9 +395,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
- K: ClassManifest,
- V: ClassManifest,
- F <: NewInputFormat[K, V]: ClassManifest
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
registerInputStream(inputStream)
@@ -438,7 +439,7 @@ class StreamingContext private (
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
- def queueStream[T: ClassManifest](
+ def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
): DStream[T] = {
@@ -454,7 +455,7 @@ class StreamingContext private (
* Set as null if no RDD should be returned when empty
* @tparam T Type of objects in the RDD
*/
- def queueStream[T: ClassManifest](
+ def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@@ -482,7 +483,7 @@ class StreamingContext private (
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
- def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
@@ -490,7 +491,7 @@ class StreamingContext private (
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams.
*/
- def transform[T: ClassManifest](
+ def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
@@ -569,7 +570,7 @@ class StreamingContext private (
object StreamingContext {
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 1a2aeaa879..d29033df32 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -23,9 +23,11 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
+
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
* HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
@@ -41,7 +43,7 @@ import org.apache.spark.rdd.RDD
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
-class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
+class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@@ -103,6 +105,6 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
object JavaDStream {
- implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+ implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 09189eadd8..64f38ce1c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -21,6 +21,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.streaming._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
@@ -32,7 +33,7 @@ import JavaDStream._
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
extends Serializable {
- implicit val classManifest: ClassManifest[T]
+ implicit val classTag: ClassTag[T]
def dstream: DStream[T]
@@ -136,7 +137,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -157,7 +158,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -260,8 +261,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -272,8 +273,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -285,10 +286,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmk: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -300,10 +301,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmk: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -317,10 +318,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmu: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- implicit val cmv: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cmu: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cmv: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -334,12 +335,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
- implicit val cmu: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmu: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cmk2: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv2: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -353,12 +354,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
- implicit val cmw: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cmk2: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv2: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmw: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -372,14 +373,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
): JavaPairDStream[K3, V3] = {
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
- implicit val cmk3: ClassManifest[K3] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
- implicit val cmv3: ClassManifest[V3] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
+ implicit val cmk2: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv2: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk3: ClassTag[K3] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
+ implicit val cmv3: ClassTag[V3] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c6cd635afa..dfd6e27c3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -21,6 +21,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
@@ -36,8 +37,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
- implicit val kManifest: ClassManifest[K],
- implicit val vManifest: ClassManifest[V])
+ implicit val kManifest: ClassTag[K],
+ implicit val vManifest: ClassTag[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@@ -162,8 +163,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@@ -428,8 +429,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
- implicit val cm: ClassManifest[S] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ implicit val cm: ClassTag[S] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@@ -446,8 +447,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
- implicit val cm: ClassManifest[S] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ implicit val cm: ClassTag[S] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
@@ -464,8 +465,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
- implicit val cm: ClassManifest[S] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ implicit val cm: ClassTag[S] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
@@ -475,8 +476,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* 'this' DStream without changing the key.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.mapValues(f)
}
@@ -487,8 +488,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.flatMapValues(fn)
}
@@ -498,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -511,8 +512,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream, numPartitions)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -525,8 +526,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -536,8 +537,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream)
}
@@ -546,8 +547,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream, numPartitions)
}
@@ -559,8 +560,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream, partitioner)
}
@@ -570,8 +571,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -585,8 +586,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -599,8 +600,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -611,8 +612,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.rightOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -626,8 +627,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -641,8 +642,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -722,24 +723,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
new JavaDStream[(K, V)](dstream)
}
- override val classManifest: ClassManifest[(K, V)] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = {
+ implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
new JavaPairDStream[K, V](dstream)
}
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
new JavaPairDStream[K, V](dstream.dstream)
}
- def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
+ def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7f9dab0ef9..80dcf87491 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -22,12 +22,15 @@ import java.io.InputStream
import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
+import akka.util.ByteString
+
import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
@@ -141,10 +144,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
groupId: String,
topics: JMap[String, JInt])
: JavaPairDStream[String, String] = {
- implicit val cmt: ClassManifest[String] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
StorageLevel.MEMORY_ONLY_SER_2)
+
}
/**
@@ -162,8 +166,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaPairDStream[String, String] = {
- implicit val cmt: ClassManifest[String] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -189,10 +193,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val valueCmt: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
@@ -245,8 +249,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel)
: JavaDStream[T] = {
def fn = (x: InputStream) => converter.apply(x).toIterator
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
}
@@ -274,8 +278,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
}
@@ -289,8 +293,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
}
@@ -304,12 +308,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
- implicit val cmf: ClassManifest[F] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+ implicit val cmk: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val cmf: ClassTag[F] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory)
}
@@ -404,7 +408,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
}
-
+
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
@@ -422,8 +426,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
}
@@ -443,8 +447,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
name: String,
storageLevel: StorageLevel
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
}
@@ -462,8 +466,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
props: Props,
name: String
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
@@ -480,12 +484,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
}
@@ -505,9 +509,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
@@ -525,9 +529,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
@@ -547,8 +551,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue)
@@ -564,8 +568,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime)
@@ -585,8 +589,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
defaultRDD: JavaRDD[T]): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
@@ -597,7 +601,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
- implicit val cm: ClassManifest[T] = first.classManifest
+ implicit val cm: ClassTag[T] = first.classTag
ssc.union(dstreams)(cm)
}
@@ -609,9 +613,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
rest: JList[JavaPairDStream[K, V]]
): JavaPairDStream[K, V] = {
val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
- implicit val cm: ClassManifest[(K, V)] = first.classManifest
- implicit val kcm: ClassManifest[K] = first.kManifest
- implicit val vcm: ClassManifest[V] = first.vManifest
+ implicit val cm: ClassTag[(K, V)] = first.classTag
+ implicit val kcm: ClassTag[K] = first.kManifest
+ implicit val vcm: ClassTag[V] = first.vManifest
new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
}
@@ -628,8 +632,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]]
): JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
@@ -651,10 +655,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
): JavaPairDStream[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index a9a05c9981..f396c34758 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T])
+class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
extends InputDStream[T](ssc_) {
override def start() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fea0573b77..39e25239bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,14 +26,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
+
import java.io.{ObjectInputStream, IOException}
private[streaming]
-class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
+class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
- newFilesOnly: Boolean = true)
+ newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
@@ -54,7 +56,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
-
+
override def stop() { }
/**
@@ -100,7 +102,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
- }
+ }
}
}
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
@@ -195,5 +197,3 @@ private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index 91ee2c1a36..db2e0a4cee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class FilteredDStream[T: ClassManifest](
+class FilteredDStream[T: ClassTag](
parent: DStream[T],
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index ca7d7ca49e..244dc3ee4f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
private[streaming]
-class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index b37966f9a7..336c4b7a92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
index a0189eca04..60d79175f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
private[streaming]
-class FlumeInputDStream[T: ClassManifest](
+class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 0072248b7d..364abcde68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.streaming.scheduler.Job
+import scala.reflect.ClassTag
private[streaming]
-class ForEachDStream[T: ClassManifest] (
+class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 4294b07d91..23136f44fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
+class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 674b27118c..f01e67fe13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+import scala.reflect.ClassTag
+
/**
* This is the abstract base class for all input streams. This class provides to methods
* start() and stop() which called by the scheduler to start and stop receiving data/
@@ -30,7 +32,7 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
* that requires running a receiver on the worker nodes, use NetworkInputDStream
* as the parent class.
*/
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index a5de5e1fb5..526f5564c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -31,11 +31,11 @@ import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
import scala.collection.Map
-
+import scala.reflect.ClassTag
/**
* Input stream that pulls messages from a Kafka Broker.
- *
+ *
* @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -43,8 +43,8 @@ import scala.collection.Map
*/
private[streaming]
class KafkaInputDStream[
- K: ClassManifest,
- V: ClassManifest,
+ K: ClassTag,
+ V: ClassTag,
U <: Decoder[_]: Manifest,
T <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
@@ -61,8 +61,8 @@ class KafkaInputDStream[
private[streaming]
class KafkaReceiver[
- K: ClassManifest,
- V: ClassManifest,
+ K: ClassTag,
+ V: ClassTag,
U <: Decoder[_]: Manifest,
T <: Decoder[_]: Manifest](
kafkaParams: Map[String, String],
@@ -104,17 +104,18 @@ class KafkaReceiver[
tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
}
- // Create Threads for each Topic/Message Stream we are listening
- val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties])
+ val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
- val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties])
+ val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
+ // Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
+
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
@@ -122,7 +123,7 @@ class KafkaReceiver[
}
// Handles Kafka Messages
- private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V])
+ private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
@@ -146,7 +147,7 @@ class KafkaReceiver[
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _ : Throwable => // swallow
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
index ac0528213d..ef4a737568 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -37,6 +37,7 @@ import org.eclipse.paho.client.mqttv3.MqttTopic
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -47,7 +48,7 @@ import scala.collection.JavaConversions._
*/
private[streaming]
-class MQTTInputDStream[T: ClassManifest](
+class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 5329601a6f..8a04060e5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+class MapPartitionedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 8290df90a2..0ce364fd46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
private[streaming]
-class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index b1682afea3..c0b7491d09 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
+class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 1df7f547c9..5add20871e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.ArrayBlockingQueue
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
import akka.actor.{Props, Actor}
import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.streaming._
@@ -43,7 +44,7 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
+abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
// This is an unique identifier that is used to match the network receiver with the
@@ -85,7 +86,7 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
* [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
*/
-abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
+abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
initLogging()
@@ -177,8 +178,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
logInfo("Attempting to register with tracker")
val ip = System.getProperty("spark.driver.host", "localhost")
val port = System.getProperty("spark.driver.port", "7077").toInt
- val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
- val tracker = env.actorSystem.actorFor(url)
+ val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+ val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
override def preStart() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 15782f5c11..6f9477020a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -18,9 +18,10 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext
+import scala.reflect.ClassTag
private[streaming]
-class PluggableInputDStream[T: ClassManifest](
+class PluggableInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 7d9f3521b1..97325f8ea3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
-
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
private[streaming]
-class QueueInputDStream[T: ClassManifest](
+class QueueInputDStream[T: ClassTag](
@transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index 10ed4ef78d..dea0f26f90 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -21,6 +21,8 @@ import org.apache.spark.Logging
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.StreamingContext
+import scala.reflect.ClassTag
+
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
@@ -35,7 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue
* in the format that the system is configured with.
*/
private[streaming]
-class RawInputDStream[T: ClassManifest](
+class RawInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index b88a4db959..db56345ca8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -28,8 +28,11 @@ import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
private[streaming]
-class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
+class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -49,7 +52,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
- // Reduce each batch of data using reduceByKey which will be further reduced by window
+ // Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -170,5 +173,3 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
}
}
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index a95e66d761..e6e0022097 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -21,9 +21,10 @@ import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
private[streaming]
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e2539c7396..2cdd13f205 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -21,11 +21,13 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator
+import scala.reflect.ClassTag
+
import java.io._
import java.net.Socket
private[streaming]
-class SocketInputDStream[T: ClassManifest](
+class SocketInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
@@ -39,7 +41,7 @@ class SocketInputDStream[T: ClassManifest](
}
private[streaming]
-class SocketReceiver[T: ClassManifest](
+class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 362a6bf4cc..e0ff3ccba4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -23,8 +23,10 @@ import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Time, DStream}
+import scala.reflect.ClassTag
+
private[streaming]
-class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
+class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 71bcb2b390..aeea060df7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
private[streaming]
-class TransformedDStream[U: ClassManifest] (
+class TransformedDStream[U: ClassTag] (
parents: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[U]
) extends DStream[U](parents.head.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index c696bb70a8..0d84ec84f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -22,8 +22,11 @@ import org.apache.spark.rdd.RDD
import collection.mutable.ArrayBuffer
import org.apache.spark.rdd.UnionRDD
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
private[streaming]
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 3c57294269..73d959331a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -22,8 +22,10 @@ import org.apache.spark.rdd.UnionRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import scala.reflect.ClassTag
+
private[streaming]
-class WindowedDStream[T: ClassManifest](
+class WindowedDStream[T: ClassTag](
parent: DStream[T],
_windowDuration: Duration,
_slideDuration: Duration)
@@ -52,6 +54,3 @@ class WindowedDStream[T: ClassManifest](
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
-
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index ef0f85a717..fdf5371a89 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -20,6 +20,10 @@ package org.apache.spark.streaming.receivers
import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.dstream.NetworkReceiver
@@ -28,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
-/** A helper with set of defaults for supervisor strategy **/
+/** A helper with set of defaults for supervisor strategy */
object ReceiverSupervisorStrategy {
- import akka.util.duration._
- import akka.actor.SupervisorStrategy._
-
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException ⇒ Restart
@@ -48,10 +49,10 @@ object ReceiverSupervisorStrategy {
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
- * class MyActor extends Actor with Receiver{
- * def receive {
- * case anything :String ⇒ pushBlock(anything)
- * }
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String => pushBlock(anything)
+ * }
* }
* //Can be plugged in actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -65,11 +66,11 @@ object ReceiverSupervisorStrategy {
*
*/
trait Receiver { self: Actor ⇒
- def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+ def pushBlock[T: ClassTag](iter: Iterator[T]) {
context.parent ! Data(iter)
}
- def pushBlock[T: ClassManifest](data: T) {
+ def pushBlock[T: ClassTag](data: T) {
context.parent ! Data(data)
}
@@ -83,8 +84,8 @@ case class Statistics(numberOfMsgs: Int,
numberOfHiccups: Int,
otherInfo: String)
-/** Case class to receive data sent by child actors **/
-private[streaming] case class Data[T: ClassManifest](data: T)
+/** Case class to receive data sent by child actors */
+private[streaming] case class Data[T: ClassTag](data: T)
/**
* Provides Actors as receivers for receiving stream.
@@ -95,19 +96,19 @@ private[streaming] case class Data[T: ClassManifest](data: T)
* his own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
- *
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
* Here's a way to start more supervisor/workers as its children.
*
* @example {{{
- * context.parent ! Props(new Supervisor)
+ * context.parent ! Props(new Supervisor)
* }}} OR {{{
* context.parent ! Props(new Worker,"Worker")
* }}}
*
*
*/
-private[streaming] class ActorReceiver[T: ClassManifest](
+private[streaming] class ActorReceiver[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel,
@@ -120,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassManifest](
protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
- private class Supervisor extends Actor {
+ class Supervisor extends Actor {
override val supervisorStrategy = receiverSupervisorStrategy
val worker = context.actorOf(props, name)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
index 043bb8c8bf..f164d516b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -17,7 +17,10 @@
package org.apache.spark.streaming.receivers
+import scala.reflect.ClassTag
+
import akka.actor.Actor
+import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
@@ -25,12 +28,12 @@ import org.apache.spark.Logging
/**
* A receiver to subscribe to ZeroMQ stream.
*/
-private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
Connect(publisherUrl), subscribe)
def receive: Receive = {
@@ -38,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
case Connecting ⇒ logInfo("connecting ...")
case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.firstFrameAsString)
+ logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail.map(_.payload)
+ val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
case Closed ⇒ logInfo("received closed ")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index c759302a61..abff55d77c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
import akka.dispatch._
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{Time, StreamingContext}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6977957126..4a3993e3e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -25,6 +25,7 @@ import StreamingContext._
import scala.util.Random
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
import java.io.{File, ObjectInputStream, IOException}
import java.util.UUID
@@ -120,7 +121,7 @@ object MasterFailureTest extends Logging {
* Tests stream operation with multiple master failures, and verifies whether the
* final set of output values is as expected or not.
*/
- def testOperation[T: ClassManifest](
+ def testOperation[T: ClassTag](
directory: String,
batchDuration: Duration,
input: Seq[String],
@@ -158,7 +159,7 @@ object MasterFailureTest extends Logging {
* and batch duration. Returns the streaming context and the directory to which
* files should be written for testing.
*/
- private def setupStreams[T: ClassManifest](
+ private def setupStreams[T: ClassTag](
directory: String,
batchDuration: Duration,
operation: DStream[String] => DStream[T]
@@ -192,7 +193,7 @@ object MasterFailureTest extends Logging {
* Repeatedly starts and kills the streaming context until timed out or
* the last expected output is generated. Finally, return
*/
- private def runStreams[T: ClassManifest](
+ private def runStreams[T: ClassTag](
ssc_ : StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
@@ -274,7 +275,7 @@ object MasterFailureTest extends Logging {
* duplicate batch outputs of values from the `output`. As a result, the
* expected output should not have consecutive batches with the same values as output.
*/
- private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
assert(expectedOutput(i) != expectedOutput(i+1),
@@ -305,7 +306,7 @@ object MasterFailureTest extends Logging {
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
private[streaming]
-class TestOutputStream[T: ClassManifest](
+class TestOutputStream[T: ClassTag](
parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](
@@ -380,24 +381,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
- var done = false
- while (!done && tries < maxTries) {
- tries += 1
- try {
- // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
- fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
- fs.rename(tempHadoopFile, hadoopFile)
- done = true
- } catch {
- case ioe: IOException => {
- fs = testDir.getFileSystem(new Configuration())
- logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
- }
- }
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
}
- if (!done)
+ if (!done)
logError("Could not generate file " + hadoopFile)
- else
+ else
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval)
localFile.delete()
@@ -411,5 +412,3 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
}
}
}
-
-
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ad4a8b9535..daeb99f5b7 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,28 +21,31 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+
import kafka.serializer.StringDecoder;
+
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+
import scala.Tuple2;
+import twitter4j.Status;
+
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.SparkFlumeEvent;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
-import org.apache.spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
@@ -51,7 +54,6 @@ import akka.actor.Props;
import akka.zeromq.Subscribe;
-
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
@@ -86,8 +88,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3L),
Arrays.asList(1L));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream count = stream.count();
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Long> count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
@@ -103,8 +105,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(5,5),
Arrays.asList(9,4));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -129,8 +131,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,4,5,6),
Arrays.asList(7,8,9));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -153,8 +155,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
Arrays.asList(13,14,15,16,17,18));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
@@ -171,8 +173,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("giants"),
Arrays.asList("yankees"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("a");
@@ -227,8 +229,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(Arrays.asList("giants", "dodgers")),
Arrays.asList(Arrays.asList("yankees", "red socks")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream glommed = stream.glom();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<List<String>> glommed = stream.glom();
JavaTestUtils.attachTestOutputStream(glommed);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -245,8 +247,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("GIANTSDODGERS"),
Arrays.asList("YANKEESRED SOCKS"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
String out = "";
@@ -288,8 +290,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(15),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -309,8 +311,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(39),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -695,8 +697,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
Arrays.asList("a","t","h","l","e","t","i","c","s"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split("(?!^)"));
@@ -742,8 +744,8 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, String>(9, "c"),
new Tuple2<Integer, String>(9, "s")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
List<Tuple2<Integer, String>> out = Lists.newArrayList();
@@ -776,10 +778,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(2,2,5,5),
Arrays.asList(3,3,6,6));
- JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
- JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
- JavaDStream unioned = stream1.union(stream2);
+ JavaDStream<Integer> unioned = stream1.union(stream2);
JavaTestUtils.attachTestOutputStream(unioned);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -790,7 +792,7 @@ public class JavaAPISuite implements Serializable {
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
*/
- public static <T extends Comparable> void assertOrderInvariantEquals(
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
for (List<T> list: expected) {
Collections.sort(list);
@@ -813,11 +815,11 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2 call(String in) throws Exception {
+ public Tuple2<String, Integer> call(String in) throws Exception {
return new Tuple2<String, Integer>(in, in.length());
}
});
@@ -1540,8 +1542,8 @@ public class JavaAPISuite implements Serializable {
File tempDir = Files.createTempDir();
ssc.checkpoint(tempDir.getAbsolutePath());
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -1616,7 +1618,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testSocketTextStream() {
- JavaDStream test = ssc.socketTextStream("localhost", 12345);
+ JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1636,7 +1638,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.socketStream(
+ JavaDStream<String> test = ssc.socketStream(
"localhost",
12345,
new Converter(),
@@ -1645,39 +1647,39 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTextFileStream() {
- JavaDStream test = ssc.textFileStream("/tmp/foo");
+ JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
}
@Test
public void testRawSocketStream() {
- JavaDStream test = ssc.rawSocketStream("localhost", 12345);
+ JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
+ JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
public void testTwitterStream() {
String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
+ JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
}
@Test
public void testActorStream() {
- JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
}
@Test
public void testZeroMQStream() {
- JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
@Override
public Iterable<String> call(byte[][] b) throws Exception {
return null;
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 5e384eeee4..42ab9590d6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -17,7 +17,9 @@
package org.apache.spark.streaming
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
+
import java.util.{List => JList}
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming._
@@ -31,15 +33,15 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
- **/
+ */
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
@@ -52,8 +54,8 @@ trait JavaTestBase extends TestSuiteBase {
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) =
{
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
dstream.dstream.ssc.registerOutputStream(ostream)
}
@@ -67,8 +69,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
@@ -85,8 +87,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
numExpectedOutput: Int): JList[JList[JList[V]]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[JList[V]]]()
res.map{entry =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c93075e3b3..67a0841535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -20,14 +20,20 @@ package org.apache.spark.streaming
import dstream.FileInputDStream
import org.apache.spark.streaming.StreamingContext._
import java.io.File
-import runtime.RichInt
-import org.scalatest.BeforeAndAfter
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.commons.io.FileUtils
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.{Clock, ManualClock}
-import scala.util.Random
+import org.scalatest.BeforeAndAfter
+
import com.google.common.io.Files
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.util.ManualClock
+
+
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -68,13 +74,13 @@ class CheckpointSuite extends TestSuiteBase {
// Setup the streams
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(stateStreamCheckpointInterval)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
var ssc = setupStreams(input, operation)
var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
@@ -174,13 +180,13 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(batchDuration * 2)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
testCheckpointedOperation(input, operation, output, 7)
}
@@ -306,7 +312,7 @@ class CheckpointSuite extends TestSuiteBase {
* NOTE: This takes into consideration that the last batch processed before
* master failure will be re-processed after restart/recovery.
*/
- def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
+ def testCheckpointedOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -350,7 +356,7 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index fbbeb8f0ee..e969e91d13 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
-import collection.mutable.ArrayBuffer
-import collection.mutable.SynchronizedBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SynchronizedBuffer
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
@@ -35,7 +36,7 @@ import org.apache.spark.rdd.RDD
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
-class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
def start() {}
@@ -63,7 +64,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
-class TestOutputStream[T: ClassManifest](parent: DStream[T],
+class TestOutputStream[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
@@ -85,7 +86,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
-class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
+class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
@@ -163,7 +164,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
@@ -189,7 +190,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the binary operation using the sequence
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
@@ -220,7 +221,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
*
* Returns a sequence of items for each RDD.
*/
- def runStreams[V: ClassManifest](
+ def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -237,7 +238,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition.
*/
- def runStreamsWithPartitions[V: ClassManifest](
+ def runStreamsWithPartitions[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -293,7 +294,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* is same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
- def verifyOutput[V: ClassManifest](
+ def verifyOutput[V: ClassTag](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
useSet: Boolean
@@ -323,7 +324,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test unary DStream operation with a list of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -341,7 +342,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -358,7 +359,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test binary DStream operation with two lists of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],
@@ -378,7 +379,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],