author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-02 14:05:51 -0400
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-02 14:05:51 -0400
commit     43b81eb2719c4666b7869d7d0290f2ee83daeafa (patch)
tree       f3ff92c219d541d4b25819f0d4bc0b2ef3f007a0 /streaming
parent     29bf44473c9d76622628f2511588f7846e9b1f3c (diff)
Renamed RDS to DStream, plus minor style fixes
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ConstantInputRDS.scala)  |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala (renamed from streaming/src/main/scala/spark/streaming/RDS.scala)  | 130
-rw-r--r--  streaming/src/main/scala/spark/streaming/FileInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FileInputRDS.scala)  |  16
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala  |   7
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala (renamed from streaming/src/main/scala/spark/streaming/PairRDSFunctions.scala)  |  28
-rw-r--r--  streaming/src/main/scala/spark/streaming/QueueInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/QueueInputRDS.scala)  |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ReducedWindowedRDS.scala)  |  38
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala  |  15
-rw-r--r--  streaming/src/main/scala/spark/streaming/SparkStreamContext.scala  |  58
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala  |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/WindowedRDS.scala)  |  16
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala  |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala  |  10
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount.scala  |  10
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala  |   2
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuite.scala (renamed from streaming/src/test/scala/spark/streaming/RDSSuite.scala)  |  20
16 files changed, 183 insertions, 185 deletions
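
For orientation, a minimal sketch of what the user-facing API reads like after this rename, adapted from the WordCount example touched below. The object name, master string, and watched directory are placeholders, not part of this commit:

```scala
import spark.streaming._
import spark.streaming.SparkStreamContext._  // implicit conversion that provides reduceByKey

object WordCountSketch {
  def main(args: Array[String]) {
    // Streams are now typed as DStream[T] rather than RDS[T]
    val ssc = new SparkStreamContext("local", "WordCountSketch")
    ssc.setBatchDuration(Seconds(2))

    // createTextFileStream now returns a DStream[String] (formerly RDS[String])
    val lines = ssc.createTextFileStream("/tmp/streaming-input")  // placeholder directory
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
  }
}
```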
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputRDS.scala b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
index bf2e6f7e16..6a2be34633 100644
--- a/streaming/src/main/scala/spark/streaming/ConstantInputRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
@@ -5,8 +5,8 @@ import spark.RDD
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputRDS[T: ClassManifest](ssc: SparkStreamContext, rdd: RDD[T])
- extends InputRDS[T](ssc) {
+class ConstantInputDStream[T: ClassManifest](ssc: SparkStreamContext, rdd: RDD[T])
+ extends InputDStream[T](ssc) {
override def start() {}
diff --git a/streaming/src/main/scala/spark/streaming/RDS.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index fd923929e7..e19d2ecef5 100644
--- a/streaming/src/main/scala/spark/streaming/RDS.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -15,7 +15,7 @@ import scala.collection.mutable.HashMap
import java.util.concurrent.ArrayBlockingQueue
-abstract class RDS[T: ClassManifest] (@transient val ssc: SparkStreamContext)
+abstract class DStream[T: ClassManifest] (@transient val ssc: SparkStreamContext)
extends Logging with Serializable {
initLogging()
@@ -26,25 +26,25 @@ extends Logging with Serializable {
* ----------------------------------------------
*/
- // Time by which the window slides in this RDS
+ // Time by which the window slides in this DStream
def slideTime: Time
- // List of parent RDSs on which this RDS depends on
- def dependencies: List[RDS[_]]
+ // List of parent DStreams on which this DStream depends on
+ def dependencies: List[DStream[_]]
// Key method that computes RDD for a valid time
def compute (validTime: Time): Option[RDD[T]]
/**
* ---------------------------------------
- * Other general fields and methods of RDS
+ * Other general fields and methods of DStream
* ---------------------------------------
*/
// Variable to store the RDDs generated earlier in time
@transient private val generatedRDDs = new HashMap[Time, RDD[T]] ()
- // Variable to be set to the first time seen by the RDS (effective time zero)
+ // Variable to be set to the first time seen by the DStream (effective time zero)
private[streaming] var zeroTime: Time = null
// Variable to specify storage level
@@ -58,11 +58,11 @@ extends Logging with Serializable {
def persist(
storageLevel: StorageLevel,
checkpointLevel: StorageLevel,
- checkpointInterval: Time): RDS[T] = {
+ checkpointInterval: Time): DStream[T] = {
if (this.storageLevel != StorageLevel.NONE && this.storageLevel != storageLevel) {
- // TODO: not sure this is necessary for RDSes
+ // TODO: not sure this is necessary for DStreams
throw new UnsupportedOperationException(
- "Cannot change storage level of an RDS after it was already assigned a level")
+ "Cannot change storage level of an DStream after it was already assigned a level")
}
this.storageLevel = storageLevel
this.checkpointLevel = checkpointLevel
@@ -70,20 +70,20 @@ extends Logging with Serializable {
this
}
- // Set caching level for the RDDs created by this RDS
- def persist(newLevel: StorageLevel): RDS[T] = persist(newLevel, StorageLevel.NONE, null)
+ // Set caching level for the RDDs created by this DStream
+ def persist(newLevel: StorageLevel): DStream[T] = persist(newLevel, StorageLevel.NONE, null)
- def persist(): RDS[T] = persist(StorageLevel.MEMORY_ONLY_DESER)
+ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_DESER)
// Turn on the default caching level for this RDD
- def cache(): RDS[T] = persist()
+ def cache(): DStream[T] = persist()
def isInitialized = (zeroTime != null)
/**
- * This method initializes the RDS by setting the "zero" time, based on which
+ * This method initializes the DStream by setting the "zero" time, based on which
* the validity of future times is calculated. This method also recursively initializes
- * its parent RDSs.
+ * its parent DStreams.
*/
def initialize(time: Time) {
if (zeroTime == null) {
@@ -105,20 +105,20 @@ extends Logging with Serializable {
}
/**
- * This method either retrieves a precomputed RDD of this RDS,
+ * This method either retrieves a precomputed RDD of this DStream,
* or computes the RDD (if the time is valid)
*/
def getOrCompute(time: Time): Option[RDD[T]] = {
- // If this RDS was not initialized (i.e., zeroTime not set), then do it
+ // If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// If an RDD was already generated and is being reused, then
- // probably all RDDs in this RDS will be reused and hence should be cached
+ // probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
// if RDD was not generated, and if the time is valid
- // (based on sliding time of this RDS), then generate the RDD
+ // (based on sliding time of this DStream), then generate the RDD
case None =>
if (isTimeValid(time)) {
compute(time) match {
@@ -160,21 +160,21 @@ extends Logging with Serializable {
/**
* --------------
- * RDS operations
+ * DStream operations
* --------------
*/
- def map[U: ClassManifest](mapFunc: T => U) = new MappedRDS(this, ssc.sc.clean(mapFunc))
+ def map[U: ClassManifest](mapFunc: T => U) = new MappedDStream(this, ssc.sc.clean(mapFunc))
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) =
- new FlatMappedRDS(this, ssc.sc.clean(flatMapFunc))
+ new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
- def filter(filterFunc: T => Boolean) = new FilteredRDS(this, filterFunc)
+ def filter(filterFunc: T => Boolean) = new FilteredDStream(this, filterFunc)
- def glom() = new GlommedRDS(this)
+ def glom() = new GlommedDStream(this)
def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) =
- new MapPartitionedRDS(this, ssc.sc.clean(mapPartFunc))
+ new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2)
@@ -183,18 +183,18 @@ extends Logging with Serializable {
def collect() = this.map(x => (1, x)).groupByKey(1).map(_._2)
def foreach(foreachFunc: T => Unit) = {
- val newrds = new PerElementForEachRDS(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newrds)
- newrds
+ val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ newStream
}
def foreachRDD(foreachFunc: RDD[T] => Unit) = {
- val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newrds)
- newrds
+ val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ newStream
}
- private[streaming] def toQueue() = {
+ private[streaming] def toQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
println("Added RDD " + rdd.id)
@@ -213,12 +213,12 @@ extends Logging with Serializable {
if (first11.size > 10) println("...")
println()
}
- val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newrds)
- newrds
+ val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ newStream
}
- def window(windowTime: Time, slideTime: Time) = new WindowedRDS(this, windowTime, slideTime)
+ def window(windowTime: Time, slideTime: Time) = new WindowedDStream(this, windowTime, slideTime)
def batch(batchTime: Time) = window(batchTime, batchTime)
@@ -241,15 +241,17 @@ extends Logging with Serializable {
this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime)
}
- def union(that: RDS[T]) = new UnifiedRDS(Array(this, that))
+ def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
- def register() = ssc.registerOutputStream(this)
+ def register() {
+ ssc.registerOutputStream(this)
+ }
}
-abstract class InputRDS[T: ClassManifest] (
+abstract class InputDStream[T: ClassManifest] (
ssc: SparkStreamContext)
-extends RDS[T](ssc) {
+extends DStream[T](ssc) {
override def dependencies = List()
@@ -265,10 +267,10 @@ extends RDS[T](ssc) {
* TODO
*/
-class MappedRDS[T: ClassManifest, U: ClassManifest] (
- parent: RDS[T],
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
mapFunc: T => U)
-extends RDS[U](parent.ssc) {
+extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -284,10 +286,10 @@ extends RDS[U](parent.ssc) {
* TODO
*/
-class FlatMappedRDS[T: ClassManifest, U: ClassManifest](
- parent: RDS[T],
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
flatMapFunc: T => Traversable[U])
-extends RDS[U](parent.ssc) {
+extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -303,8 +305,8 @@ extends RDS[U](parent.ssc) {
* TODO
*/
-class FilteredRDS[T: ClassManifest](parent: RDS[T], filterFunc: T => Boolean)
-extends RDS[T](parent.ssc) {
+class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean)
+extends DStream[T](parent.ssc) {
override def dependencies = List(parent)
@@ -320,10 +322,10 @@ extends RDS[T](parent.ssc) {
* TODO
*/
-class MapPartitionedRDS[T: ClassManifest, U: ClassManifest](
- parent: RDS[T],
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U])
-extends RDS[U](parent.ssc) {
+extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -339,7 +341,7 @@ extends RDS[U](parent.ssc) {
* TODO
*/
-class GlommedRDS[T: ClassManifest](parent: RDS[T]) extends RDS[Array[T]](parent.ssc) {
+class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
@@ -355,13 +357,13 @@ class GlommedRDS[T: ClassManifest](parent: RDS[T]) extends RDS[Array[T]](parent.
* TODO
*/
-class ShuffledRDS[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: RDS[(K,V)],
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
numPartitions: Int)
- extends RDS [(K,C)] (parent.ssc) {
+ extends DStream [(K,C)] (parent.ssc) {
override def dependencies = List(parent)
@@ -388,8 +390,8 @@ class ShuffledRDS[K: ClassManifest, V: ClassManifest, C: ClassManifest](
* TODO
*/
-class UnifiedRDS[T: ClassManifest](parents: Array[RDS[T]])
-extends RDS[T](parents(0).ssc) {
+class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
+extends DStream[T](parents(0).ssc) {
if (parents.length == 0) {
throw new IllegalArgumentException("Empty array of parents")
@@ -426,10 +428,10 @@ extends RDS[T](parents(0).ssc) {
* TODO
*/
-class PerElementForEachRDS[T: ClassManifest] (
- parent: RDS[T],
+class PerElementForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
foreachFunc: T => Unit)
-extends RDS[Unit](parent.ssc) {
+extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
@@ -457,12 +459,12 @@ extends RDS[Unit](parent.ssc) {
* TODO
*/
-class PerRDDForEachRDS[T: ClassManifest] (
- parent: RDS[T],
+class PerRDDForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit)
-extends RDS[Unit](parent.ssc) {
+extends DStream[Unit](parent.ssc) {
- def this(parent: RDS[T], altForeachFunc: (RDD[T]) => Unit) =
+ def this(parent: DStream[T], altForeachFunc: (RDD[T]) => Unit) =
this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd))
override def dependencies = List(parent)
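
The hunks above only show the skeletons of the renamed subclasses. As a rough sketch of the contract they fill in under the new naming (inferred from the abstract members slideTime, dependencies, and compute, and from getOrCompute on the parent; the compute body below is an assumption, since the real implementations are elided from this diff):

```scala
package spark.streaming

import spark.RDD

// Sketch of a map-style DStream under the new naming; the compute body is
// an assumed implementation, not the one in this commit.
class MappedDStreamSketch[T: ClassManifest, U: ClassManifest](
    parent: DStream[T],
    mapFunc: T => U)
  extends DStream[U](parent.ssc) {

  // Every DStream declares the streams it depends on...
  override def dependencies = List(parent)

  // ...slides at the same rate as its parent...
  override def slideTime: Time = parent.slideTime

  // ...and computes at most one RDD per valid batch time.
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map(mapFunc))
  }
}
```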
diff --git a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index ebd246823d..88aa375289 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -18,12 +18,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
+class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
ssc: SparkStreamContext,
directory: Path,
- filter: PathFilter = FileInputRDS.defaultPathFilter,
+ filter: PathFilter = FileInputDStream.defaultPathFilter,
newFilesOnly: Boolean = true)
- extends InputRDS[(K, V)](ssc) {
+ extends InputDStream[(K, V)](ssc) {
val fs = directory.getFileSystem(new Configuration())
var lastModTime: Long = 0
@@ -69,7 +69,7 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V]
}
}
-object FileInputRDS {
+object FileInputDStream {
val defaultPathFilter = new PathFilter {
def accept(path: Path): Boolean = {
val file = path.getName()
@@ -83,12 +83,12 @@ object FileInputRDS {
}
/*
-class NetworkInputRDS[T: ClassManifest](
+class NetworkInputDStream[T: ClassManifest](
val networkInputName: String,
val addresses: Array[InetSocketAddress],
batchDuration: Time,
ssc: SparkStreamContext)
-extends InputRDS[T](networkInputName, batchDuration, ssc) {
+extends InputDStream[T](networkInputName, batchDuration, ssc) {
// TODO(Haoyuan): This is for the performance test.
@@ -139,11 +139,11 @@ extends InputRDS[T](networkInputName, batchDuration, ssc) {
}
-class TestInputRDS(
+class TestInputDStream(
val testInputName: String,
batchDuration: Time,
ssc: SparkStreamContext)
-extends InputRDS[String](testInputName, batchDuration, ssc) {
+extends InputDStream[String](testInputName, batchDuration, ssc) {
@transient val references = new HashMap[Time,Array[String]]
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index 1960097216..088cbe4376 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,7 +1,6 @@
package spark.streaming
-case class Interval (val beginTime: Time, val endTime: Time) {
-
+case class Interval (beginTime: Time, endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
def duration(): Time = endTime - beginTime
@@ -33,7 +32,7 @@ case class Interval (val beginTime: Time, val endTime: Time) {
this + (endTime - beginTime)
}
- def isZero() = (beginTime.isZero && endTime.isZero)
+ def isZero = (beginTime.isZero && endTime.isZero)
def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString
@@ -41,7 +40,6 @@ case class Interval (val beginTime: Time, val endTime: Time) {
}
object Interval {
-
def zero() = new Interval (Time.zero, Time.zero)
def currentInterval(intervalDuration: Time): Interval = {
@@ -49,7 +47,6 @@ object Interval {
val intervalBegin = time.floor(intervalDuration)
Interval(intervalBegin, intervalBegin + intervalDuration)
}
-
}
diff --git a/streaming/src/main/scala/spark/streaming/PairRDSFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 403ae233a5..0cf296f21a 100644
--- a/streaming/src/main/scala/spark/streaming/PairRDSFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -3,23 +3,23 @@ package spark.streaming
import scala.collection.mutable.ArrayBuffer
import spark.streaming.SparkStreamContext._
-class PairRDSFunctions[K: ClassManifest, V: ClassManifest](rds: RDS[(K,V)])
+class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)])
extends Serializable {
- def ssc = rds.ssc
+ def ssc = stream.ssc
/* ---------------------------------- */
- /* RDS operations for key-value pairs */
+ /* DStream operations for key-value pairs */
/* ---------------------------------- */
- def groupByKey(numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = {
+ def groupByKey(numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = {
def createCombiner(v: V) = ArrayBuffer[V](v)
def mergeValue(c: ArrayBuffer[V], v: V) = (c += v)
def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2)
combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, numPartitions)
}
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledRDS[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledDStream[K, V, V] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, numPartitions)
}
@@ -28,23 +28,23 @@ extends Serializable {
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- numPartitions: Int) : ShuffledRDS[K, V, C] = {
- new ShuffledRDS[K, V, C](rds, createCombiner, mergeValue, mergeCombiner, numPartitions)
+ numPartitions: Int) : ShuffledDStream[K, V, C] = {
+ new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, numPartitions)
}
def groupByKeyAndWindow(
windowTime: Time,
slideTime: Time,
- numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = {
- rds.window(windowTime, slideTime).groupByKey(numPartitions)
+ numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ stream.window(windowTime, slideTime).groupByKey(numPartitions)
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowTime: Time,
slideTime: Time,
- numPartitions: Int = 0): ShuffledRDS[K, V, V] = {
- rds.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions)
+ numPartitions: Int = 0): ShuffledDStream[K, V, V] = {
+ stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions)
}
// This method is the efficient sliding window reduce operation,
@@ -57,10 +57,10 @@ extends Serializable {
invReduceFunc: (V, V) => V,
windowTime: Time,
slideTime: Time,
- numPartitions: Int): ReducedWindowedRDS[K, V] = {
+ numPartitions: Int): ReducedWindowedDStream[K, V] = {
- new ReducedWindowedRDS[K, V](
- rds,
+ new ReducedWindowedDStream[K, V](
+ stream,
ssc.sc.clean(reduceFunc),
ssc.sc.clean(invReduceFunc),
windowTime,
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputRDS.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index 31e6a64e21..c78abd1a87 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -6,12 +6,12 @@ import spark.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
-class QueueInputRDS[T: ClassManifest](
+class QueueInputDStream[T: ClassManifest](
ssc: SparkStreamContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
- ) extends InputRDS[T](ssc) {
+ ) extends InputDStream[T](ssc) {
override def start() { }
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedRDS.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index dd1f474657..11fa4e5443 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -11,28 +11,28 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-class ReducedWindowedRDS[K: ClassManifest, V: ClassManifest](
- parent: RDS[(K, V)],
+class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
+ parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
_windowTime: Time,
_slideTime: Time,
numPartitions: Int)
-extends RDS[(K,V)](parent.ssc) {
+extends DStream[(K,V)](parent.ssc) {
if (!_windowTime.isMultipleOf(parent.slideTime))
- throw new Exception("The window duration of ReducedWindowedRDS (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+ throw new Exception("The window duration of ReducedWindowedDStream (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
if (!_slideTime.isMultipleOf(parent.slideTime))
- throw new Exception("The slide duration of ReducedWindowedRDS (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+ throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val reducedRDS = parent.reduceByKey(reduceFunc, numPartitions)
+ val reducedStream = parent.reduceByKey(reduceFunc, numPartitions)
val allowPartialWindows = true
- //reducedRDS.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
- override def dependencies = List(reducedRDS)
+ override def dependencies = List(reducedStream)
def windowTime: Time = _windowTime
@@ -41,9 +41,9 @@ extends RDS[(K,V)](parent.ssc) {
override def persist(
storageLevel: StorageLevel,
checkpointLevel: StorageLevel,
- checkpointInterval: Time): RDS[(K,V)] = {
+ checkpointInterval: Time): DStream[(K,V)] = {
super.persist(storageLevel, checkpointLevel, checkpointInterval)
- reducedRDS.persist(storageLevel, checkpointLevel, checkpointInterval)
+ reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval)
}
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
@@ -80,7 +80,7 @@ extends RDS[(K,V)](parent.ssc) {
if (allowPartialWindows) {
if (currentTime - slideTime == parent.zeroTime) {
- reducedRDS.getOrCompute(currentTime) match {
+ reducedStream.getOrCompute(currentTime) match {
case Some(rdd) => return Some(rdd)
case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
}
@@ -94,11 +94,11 @@ extends RDS[(K,V)](parent.ssc) {
val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
var t = currentWindow.endTime
while (t > currentWindow.beginTime) {
- reducedRDS.getOrCompute(t) match {
+ reducedStream.getOrCompute(t) match {
case Some(rdd) => reducedRDDs += rdd
case None => throw new Exception("Could not get reduced RDD for time " + t)
}
- t -= reducedRDS.slideTime
+ t -= reducedStream.slideTime
}
if (reducedRDDs.size == 0) {
throw new Exception("Could not generate the first RDD for time " + validTime)
@@ -120,21 +120,21 @@ extends RDS[(K,V)](parent.ssc) {
// Get the RDDs of the reduced values in "old time steps"
var t = currentWindow.beginTime
while (t > previousWindow.beginTime) {
- reducedRDS.getOrCompute(t) match {
+ reducedStream.getOrCompute(t) match {
case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
case None => throw new Exception("Could not get old reduced RDD for time " + t)
}
- t -= reducedRDS.slideTime
+ t -= reducedStream.slideTime
}
// Get the RDDs of the reduced values in "new time steps"
t = currentWindow.endTime
while (t > previousWindow.endTime) {
- reducedRDS.getOrCompute(t) match {
+ reducedStream.getOrCompute(t) match {
case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
case None => throw new Exception("Could not get new reduced RDD for time " + t)
}
- t -= reducedRDS.slideTime
+ t -= reducedStream.slideTime
}
val partitioner = new HashPartitioner(numPartitions)
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 83f874e550..fff4924b4c 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -9,12 +9,11 @@ import scala.collection.mutable.HashMap
sealed trait SchedulerMessage
case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
-case class Test extends SchedulerMessage
class Scheduler(
ssc: SparkStreamContext,
- inputRDSs: Array[InputRDS[_]],
- outputRDSs: Array[RDS[_]])
+ inputStreams: Array[InputDStream[_]],
+ outputStreams: Array[DStream[_]])
extends Logging {
initLogging()
@@ -26,21 +25,21 @@ extends Logging {
def start() {
val zeroTime = Time(timer.start())
- outputRDSs.foreach(_.initialize(zeroTime))
- inputRDSs.par.foreach(_.start())
+ outputStreams.foreach(_.initialize(zeroTime))
+ inputStreams.par.foreach(_.start())
logInfo("Scheduler started")
}
def stop() {
timer.stop()
- inputRDSs.par.foreach(_.stop())
+ inputStreams.par.foreach(_.stop())
logInfo("Scheduler stopped")
}
def generateRDDs (time: Time) {
logInfo("Generating RDDs for time " + time)
- outputRDSs.foreach(outputRDS => {
- outputRDS.generateJob(time) match {
+ outputStreams.foreach(outputStream => {
+ outputStream.generateJob(time) match {
case Some(job) => submitJob(job)
case None =>
}
diff --git a/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala b/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala
index d32f6d588c..2bec1091c0 100644
--- a/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala
+++ b/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala
@@ -31,8 +31,8 @@ class SparkStreamContext (
val sc = new SparkContext(master, frameworkName, sparkHome, jars)
val env = SparkEnv.get
- val inputRDSs = new ArrayBuffer[InputRDS[_]]()
- val outputRDSs = new ArrayBuffer[RDS[_]]()
+ val inputStreams = new ArrayBuffer[InputDStream[_]]()
+ val outputStreams = new ArrayBuffer[DStream[_]]()
var batchDuration: Time = null
var scheduler: Scheduler = null
@@ -48,17 +48,17 @@ class SparkStreamContext (
def createNetworkStream[T: ClassManifest](
name: String,
addresses: Array[InetSocketAddress],
- batchDuration: Time): RDS[T] = {
+ batchDuration: Time): DStream[T] = {
- val inputRDS = new NetworkInputRDS[T](this, addresses)
- inputRDSs += inputRDS
- inputRDS
+ val inputStream = new NetworkinputStream[T](this, addresses)
+ inputStreams += inputStream
+ inputStream
}
def createNetworkStream[T: ClassManifest](
name: String,
addresses: Array[String],
- batchDuration: Long): RDS[T] = {
+ batchDuration: Long): DStream[T] = {
def stringToInetSocketAddress (str: String): InetSocketAddress = {
val parts = str.split(":")
@@ -83,13 +83,13 @@ class SparkStreamContext (
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
- ](directory: String): RDS[(K, V)] = {
- val inputRDS = new FileInputRDS[K, V, F](this, new Path(directory))
- inputRDSs += inputRDS
- inputRDS
+ ](directory: String): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, new Path(directory))
+ inputStreams += inputStream
+ inputStream
}
- def createTextFileStream(directory: String): RDS[String] = {
+ def createTextFileStream(directory: String): DStream[String] = {
createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
@@ -101,26 +101,26 @@ class SparkStreamContext (
queue: Queue[RDD[T]],
oneAtATime: Boolean = true,
defaultRDD: RDD[T] = null
- ): RDS[T] = {
- val inputRDS = new QueueInputRDS(this, queue, oneAtATime, defaultRDD)
- inputRDSs += inputRDS
- inputRDS
+ ): DStream[T] = {
+ val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
+ inputStreams += inputStream
+ inputStream
}
- def createQueueStream[T: ClassManifest](iterator: Iterator[RDD[T]]): RDS[T] = {
+ def createQueueStream[T: ClassManifest](iterator: Iterator[RDD[T]]): DStream[T] = {
val queue = new Queue[RDD[T]]
- val inputRDS = createQueueStream(queue, true, null)
+ val inputStream = createQueueStream(queue, true, null)
queue ++= iterator
- inputRDS
+ inputStream
}
/**
- * This function registers a RDS as an output stream that will be
+ * This function registers a DStream as an output stream that will be
* computed every interval.
*/
- def registerOutputStream (outputRDS: RDS[_]) {
- outputRDSs += outputRDS
+ def registerOutputStream (outputStream: DStream[_]) {
+ outputStreams += outputStream
}
/**
@@ -133,11 +133,11 @@ class SparkStreamContext (
if (batchDuration < Milliseconds(100)) {
logWarning("Batch duration of " + batchDuration + " is very low")
}
- if (inputRDSs.size == 0) {
- throw new Exception("No input RDSes created, so nothing to take input from")
+ if (inputStreams.size == 0) {
+ throw new Exception("No input streams created, so nothing to take input from")
}
- if (outputRDSs.size == 0) {
- throw new Exception("No output RDSes registered, so nothing to execute")
+ if (outputStreams.size == 0) {
+ throw new Exception("No output streams registered, so nothing to execute")
}
}
@@ -147,7 +147,7 @@ class SparkStreamContext (
*/
def start() {
verify()
- scheduler = new Scheduler(this, inputRDSs.toArray, outputRDSs.toArray)
+ scheduler = new Scheduler(this, inputStreams.toArray, outputStreams.toArray)
scheduler.start()
}
@@ -168,6 +168,6 @@ class SparkStreamContext (
object SparkStreamContext {
- implicit def rdsToPairRdsFunctions [K: ClassManifest, V: ClassManifest] (rds: RDS[(K,V)]) =
- new PairRDSFunctions (rds)
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) =
+ new PairDStreamFunctions(stream)
}
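
Since the implicit conversion was renamed as well, pair operations still attach to any DStream of key-value pairs through the import of SparkStreamContext._. A minimal sketch using a queue-backed input stream, in the style of ExampleOne and DStreamSuite (the object name and queue contents are illustrative only):

```scala
import scala.collection.mutable.SynchronizedQueue

import spark.RDD
import spark.streaming._
import spark.streaming.SparkStreamContext._  // brings toPairDStreamFunctions into scope

object PairOpsSketch {
  def main(args: Array[String]) {
    val ssc = new SparkStreamContext("local", "PairOpsSketch")
    ssc.setBatchDuration(Seconds(1))

    // Queue-backed input stream, as in ExampleOne and DStreamSuite
    val rddQueue = new SynchronizedQueue[RDD[(String, Int)]]()
    val pairs: DStream[(String, Int)] = ssc.createQueueStream(rddQueue)

    // reduceByKey comes from the implicit PairDStreamFunctions wrapper
    val counts = pairs.reduceByKey(_ + _)
    counts.print()

    ssc.start()
    rddQueue += ssc.sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 2)
  }
}
```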
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index c4573137ae..5c476f02c3 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -50,11 +50,11 @@ class Time(private var millis: Long) {
def isZero = (this.millis == 0)
- override def toString() = (millis.toString + " ms")
+ override def toString = (millis.toString + " ms")
- def toFormattedString() = millis.toString
+ def toFormattedString = millis.toString
- def milliseconds() = millis
+ def milliseconds = millis
}
object Time {
diff --git a/streaming/src/main/scala/spark/streaming/WindowedRDS.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 812a982301..9a6617a1ee 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -8,19 +8,19 @@ import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
-class WindowedRDS[T: ClassManifest](
- parent: RDS[T],
+class WindowedDStream[T: ClassManifest](
+ parent: DStream[T],
_windowTime: Time,
_slideTime: Time)
- extends RDS[T](parent.ssc) {
+ extends DStream[T](parent.ssc) {
if (!_windowTime.isMultipleOf(parent.slideTime))
- throw new Exception("The window duration of WindowedRDS (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+ throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
if (!_slideTime.isMultipleOf(parent.slideTime))
- throw new Exception("The slide duration of WindowedRDS (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+ throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
val allowPartialWindows = true
@@ -44,7 +44,7 @@ class WindowedRDS[T: ClassManifest](
if (windowStartTime >= parent.zeroTime) {
// Walk back through time, from the 'windowEndTime' to 'windowStartTime'
- // and get all parent RDDs from the parent RDS
+ // and get all parent RDDs from the parent DStream
var t = windowEndTime
while (t > windowStartTime) {
parent.getOrCompute(t) match {
diff --git a/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala b/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala
index d56fdcdf29..669f575240 100644
--- a/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala
@@ -20,10 +20,10 @@ object ExampleOne {
ssc.setBatchDuration(Seconds(1))
// Create the queue through which RDDs can be pushed to
- // a QueueInputRDS
+ // a QueueInputDStream
val rddQueue = new SynchronizedQueue[RDD[Int]]()
- // Create the QueueInputRDs and use it do some processing
+ // Create the QueueInputDStream and use it do some processing
val inputStream = ssc.createQueueStream(rddQueue)
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
diff --git a/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala b/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala
index 4b8f6d609d..be47e47a5a 100644
--- a/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala
@@ -24,12 +24,12 @@ object ExampleTwo {
if (fs.exists(directory)) throw new Exception("This directory already exists")
fs.mkdirs(directory)
- // Create the FileInputRDS on the directory and use the
+ // Create the FileInputDStream on the directory and use the
// stream to count words in new files created
- val inputRDS = ssc.createTextFileStream(directory.toString)
- val wordsRDS = inputRDS.flatMap(_.split(" "))
- val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _)
- wordCountsRDS.print
+ val inputStream = ssc.createTextFileStream(directory.toString)
+ val words = inputStream.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
ssc.start()
// Creating new files in the directory
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
index a155630151..ba7bc63d6a 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
@@ -14,12 +14,12 @@ object WordCount {
val ssc = new SparkStreamContext(args(0), "ExampleTwo")
ssc.setBatchDuration(Seconds(2))
- // Create the FileInputRDS on the directory and use the
+ // Create the FileInputDStream on the directory and use the
// stream to count words in new files created
- val inputRDS = ssc.createTextFileStream(args(1))
- val wordsRDS = inputRDS.flatMap(_.split(" "))
- val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _)
- wordCountsRDS.print()
+ val lines = ssc.createTextFileStream(args(1))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
ssc.start()
}
}
diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
index 9925b1d07c..9fb1924798 100644
--- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
@@ -23,7 +23,7 @@ object Receiver {
count += 28
}
} catch {
- case e: Exception => e.printStackTrace
+ case e: Exception => e.printStackTrace()
}
val timeTaken = System.currentTimeMillis - time
val tput = (count / 1024.0) / (timeTaken / 1000.0)
diff --git a/streaming/src/test/scala/spark/streaming/RDSSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
index f51ea50a5d..ce7c3d2e2b 100644
--- a/streaming/src/test/scala/spark/streaming/RDSSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
@@ -1,6 +1,6 @@
package spark.streaming
-import spark.RDD
+import spark.{Logging, RDD}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
@@ -8,15 +8,15 @@ import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SynchronizedQueue
-class RDSSuite extends FunSuite with BeforeAndAfter {
+class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
var ssc: SparkStreamContext = null
val batchDurationMillis = 1000
def testOp[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
- operation: RDS[U] => RDS[V],
- expectedOutput: Seq[Seq[V]]) = {
+ operation: DStream[U] => DStream[V],
+ expectedOutput: Seq[Seq[V]]) {
try {
ssc = new SparkStreamContext("local", "test")
ssc.setBatchDuration(Milliseconds(batchDurationMillis))
@@ -31,7 +31,7 @@ class RDSSuite extends FunSuite with BeforeAndAfter {
val output = new ArrayBuffer[Seq[V]]()
while(outputQueue.size > 0) {
val rdd = outputQueue.take()
- println("Collecting RDD " + rdd.id + ", " + rdd.getClass().getSimpleName() + ", " + rdd.splits.size)
+ logInfo("Collecting RDD " + rdd.id + ", " + rdd.getClass.getSimpleName + ", " + rdd.splits.size)
output += (rdd.collect())
}
assert(output.size === expectedOutput.size)
@@ -47,19 +47,19 @@ class RDSSuite extends FunSuite with BeforeAndAfter {
val inputData = Array(1 to 4, 5 to 8, 9 to 12)
// map
- testOp(inputData, (r: RDS[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+ testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
// flatMap
- testOp(inputData, (r: RDS[Int]) => r.flatMap(x => Array(x, x * 2)),
+ testOp(inputData, (r: DStream[Int]) => r.flatMap(x => Array(x, x * 2)),
inputData.map(_.flatMap(x => Array(x, x * 2)))
)
}
}
-object RDSSuite {
+object DStreamSuite {
def main(args: Array[String]) {
- val r = new RDSSuite()
+ val r = new DStreamSuite()
val inputData = Array(1 to 4, 5 to 8, 9 to 12)
- r.testOp(inputData, (r: RDS[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+ r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
}
}
\ No newline at end of file