aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/pom.xml20
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala31
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala47
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala40
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala31
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala57
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala84
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala35
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala45
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java2
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala18
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala36
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala25
28 files changed, 308 insertions, 270 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 3b25fb49fb..3f2033f34a 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.9.3</artifactId>
+ <artifactId>spark-streaming_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Streaming</name>
<url>http://spark.incubator.apache.org/</url>
@@ -42,7 +42,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -87,17 +87,17 @@
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-zeromq</artifactId>
- <version>2.0.3</version>
+ <artifactId>akka-zeromq_${scala-short.version}</artifactId>
+ <version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -110,10 +110,14 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala-short.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala-short.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 80da6bd30b..cd404fd408 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
@@ -56,7 +57,7 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
-abstract class DStream[T: ClassManifest] (
+abstract class DStream[T: ClassTag] (
@transient protected[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
@@ -82,7 +83,7 @@ abstract class DStream[T: ClassManifest] (
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
-
+
// Time zero for the DStream
protected[streaming] var zeroTime: Time = null
@@ -274,16 +275,16 @@ abstract class DStream[T: ClassManifest] (
/**
* Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
* method that should not be called directly.
- */
+ */
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
-
- // If an RDD was already generated and is being reused, then
+
+ // If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
-
+
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
@@ -300,7 +301,7 @@ abstract class DStream[T: ClassManifest] (
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
- case None =>
+ case None =>
None
}
} else {
@@ -344,7 +345,7 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.clearOldMetadata(time))
}
- /* Adds metadata to the Stream while it is running.
+ /* Adds metadata to the Stream while it is running.
* This methd should be overwritten by sublcasses of InputDStream.
*/
protected[streaming] def addMetadata(metadata: Any) {
@@ -416,7 +417,7 @@ abstract class DStream[T: ClassManifest] (
// =======================================================================
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
+ def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
@@ -424,7 +425,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
@@ -443,7 +444,7 @@ abstract class DStream[T: ClassManifest] (
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[U: ClassManifest](
+ def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
@@ -490,16 +491,14 @@ abstract class DStream[T: ClassManifest] (
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
+ ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
- def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => transformFunc(r))
}
@@ -507,7 +506,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
- def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, context.sparkContext.clean(transformFunc))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index 58a0da2870..3fd5d52403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -20,13 +20,16 @@ package org.apache.spark.streaming
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
+
import collection.mutable.HashMap
import org.apache.spark.Logging
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
private[streaming]
-class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@@ -107,4 +110,3 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
"[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index aae79a4e6f..b761646dff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -28,8 +28,7 @@ import scala.collection.mutable.Queue
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
+import scala.concurrent.duration._
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 757bc98981..f021e29619 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -24,10 +24,11 @@ import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStrea
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{Manifests, RDD, PairRDDFunctions}
+import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -35,7 +36,7 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
-class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
+class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
private[streaming] def ssc = self.ssc
@@ -105,7 +106,7 @@ extends Serializable {
* combineByKey for RDDs. Please refer to combineByKey in
* [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
- def combineByKey[C: ClassManifest](
+ def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
@@ -205,7 +206,7 @@ extends Serializable {
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)] = {
@@ -336,7 +337,7 @@ extends Serializable {
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
@@ -351,7 +352,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
): DStream[(K, S)] = {
@@ -367,7 +368,7 @@ extends Serializable {
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = {
@@ -390,7 +391,7 @@ extends Serializable {
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
@@ -399,11 +400,11 @@ extends Serializable {
}
- def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
- def flatMapValues[U: ClassManifest](
+ def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
@@ -415,7 +416,7 @@ extends Serializable {
* key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
* of partitions.
*/
- def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
@@ -424,7 +425,7 @@ extends Serializable {
* or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
- def cogroup[W: ClassManifest](
+ def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -434,8 +435,8 @@ extends Serializable {
partitioner
)
val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
- classManifest[K],
- Manifests.seqSeqManifest
+ classTag[K],
+ ClassTags.seqSeqClassTag
)
pdfs.mapValues {
case Seq(vs, ws) =>
@@ -447,7 +448,7 @@ extends Serializable {
* Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
- def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
@@ -456,7 +457,7 @@ extends Serializable {
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
- def join[W: ClassManifest](
+ def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
@@ -474,8 +475,8 @@ extends Serializable {
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassManifest[F]) {
- saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ )(implicit fm: ClassTag[F]) {
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -504,8 +505,8 @@ extends Serializable {
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassManifest[F]) {
- saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ )(implicit fm: ClassTag[F]) {
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -527,9 +528,7 @@ extends Serializable {
self.foreach(saveFunc)
}
- private def getKeyClass() = implicitly[ClassManifest[K]].erasure
+ private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
- private def getValueClass() = implicitly[ClassManifest[V]].erasure
+ private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..c722aa15ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,6 +34,7 @@ import org.apache.spark.streaming.receivers.ActorReceiver
import scala.collection.mutable.Queue
import scala.collection.Map
+import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
+import akka.util.ByteString
/**
@@ -187,7 +189,7 @@ class StreamingContext private (
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of NetworkReceiver
*/
- def networkStream[T: ClassManifest](
+ def networkStream[T: ClassTag](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
@@ -207,7 +209,7 @@ class StreamingContext private (
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
- def actorStream[T: ClassManifest](
+ def actorStream[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
@@ -227,14 +229,14 @@ class StreamingContext private (
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
- def zeroMQStream[T: ClassManifest](
+ def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
@@ -266,7 +268,7 @@ class StreamingContext private (
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+ def kafkaStream[T: ClassTag, D <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
@@ -303,7 +305,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def socketStream[T: ClassManifest](
+ def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
@@ -325,7 +327,7 @@ class StreamingContext private (
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
registerInputStream(inputStream)
inputStream
}
@@ -340,7 +342,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawSocketStream[T: ClassManifest](
+ def rawSocketStream[T: ClassTag](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -360,9 +362,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
- K: ClassManifest,
- V: ClassManifest,
- F <: NewInputFormat[K, V]: ClassManifest
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
registerInputStream(inputStream)
@@ -380,9 +382,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
- K: ClassManifest,
- V: ClassManifest,
- F <: NewInputFormat[K, V]: ClassManifest
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
registerInputStream(inputStream)
@@ -424,7 +426,7 @@ class StreamingContext private (
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
- def queueStream[T: ClassManifest](
+ def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
): DStream[T] = {
@@ -440,7 +442,7 @@ class StreamingContext private (
* Set as null if no RDD should be returned when empty
* @tparam T Type of objects in the RDD
*/
- def queueStream[T: ClassManifest](
+ def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@@ -453,7 +455,7 @@ class StreamingContext private (
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/
- def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
@@ -530,7 +532,7 @@ class StreamingContext private (
object StreamingContext {
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d1932b6b05..0d54d78ed3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -23,6 +23,8 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
+
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
@@ -41,7 +43,7 @@ import org.apache.spark.rdd.RDD
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
-class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
+class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@@ -97,6 +99,6 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
object JavaDStream {
- implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+ implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 459695b7ca..4508e48590 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -21,6 +21,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.streaming._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
@@ -31,7 +32,7 @@ import JavaDStream._
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
extends Serializable {
- implicit val classManifest: ClassManifest[T]
+ implicit val classTag: ClassTag[T]
def dstream: DStream[T]
@@ -133,7 +134,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -154,7 +155,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -257,8 +258,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -269,8 +270,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -282,10 +283,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmk: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -297,10 +298,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmk: ClassTag[K2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
+ implicit val cmv: ClassTag[V2] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 978fca33ad..c80545b530 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -21,6 +21,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
@@ -36,8 +37,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
- implicit val kManifiest: ClassManifest[K],
- implicit val vManifest: ClassManifest[V])
+ implicit val kTag: ClassTag[K],
+ implicit val vTag: ClassTag[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@@ -156,8 +157,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
- implicit val cm: ClassManifest[C] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ implicit val cm: ClassTag[C] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@@ -422,8 +423,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
- implicit val cm: ClassManifest[S] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ implicit val cm: ClassTag[S] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@@ -436,7 +437,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
@@ -452,7 +453,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S: ClassTag](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
@@ -460,16 +461,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.mapValues(f)
}
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.flatMapValues(fn)
}
@@ -480,8 +481,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -492,8 +493,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -503,8 +504,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* to partition each generated RDD into default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream)
}
@@ -515,8 +516,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ implicit val cm: ClassTag[W] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream, partitioner)
}
@@ -590,24 +591,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
- override val classManifest: ClassManifest[(K, V)] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
+ implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)])
:JavaPairDStream[K, V] =
new JavaPairDStream[K, V](dstream)
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
new JavaPairDStream[K, V](dstream.dstream)
}
- def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
+ def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 54ba3e6025..8242af6d5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -22,12 +22,15 @@ import java.io.InputStream
import java.util.{Map => JMap}
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
+import akka.util.ByteString
+
import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
@@ -142,10 +145,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
groupId: String,
topics: JMap[String, JInt])
: JavaDStream[String] = {
- implicit val cmt: ClassManifest[String] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
StorageLevel.MEMORY_ONLY_SER_2)
+
}
/**
@@ -163,8 +167,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaDStream[String] = {
- implicit val cmt: ClassManifest[String] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -186,8 +190,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
ssc.kafkaStream[T, D](
kafkaParams.toMap,
@@ -237,8 +241,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel)
: JavaDStream[T] = {
def fn = (x: InputStream) => converter.apply(x).toIterator
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
}
@@ -266,8 +270,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
}
@@ -281,8 +285,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
}
@@ -296,12 +300,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
- implicit val cmf: ClassManifest[F] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+ implicit val cmk: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val cmv: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val cmf: ClassTag[F] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory);
}
@@ -396,7 +400,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
}
-
+
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
@@ -414,8 +418,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
}
@@ -435,8 +439,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
name: String,
storageLevel: StorageLevel
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
}
@@ -454,8 +458,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
props: Props,
name: String
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
@@ -472,12 +476,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
}
@@ -497,9 +501,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
@@ -517,9 +521,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
@@ -539,8 +543,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue)
@@ -556,8 +560,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime)
@@ -577,8 +581,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
defaultRDD: JavaRDD[T]): JavaDStream[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fea0573b77..39e25239bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,14 +26,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
+
import java.io.{ObjectInputStream, IOException}
private[streaming]
-class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
+class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
- newFilesOnly: Boolean = true)
+ newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
@@ -54,7 +56,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
-
+
override def stop() { }
/**
@@ -100,7 +102,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
- }
+ }
}
}
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
@@ -195,5 +197,3 @@ private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
index 18de772946..b72ab90e60 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
private[streaming]
-class FlumeInputDStream[T: ClassManifest](
+class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 674b27118c..f01e67fe13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+import scala.reflect.ClassTag
+
/**
* This is the abstract base class for all input streams. This class provides to methods
* start() and stop() which called by the scheduler to start and stop receiving data/
@@ -30,7 +32,7 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
* that requires running a receiver on the worker nodes, use NetworkInputDStream
* as the parent class.
*/
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index 51e913675d..96134868cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -35,18 +35,18 @@ import org.I0Itec.zkclient._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-
+import scala.reflect.ClassTag
/**
* Input stream that pulls messages from a Kafka Broker.
- *
+ *
* @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
private[streaming]
-class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
+class KafkaInputDStream[T: ClassTag, D <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -61,7 +61,7 @@ class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
}
private[streaming]
-class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
+class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
@@ -102,7 +102,7 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
}
// Create Threads for each Topic/Message Stream we are listening
- val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+ val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]]
val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
// Start the messages handler for each partition
@@ -112,7 +112,7 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
}
// Handles Kafka Messages
- private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
@@ -135,7 +135,7 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _ : Throwable => // swallow
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 31f9891560..394a39fbb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.ArrayBlockingQueue
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
import akka.actor.{Props, Actor}
import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.streaming._
@@ -42,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
+abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
// This is an unique identifier that is used to match the network receiver with the
@@ -84,7 +85,7 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
* [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
*/
-abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
+abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
initLogging()
@@ -176,7 +177,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
logInfo("Attempting to register with tracker")
val ip = System.getProperty("spark.driver.host", "localhost")
val port = System.getProperty("spark.driver.port", "7077").toInt
- val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+ val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index c91f12ecd7..a4746f06ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -21,6 +21,8 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
+import scala.reflect.ClassTag
+
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
@@ -35,7 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue
* in the format that the system is configured with.
*/
private[streaming]
-class RawInputDStream[T: ClassManifest](
+class RawInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index b88a4db959..db56345ca8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -28,8 +28,11 @@ import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
private[streaming]
-class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
+class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -49,7 +52,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
- // Reduce each batch of data using reduceByKey which will be further reduced by window
+ // Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -170,5 +173,3 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
}
}
}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e2539c7396..2cdd13f205 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -21,11 +21,13 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator
+import scala.reflect.ClassTag
+
import java.io._
import java.net.Socket
private[streaming]
-class SocketInputDStream[T: ClassManifest](
+class SocketInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
@@ -39,7 +41,7 @@ class SocketInputDStream[T: ClassManifest](
}
private[streaming]
-class SocketReceiver[T: ClassManifest](
+class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 362a6bf4cc..e0ff3ccba4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -23,8 +23,10 @@ import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Time, DStream}
+import scala.reflect.ClassTag
+
private[streaming]
-class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
+class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index c696bb70a8..0d84ec84f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -22,8 +22,11 @@ import org.apache.spark.rdd.RDD
import collection.mutable.ArrayBuffer
import org.apache.spark.rdd.UnionRDD
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
private[streaming]
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 3c57294269..73d959331a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -22,8 +22,10 @@ import org.apache.spark.rdd.UnionRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import scala.reflect.ClassTag
+
private[streaming]
-class WindowedDStream[T: ClassManifest](
+class WindowedDStream[T: ClassTag](
parent: DStream[T],
_windowDuration: Duration,
_slideDuration: Duration)
@@ -52,6 +54,3 @@ class WindowedDStream[T: ClassManifest](
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
-
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 4b5d8c467e..ee087a1cf0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -20,6 +20,10 @@ package org.apache.spark.streaming.receivers
import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver
@@ -28,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
-/** A helper with set of defaults for supervisor strategy **/
+/** A helper with set of defaults for supervisor strategy */
object ReceiverSupervisorStrategy {
- import akka.util.duration._
- import akka.actor.SupervisorStrategy._
-
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException ⇒ Restart
@@ -48,10 +49,10 @@ object ReceiverSupervisorStrategy {
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
- * class MyActor extends Actor with Receiver{
- * def receive {
- * case anything :String ⇒ pushBlock(anything)
- * }
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String => pushBlock(anything)
+ * }
* }
* //Can be plugged in actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -65,11 +66,11 @@ object ReceiverSupervisorStrategy {
*
*/
trait Receiver { self: Actor ⇒
- def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+ def pushBlock[T: ClassTag](iter: Iterator[T]) {
context.parent ! Data(iter)
}
- def pushBlock[T: ClassManifest](data: T) {
+ def pushBlock[T: ClassTag](data: T) {
context.parent ! Data(data)
}
@@ -83,8 +84,8 @@ case class Statistics(numberOfMsgs: Int,
numberOfHiccups: Int,
otherInfo: String)
-/** Case class to receive data sent by child actors **/
-private[streaming] case class Data[T: ClassManifest](data: T)
+/** Case class to receive data sent by child actors */
+private[streaming] case class Data[T: ClassTag](data: T)
/**
* Provides Actors as receivers for receiving stream.
@@ -95,19 +96,19 @@ private[streaming] case class Data[T: ClassManifest](data: T)
* his own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
- *
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
* Here's a way to start more supervisor/workers as its children.
*
* @example {{{
- * context.parent ! Props(new Supervisor)
+ * context.parent ! Props(new Supervisor)
* }}} OR {{{
* context.parent ! Props(new Worker,"Worker")
* }}}
*
*
*/
-private[streaming] class ActorReceiver[T: ClassManifest](
+private[streaming] class ActorReceiver[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel,
@@ -120,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassManifest](
protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
- private class Supervisor extends Actor {
+ class Supervisor extends Actor {
override val supervisorStrategy = receiverSupervisorStrategy
val worker = context.actorOf(props, name)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
index 043bb8c8bf..ce8c56fa8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -18,19 +18,22 @@
package org.apache.spark.streaming.receivers
import akka.actor.Actor
+import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
+import scala.reflect.ClassTag
+
/**
* A receiver to subscribe to ZeroMQ stream.
*/
-private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
Connect(publisherUrl), subscribe)
def receive: Receive = {
@@ -38,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
case Connecting ⇒ logInfo("connecting ...")
case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.firstFrameAsString)
+ logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail.map(_.payload)
+ val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
case Closed ⇒ logInfo("received closed ")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6977957126..4a3993e3e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -25,6 +25,7 @@ import StreamingContext._
import scala.util.Random
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
import java.io.{File, ObjectInputStream, IOException}
import java.util.UUID
@@ -120,7 +121,7 @@ object MasterFailureTest extends Logging {
* Tests stream operation with multiple master failures, and verifies whether the
* final set of output values is as expected or not.
*/
- def testOperation[T: ClassManifest](
+ def testOperation[T: ClassTag](
directory: String,
batchDuration: Duration,
input: Seq[String],
@@ -158,7 +159,7 @@ object MasterFailureTest extends Logging {
* and batch duration. Returns the streaming context and the directory to which
* files should be written for testing.
*/
- private def setupStreams[T: ClassManifest](
+ private def setupStreams[T: ClassTag](
directory: String,
batchDuration: Duration,
operation: DStream[String] => DStream[T]
@@ -192,7 +193,7 @@ object MasterFailureTest extends Logging {
* Repeatedly starts and kills the streaming context until timed out or
* the last expected output is generated. Finally, return
*/
- private def runStreams[T: ClassManifest](
+ private def runStreams[T: ClassTag](
ssc_ : StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
@@ -274,7 +275,7 @@ object MasterFailureTest extends Logging {
* duplicate batch outputs of values from the `output`. As a result, the
* expected output should not have consecutive batches with the same values as output.
*/
- private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
assert(expectedOutput(i) != expectedOutput(i+1),
@@ -305,7 +306,7 @@ object MasterFailureTest extends Logging {
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
private[streaming]
-class TestOutputStream[T: ClassManifest](
+class TestOutputStream[T: ClassTag](
parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](
@@ -380,24 +381,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
- var done = false
- while (!done && tries < maxTries) {
- tries += 1
- try {
- // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
- fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
- fs.rename(tempHadoopFile, hadoopFile)
- done = true
- } catch {
- case ioe: IOException => {
- fs = testDir.getFileSystem(new Configuration())
- logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
- }
- }
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
}
- if (!done)
+ if (!done)
logError("Could not generate file " + hadoopFile)
- else
+ else
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval)
localFile.delete()
@@ -411,5 +412,3 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
}
}
}
-
-
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..783b8dea31 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -48,7 +48,7 @@ import java.util.*;
import akka.actor.Props;
import akka.zeromq.Subscribe;
-
+import akka.util.ByteString;
// The test suite itself is Serializable so that anonymous Function implementations can be
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 8a6604904d..d5cdad4998 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -17,7 +17,9 @@
package org.apache.spark.streaming
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
+
import java.util.{List => JList}
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming._
@@ -31,15 +33,15 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
- **/
+ */
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
@@ -52,8 +54,8 @@ trait JavaTestBase extends TestSuiteBase {
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) =
{
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStream(dstream.dstream,
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream)
@@ -66,8 +68,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a327de80b3..07de51bebb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -20,14 +20,20 @@ package org.apache.spark.streaming
import dstream.FileInputDStream
import org.apache.spark.streaming.StreamingContext._
import java.io.File
-import runtime.RichInt
-import org.scalatest.BeforeAndAfter
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.commons.io.FileUtils
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.{Clock, ManualClock}
-import scala.util.Random
+import org.scalatest.BeforeAndAfter
+
import com.google.common.io.Files
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.util.ManualClock
+
+
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -74,13 +80,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Setup the streams
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(stateStreamCheckpointInterval)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
var ssc = setupStreams(input, operation)
var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
@@ -180,13 +186,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(batchDuration * 2)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
testCheckpointedOperation(input, operation, output, 7)
}
@@ -312,7 +318,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* NOTE: This takes into consideration that the last batch processed before
* master failure will be re-processed after restart/recovery.
*/
- def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
+ def testCheckpointedOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -356,7 +362,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 37dd9c4cc6..c91f9ba46d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
-import collection.mutable.ArrayBuffer
-import collection.mutable.SynchronizedBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SynchronizedBuffer
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
@@ -35,7 +36,7 @@ import org.apache.spark.rdd.RDD
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
-class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
def start() {}
@@ -61,7 +62,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* This is a output stream just for the testsuites. All the output is collected into a
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
-class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
+class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
@@ -106,7 +107,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
): StreamingContext = {
@@ -130,7 +131,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the binary operation using the sequence
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
@@ -158,7 +159,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* returns the collected output. It will wait until `numExpectedOutput` number of
* output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
*/
- def runStreams[V: ClassManifest](
+ def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -214,7 +215,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* is same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
- def verifyOutput[V: ClassManifest](
+ def verifyOutput[V: ClassTag](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
useSet: Boolean
@@ -244,7 +245,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test unary DStream operation with a list of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -262,7 +263,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -279,7 +280,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test binary DStream operation with two lists of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],
@@ -299,7 +300,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],