diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 14:05:51 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 14:05:51 -0400 |
commit | 43b81eb2719c4666b7869d7d0290f2ee83daeafa (patch) | |
tree | f3ff92c219d541d4b25819f0d4bc0b2ef3f007a0 /streaming | |
parent | 29bf44473c9d76622628f2511588f7846e9b1f3c (diff) | |
download | spark-43b81eb2719c4666b7869d7d0290f2ee83daeafa.tar.gz spark-43b81eb2719c4666b7869d7d0290f2ee83daeafa.tar.bz2 spark-43b81eb2719c4666b7869d7d0290f2ee83daeafa.zip |
Renamed RDS to DStream, plus minor style fixes
Diffstat (limited to 'streaming')
16 files changed, 183 insertions, 185 deletions
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputRDS.scala b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala index bf2e6f7e16..6a2be34633 100644 --- a/streaming/src/main/scala/spark/streaming/ConstantInputRDS.scala +++ b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala @@ -5,8 +5,8 @@ import spark.RDD /** * An input stream that always returns the same RDD on each timestep. Useful for testing. */ -class ConstantInputRDS[T: ClassManifest](ssc: SparkStreamContext, rdd: RDD[T]) - extends InputRDS[T](ssc) { +class ConstantInputDStream[T: ClassManifest](ssc: SparkStreamContext, rdd: RDD[T]) + extends InputDStream[T](ssc) { override def start() {} diff --git a/streaming/src/main/scala/spark/streaming/RDS.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index fd923929e7..e19d2ecef5 100644 --- a/streaming/src/main/scala/spark/streaming/RDS.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -15,7 +15,7 @@ import scala.collection.mutable.HashMap import java.util.concurrent.ArrayBlockingQueue -abstract class RDS[T: ClassManifest] (@transient val ssc: SparkStreamContext) +abstract class DStream[T: ClassManifest] (@transient val ssc: SparkStreamContext) extends Logging with Serializable { initLogging() @@ -26,25 +26,25 @@ extends Logging with Serializable { * ---------------------------------------------- */ - // Time by which the window slides in this RDS + // Time by which the window slides in this DStream def slideTime: Time - // List of parent RDSs on which this RDS depends on - def dependencies: List[RDS[_]] + // List of parent DStreams on which this DStream depends on + def dependencies: List[DStream[_]] // Key method that computes RDD for a valid time def compute (validTime: Time): Option[RDD[T]] /** * --------------------------------------- - * Other general fields and methods of RDS + * Other general fields and methods of DStream * --------------------------------------- */ // Variable to store the RDDs generated earlier in time @transient private val generatedRDDs = new HashMap[Time, RDD[T]] () - // Variable to be set to the first time seen by the RDS (effective time zero) + // Variable to be set to the first time seen by the DStream (effective time zero) private[streaming] var zeroTime: Time = null // Variable to specify storage level @@ -58,11 +58,11 @@ extends Logging with Serializable { def persist( storageLevel: StorageLevel, checkpointLevel: StorageLevel, - checkpointInterval: Time): RDS[T] = { + checkpointInterval: Time): DStream[T] = { if (this.storageLevel != StorageLevel.NONE && this.storageLevel != storageLevel) { - // TODO: not sure this is necessary for RDSes + // TODO: not sure this is necessary for DStreams throw new UnsupportedOperationException( - "Cannot change storage level of an RDS after it was already assigned a level") + "Cannot change storage level of an DStream after it was already assigned a level") } this.storageLevel = storageLevel this.checkpointLevel = checkpointLevel @@ -70,20 +70,20 @@ extends Logging with Serializable { this } - // Set caching level for the RDDs created by this RDS - def persist(newLevel: StorageLevel): RDS[T] = persist(newLevel, StorageLevel.NONE, null) + // Set caching level for the RDDs created by this DStream + def persist(newLevel: StorageLevel): DStream[T] = persist(newLevel, StorageLevel.NONE, null) - def persist(): RDS[T] = persist(StorageLevel.MEMORY_ONLY_DESER) + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_DESER) // Turn on the default caching level for this RDD - def cache(): RDS[T] = persist() + def cache(): DStream[T] = persist() def isInitialized = (zeroTime != null) /** - * This method initializes the RDS by setting the "zero" time, based on which + * This method initializes the DStream by setting the "zero" time, based on which * the validity of future times is calculated. This method also recursively initializes - * its parent RDSs. + * its parent DStreams. */ def initialize(time: Time) { if (zeroTime == null) { @@ -105,20 +105,20 @@ extends Logging with Serializable { } /** - * This method either retrieves a precomputed RDD of this RDS, + * This method either retrieves a precomputed RDD of this DStream, * or computes the RDD (if the time is valid) */ def getOrCompute(time: Time): Option[RDD[T]] = { - // If this RDS was not initialized (i.e., zeroTime not set), then do it + // If this DStream was not initialized (i.e., zeroTime not set), then do it // If RDD was already generated, then retrieve it from HashMap generatedRDDs.get(time) match { // If an RDD was already generated and is being reused, then - // probably all RDDs in this RDS will be reused and hence should be cached + // probably all RDDs in this DStream will be reused and hence should be cached case Some(oldRDD) => Some(oldRDD) // if RDD was not generated, and if the time is valid - // (based on sliding time of this RDS), then generate the RDD + // (based on sliding time of this DStream), then generate the RDD case None => if (isTimeValid(time)) { compute(time) match { @@ -160,21 +160,21 @@ extends Logging with Serializable { /** * -------------- - * RDS operations + * DStream operations * -------------- */ - def map[U: ClassManifest](mapFunc: T => U) = new MappedRDS(this, ssc.sc.clean(mapFunc)) + def map[U: ClassManifest](mapFunc: T => U) = new MappedDStream(this, ssc.sc.clean(mapFunc)) def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) = - new FlatMappedRDS(this, ssc.sc.clean(flatMapFunc)) + new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) - def filter(filterFunc: T => Boolean) = new FilteredRDS(this, filterFunc) + def filter(filterFunc: T => Boolean) = new FilteredDStream(this, filterFunc) - def glom() = new GlommedRDS(this) + def glom() = new GlommedDStream(this) def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) = - new MapPartitionedRDS(this, ssc.sc.clean(mapPartFunc)) + new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc)) def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2) @@ -183,18 +183,18 @@ extends Logging with Serializable { def collect() = this.map(x => (1, x)).groupByKey(1).map(_._2) def foreach(foreachFunc: T => Unit) = { - val newrds = new PerElementForEachRDS(this, ssc.sc.clean(foreachFunc)) - ssc.registerOutputStream(newrds) - newrds + val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + newStream } def foreachRDD(foreachFunc: RDD[T] => Unit) = { - val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc)) - ssc.registerOutputStream(newrds) - newrds + val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + newStream } - private[streaming] def toQueue() = { + private[streaming] def toQueue = { val queue = new ArrayBlockingQueue[RDD[T]](10000) this.foreachRDD(rdd => { println("Added RDD " + rdd.id) @@ -213,12 +213,12 @@ extends Logging with Serializable { if (first11.size > 10) println("...") println() } - val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc)) - ssc.registerOutputStream(newrds) - newrds + val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + newStream } - def window(windowTime: Time, slideTime: Time) = new WindowedRDS(this, windowTime, slideTime) + def window(windowTime: Time, slideTime: Time) = new WindowedDStream(this, windowTime, slideTime) def batch(batchTime: Time) = window(batchTime, batchTime) @@ -241,15 +241,17 @@ extends Logging with Serializable { this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime) } - def union(that: RDS[T]) = new UnifiedRDS(Array(this, that)) + def union(that: DStream[T]) = new UnifiedDStream(Array(this, that)) - def register() = ssc.registerOutputStream(this) + def register() { + ssc.registerOutputStream(this) + } } -abstract class InputRDS[T: ClassManifest] ( +abstract class InputDStream[T: ClassManifest] ( ssc: SparkStreamContext) -extends RDS[T](ssc) { +extends DStream[T](ssc) { override def dependencies = List() @@ -265,10 +267,10 @@ extends RDS[T](ssc) { * TODO */ -class MappedRDS[T: ClassManifest, U: ClassManifest] ( - parent: RDS[T], +class MappedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], mapFunc: T => U) -extends RDS[U](parent.ssc) { +extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -284,10 +286,10 @@ extends RDS[U](parent.ssc) { * TODO */ -class FlatMappedRDS[T: ClassManifest, U: ClassManifest]( - parent: RDS[T], +class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], flatMapFunc: T => Traversable[U]) -extends RDS[U](parent.ssc) { +extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -303,8 +305,8 @@ extends RDS[U](parent.ssc) { * TODO */ -class FilteredRDS[T: ClassManifest](parent: RDS[T], filterFunc: T => Boolean) -extends RDS[T](parent.ssc) { +class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean) +extends DStream[T](parent.ssc) { override def dependencies = List(parent) @@ -320,10 +322,10 @@ extends RDS[T](parent.ssc) { * TODO */ -class MapPartitionedRDS[T: ClassManifest, U: ClassManifest]( - parent: RDS[T], +class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U]) -extends RDS[U](parent.ssc) { +extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -339,7 +341,7 @@ extends RDS[U](parent.ssc) { * TODO */ -class GlommedRDS[T: ClassManifest](parent: RDS[T]) extends RDS[Array[T]](parent.ssc) { +class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { override def dependencies = List(parent) @@ -355,13 +357,13 @@ class GlommedRDS[T: ClassManifest](parent: RDS[T]) extends RDS[Array[T]](parent. * TODO */ -class ShuffledRDS[K: ClassManifest, V: ClassManifest, C: ClassManifest]( - parent: RDS[(K,V)], +class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + parent: DStream[(K,V)], createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, numPartitions: Int) - extends RDS [(K,C)] (parent.ssc) { + extends DStream [(K,C)] (parent.ssc) { override def dependencies = List(parent) @@ -388,8 +390,8 @@ class ShuffledRDS[K: ClassManifest, V: ClassManifest, C: ClassManifest]( * TODO */ -class UnifiedRDS[T: ClassManifest](parents: Array[RDS[T]]) -extends RDS[T](parents(0).ssc) { +class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]]) +extends DStream[T](parents(0).ssc) { if (parents.length == 0) { throw new IllegalArgumentException("Empty array of parents") @@ -426,10 +428,10 @@ extends RDS[T](parents(0).ssc) { * TODO */ -class PerElementForEachRDS[T: ClassManifest] ( - parent: RDS[T], +class PerElementForEachDStream[T: ClassManifest] ( + parent: DStream[T], foreachFunc: T => Unit) -extends RDS[Unit](parent.ssc) { +extends DStream[Unit](parent.ssc) { override def dependencies = List(parent) @@ -457,12 +459,12 @@ extends RDS[Unit](parent.ssc) { * TODO */ -class PerRDDForEachRDS[T: ClassManifest] ( - parent: RDS[T], +class PerRDDForEachDStream[T: ClassManifest] ( + parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit) -extends RDS[Unit](parent.ssc) { +extends DStream[Unit](parent.ssc) { - def this(parent: RDS[T], altForeachFunc: (RDD[T]) => Unit) = + def this(parent: DStream[T], altForeachFunc: (RDD[T]) => Unit) = this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd)) override def dependencies = List(parent) diff --git a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala index ebd246823d..88aa375289 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala @@ -18,12 +18,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( +class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( ssc: SparkStreamContext, directory: Path, - filter: PathFilter = FileInputRDS.defaultPathFilter, + filter: PathFilter = FileInputDStream.defaultPathFilter, newFilesOnly: Boolean = true) - extends InputRDS[(K, V)](ssc) { + extends InputDStream[(K, V)](ssc) { val fs = directory.getFileSystem(new Configuration()) var lastModTime: Long = 0 @@ -69,7 +69,7 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] } } -object FileInputRDS { +object FileInputDStream { val defaultPathFilter = new PathFilter { def accept(path: Path): Boolean = { val file = path.getName() @@ -83,12 +83,12 @@ object FileInputRDS { } /* -class NetworkInputRDS[T: ClassManifest]( +class NetworkInputDStream[T: ClassManifest]( val networkInputName: String, val addresses: Array[InetSocketAddress], batchDuration: Time, ssc: SparkStreamContext) -extends InputRDS[T](networkInputName, batchDuration, ssc) { +extends InputDStream[T](networkInputName, batchDuration, ssc) { // TODO(Haoyuan): This is for the performance test. @@ -139,11 +139,11 @@ extends InputRDS[T](networkInputName, batchDuration, ssc) { } -class TestInputRDS( +class TestInputDStream( val testInputName: String, batchDuration: Time, ssc: SparkStreamContext) -extends InputRDS[String](testInputName, batchDuration, ssc) { +extends InputDStream[String](testInputName, batchDuration, ssc) { @transient val references = new HashMap[Time,Array[String]] diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index 1960097216..088cbe4376 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -1,7 +1,6 @@ package spark.streaming -case class Interval (val beginTime: Time, val endTime: Time) { - +case class Interval (beginTime: Time, endTime: Time) { def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs)) def duration(): Time = endTime - beginTime @@ -33,7 +32,7 @@ case class Interval (val beginTime: Time, val endTime: Time) { this + (endTime - beginTime) } - def isZero() = (beginTime.isZero && endTime.isZero) + def isZero = (beginTime.isZero && endTime.isZero) def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString @@ -41,7 +40,6 @@ case class Interval (val beginTime: Time, val endTime: Time) { } object Interval { - def zero() = new Interval (Time.zero, Time.zero) def currentInterval(intervalDuration: Time): Interval = { @@ -49,7 +47,6 @@ object Interval { val intervalBegin = time.floor(intervalDuration) Interval(intervalBegin, intervalBegin + intervalDuration) } - } diff --git a/streaming/src/main/scala/spark/streaming/PairRDSFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 403ae233a5..0cf296f21a 100644 --- a/streaming/src/main/scala/spark/streaming/PairRDSFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -3,23 +3,23 @@ package spark.streaming import scala.collection.mutable.ArrayBuffer import spark.streaming.SparkStreamContext._ -class PairRDSFunctions[K: ClassManifest, V: ClassManifest](rds: RDS[(K,V)]) +class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) extends Serializable { - def ssc = rds.ssc + def ssc = stream.ssc /* ---------------------------------- */ - /* RDS operations for key-value pairs */ + /* DStream operations for key-value pairs */ /* ---------------------------------- */ - def groupByKey(numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = { + def groupByKey(numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = { def createCombiner(v: V) = ArrayBuffer[V](v) def mergeValue(c: ArrayBuffer[V], v: V) = (c += v) def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2) combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, numPartitions) } - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledRDS[K, V, V] = { + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledDStream[K, V, V] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, numPartitions) } @@ -28,23 +28,23 @@ extends Serializable { createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - numPartitions: Int) : ShuffledRDS[K, V, C] = { - new ShuffledRDS[K, V, C](rds, createCombiner, mergeValue, mergeCombiner, numPartitions) + numPartitions: Int) : ShuffledDStream[K, V, C] = { + new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, numPartitions) } def groupByKeyAndWindow( windowTime: Time, slideTime: Time, - numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = { - rds.window(windowTime, slideTime).groupByKey(numPartitions) + numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = { + stream.window(windowTime, slideTime).groupByKey(numPartitions) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, - numPartitions: Int = 0): ShuffledRDS[K, V, V] = { - rds.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions) + numPartitions: Int = 0): ShuffledDStream[K, V, V] = { + stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions) } // This method is the efficient sliding window reduce operation, @@ -57,10 +57,10 @@ extends Serializable { invReduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, - numPartitions: Int): ReducedWindowedRDS[K, V] = { + numPartitions: Int): ReducedWindowedDStream[K, V] = { - new ReducedWindowedRDS[K, V]( - rds, + new ReducedWindowedDStream[K, V]( + stream, ssc.sc.clean(reduceFunc), ssc.sc.clean(invReduceFunc), windowTime, diff --git a/streaming/src/main/scala/spark/streaming/QueueInputRDS.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala index 31e6a64e21..c78abd1a87 100644 --- a/streaming/src/main/scala/spark/streaming/QueueInputRDS.scala +++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala @@ -6,12 +6,12 @@ import spark.UnionRDD import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer -class QueueInputRDS[T: ClassManifest]( +class QueueInputDStream[T: ClassManifest]( ssc: SparkStreamContext, val queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] - ) extends InputRDS[T](ssc) { + ) extends InputDStream[T](ssc) { override def start() { } diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedRDS.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index dd1f474657..11fa4e5443 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedRDS.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -11,28 +11,28 @@ import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -class ReducedWindowedRDS[K: ClassManifest, V: ClassManifest]( - parent: RDS[(K, V)], +class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( + parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, _windowTime: Time, _slideTime: Time, numPartitions: Int) -extends RDS[(K,V)](parent.ssc) { +extends DStream[(K,V)](parent.ssc) { if (!_windowTime.isMultipleOf(parent.slideTime)) - throw new Exception("The window duration of ReducedWindowedRDS (" + _slideTime + ") " + - "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + throw new Exception("The window duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") if (!_slideTime.isMultipleOf(parent.slideTime)) - throw new Exception("The slide duration of ReducedWindowedRDS (" + _slideTime + ") " + - "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - val reducedRDS = parent.reduceByKey(reduceFunc, numPartitions) + val reducedStream = parent.reduceByKey(reduceFunc, numPartitions) val allowPartialWindows = true - //reducedRDS.persist(StorageLevel.MEMORY_ONLY_DESER_2) + //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2) - override def dependencies = List(reducedRDS) + override def dependencies = List(reducedStream) def windowTime: Time = _windowTime @@ -41,9 +41,9 @@ extends RDS[(K,V)](parent.ssc) { override def persist( storageLevel: StorageLevel, checkpointLevel: StorageLevel, - checkpointInterval: Time): RDS[(K,V)] = { + checkpointInterval: Time): DStream[(K,V)] = { super.persist(storageLevel, checkpointLevel, checkpointInterval) - reducedRDS.persist(storageLevel, checkpointLevel, checkpointInterval) + reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval) } override def compute(validTime: Time): Option[RDD[(K, V)]] = { @@ -80,7 +80,7 @@ extends RDS[(K,V)](parent.ssc) { if (allowPartialWindows) { if (currentTime - slideTime == parent.zeroTime) { - reducedRDS.getOrCompute(currentTime) match { + reducedStream.getOrCompute(currentTime) match { case Some(rdd) => return Some(rdd) case None => throw new Exception("Could not get first reduced RDD for time " + currentTime) } @@ -94,11 +94,11 @@ extends RDS[(K,V)](parent.ssc) { val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]() var t = currentWindow.endTime while (t > currentWindow.beginTime) { - reducedRDS.getOrCompute(t) match { + reducedStream.getOrCompute(t) match { case Some(rdd) => reducedRDDs += rdd case None => throw new Exception("Could not get reduced RDD for time " + t) } - t -= reducedRDS.slideTime + t -= reducedStream.slideTime } if (reducedRDDs.size == 0) { throw new Exception("Could not generate the first RDD for time " + validTime) @@ -120,21 +120,21 @@ extends RDS[(K,V)](parent.ssc) { // Get the RDDs of the reduced values in "old time steps" var t = currentWindow.beginTime while (t > previousWindow.beginTime) { - reducedRDS.getOrCompute(t) match { + reducedStream.getOrCompute(t) match { case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]] case None => throw new Exception("Could not get old reduced RDD for time " + t) } - t -= reducedRDS.slideTime + t -= reducedStream.slideTime } // Get the RDDs of the reduced values in "new time steps" t = currentWindow.endTime while (t > previousWindow.endTime) { - reducedRDS.getOrCompute(t) match { + reducedStream.getOrCompute(t) match { case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]] case None => throw new Exception("Could not get new reduced RDD for time " + t) } - t -= reducedRDS.slideTime + t -= reducedStream.slideTime } val partitioner = new HashPartitioner(numPartitions) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 83f874e550..fff4924b4c 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -9,12 +9,11 @@ import scala.collection.mutable.HashMap sealed trait SchedulerMessage case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage -case class Test extends SchedulerMessage class Scheduler( ssc: SparkStreamContext, - inputRDSs: Array[InputRDS[_]], - outputRDSs: Array[RDS[_]]) + inputStreams: Array[InputDStream[_]], + outputStreams: Array[DStream[_]]) extends Logging { initLogging() @@ -26,21 +25,21 @@ extends Logging { def start() { val zeroTime = Time(timer.start()) - outputRDSs.foreach(_.initialize(zeroTime)) - inputRDSs.par.foreach(_.start()) + outputStreams.foreach(_.initialize(zeroTime)) + inputStreams.par.foreach(_.start()) logInfo("Scheduler started") } def stop() { timer.stop() - inputRDSs.par.foreach(_.stop()) + inputStreams.par.foreach(_.stop()) logInfo("Scheduler stopped") } def generateRDDs (time: Time) { logInfo("Generating RDDs for time " + time) - outputRDSs.foreach(outputRDS => { - outputRDS.generateJob(time) match { + outputStreams.foreach(outputStream => { + outputStream.generateJob(time) match { case Some(job) => submitJob(job) case None => } diff --git a/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala b/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala index d32f6d588c..2bec1091c0 100644 --- a/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala +++ b/streaming/src/main/scala/spark/streaming/SparkStreamContext.scala @@ -31,8 +31,8 @@ class SparkStreamContext ( val sc = new SparkContext(master, frameworkName, sparkHome, jars) val env = SparkEnv.get - val inputRDSs = new ArrayBuffer[InputRDS[_]]() - val outputRDSs = new ArrayBuffer[RDS[_]]() + val inputStreams = new ArrayBuffer[InputDStream[_]]() + val outputStreams = new ArrayBuffer[DStream[_]]() var batchDuration: Time = null var scheduler: Scheduler = null @@ -48,17 +48,17 @@ class SparkStreamContext ( def createNetworkStream[T: ClassManifest]( name: String, addresses: Array[InetSocketAddress], - batchDuration: Time): RDS[T] = { + batchDuration: Time): DStream[T] = { - val inputRDS = new NetworkInputRDS[T](this, addresses) - inputRDSs += inputRDS - inputRDS + val inputStream = new NetworkinputStream[T](this, addresses) + inputStreams += inputStream + inputStream } def createNetworkStream[T: ClassManifest]( name: String, addresses: Array[String], - batchDuration: Long): RDS[T] = { + batchDuration: Long): DStream[T] = { def stringToInetSocketAddress (str: String): InetSocketAddress = { val parts = str.split(":") @@ -83,13 +83,13 @@ class SparkStreamContext ( K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K, V]: ClassManifest - ](directory: String): RDS[(K, V)] = { - val inputRDS = new FileInputRDS[K, V, F](this, new Path(directory)) - inputRDSs += inputRDS - inputRDS + ](directory: String): DStream[(K, V)] = { + val inputStream = new FileInputDStream[K, V, F](this, new Path(directory)) + inputStreams += inputStream + inputStream } - def createTextFileStream(directory: String): RDS[String] = { + def createTextFileStream(directory: String): DStream[String] = { createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } @@ -101,26 +101,26 @@ class SparkStreamContext ( queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null - ): RDS[T] = { - val inputRDS = new QueueInputRDS(this, queue, oneAtATime, defaultRDD) - inputRDSs += inputRDS - inputRDS + ): DStream[T] = { + val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) + inputStreams += inputStream + inputStream } - def createQueueStream[T: ClassManifest](iterator: Iterator[RDD[T]]): RDS[T] = { + def createQueueStream[T: ClassManifest](iterator: Iterator[RDD[T]]): DStream[T] = { val queue = new Queue[RDD[T]] - val inputRDS = createQueueStream(queue, true, null) + val inputStream = createQueueStream(queue, true, null) queue ++= iterator - inputRDS + inputStream } /** - * This function registers a RDS as an output stream that will be + * This function registers a DStream as an output stream that will be * computed every interval. */ - def registerOutputStream (outputRDS: RDS[_]) { - outputRDSs += outputRDS + def registerOutputStream (outputStream: DStream[_]) { + outputStreams += outputStream } /** @@ -133,11 +133,11 @@ class SparkStreamContext ( if (batchDuration < Milliseconds(100)) { logWarning("Batch duration of " + batchDuration + " is very low") } - if (inputRDSs.size == 0) { - throw new Exception("No input RDSes created, so nothing to take input from") + if (inputStreams.size == 0) { + throw new Exception("No input streams created, so nothing to take input from") } - if (outputRDSs.size == 0) { - throw new Exception("No output RDSes registered, so nothing to execute") + if (outputStreams.size == 0) { + throw new Exception("No output streams registered, so nothing to execute") } } @@ -147,7 +147,7 @@ class SparkStreamContext ( */ def start() { verify() - scheduler = new Scheduler(this, inputRDSs.toArray, outputRDSs.toArray) + scheduler = new Scheduler(this, inputStreams.toArray, outputStreams.toArray) scheduler.start() } @@ -168,6 +168,6 @@ class SparkStreamContext ( object SparkStreamContext { - implicit def rdsToPairRdsFunctions [K: ClassManifest, V: ClassManifest] (rds: RDS[(K,V)]) = - new PairRDSFunctions (rds) + implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = + new PairDStreamFunctions(stream) } diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index c4573137ae..5c476f02c3 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -50,11 +50,11 @@ class Time(private var millis: Long) { def isZero = (this.millis == 0) - override def toString() = (millis.toString + " ms") + override def toString = (millis.toString + " ms") - def toFormattedString() = millis.toString + def toFormattedString = millis.toString - def milliseconds() = millis + def milliseconds = millis } object Time { diff --git a/streaming/src/main/scala/spark/streaming/WindowedRDS.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala index 812a982301..9a6617a1ee 100644 --- a/streaming/src/main/scala/spark/streaming/WindowedRDS.scala +++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala @@ -8,19 +8,19 @@ import spark.SparkContext._ import scala.collection.mutable.ArrayBuffer -class WindowedRDS[T: ClassManifest]( - parent: RDS[T], +class WindowedDStream[T: ClassManifest]( + parent: DStream[T], _windowTime: Time, _slideTime: Time) - extends RDS[T](parent.ssc) { + extends DStream[T](parent.ssc) { if (!_windowTime.isMultipleOf(parent.slideTime)) - throw new Exception("The window duration of WindowedRDS (" + _slideTime + ") " + - "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") if (!_slideTime.isMultipleOf(parent.slideTime)) - throw new Exception("The slide duration of WindowedRDS (" + _slideTime + ") " + - "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") val allowPartialWindows = true @@ -44,7 +44,7 @@ class WindowedRDS[T: ClassManifest]( if (windowStartTime >= parent.zeroTime) { // Walk back through time, from the 'windowEndTime' to 'windowStartTime' - // and get all parent RDDs from the parent RDS + // and get all parent RDDs from the parent DStream var t = windowEndTime while (t > windowStartTime) { parent.getOrCompute(t) match { diff --git a/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala b/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala index d56fdcdf29..669f575240 100644 --- a/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala +++ b/streaming/src/main/scala/spark/streaming/examples/ExampleOne.scala @@ -20,10 +20,10 @@ object ExampleOne { ssc.setBatchDuration(Seconds(1)) // Create the queue through which RDDs can be pushed to - // a QueueInputRDS + // a QueueInputDStream val rddQueue = new SynchronizedQueue[RDD[Int]]() - // Create the QueueInputRDs and use it do some processing + // Create the QueueInputDStream and use it do some processing val inputStream = ssc.createQueueStream(rddQueue) val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) diff --git a/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala b/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala index 4b8f6d609d..be47e47a5a 100644 --- a/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala +++ b/streaming/src/main/scala/spark/streaming/examples/ExampleTwo.scala @@ -24,12 +24,12 @@ object ExampleTwo { if (fs.exists(directory)) throw new Exception("This directory already exists") fs.mkdirs(directory) - // Create the FileInputRDS on the directory and use the + // Create the FileInputDStream on the directory and use the // stream to count words in new files created - val inputRDS = ssc.createTextFileStream(directory.toString) - val wordsRDS = inputRDS.flatMap(_.split(" ")) - val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _) - wordCountsRDS.print + val inputStream = ssc.createTextFileStream(directory.toString) + val words = inputStream.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() ssc.start() // Creating new files in the directory diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala index a155630151..ba7bc63d6a 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala @@ -14,12 +14,12 @@ object WordCount { val ssc = new SparkStreamContext(args(0), "ExampleTwo") ssc.setBatchDuration(Seconds(2)) - // Create the FileInputRDS on the directory and use the + // Create the FileInputDStream on the directory and use the // stream to count words in new files created - val inputRDS = ssc.createTextFileStream(args(1)) - val wordsRDS = inputRDS.flatMap(_.split(" ")) - val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _) - wordCountsRDS.print() + val lines = ssc.createTextFileStream(args(1)) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() ssc.start() } } diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala index 9925b1d07c..9fb1924798 100644 --- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala @@ -23,7 +23,7 @@ object Receiver { count += 28 } } catch { - case e: Exception => e.printStackTrace + case e: Exception => e.printStackTrace() } val timeTaken = System.currentTimeMillis - time val tput = (count / 1024.0) / (timeTaken / 1000.0) diff --git a/streaming/src/test/scala/spark/streaming/RDSSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala index f51ea50a5d..ce7c3d2e2b 100644 --- a/streaming/src/test/scala/spark/streaming/RDSSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala @@ -1,6 +1,6 @@ package spark.streaming -import spark.RDD +import spark.{Logging, RDD} import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter @@ -8,15 +8,15 @@ import org.scalatest.BeforeAndAfter import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.SynchronizedQueue -class RDSSuite extends FunSuite with BeforeAndAfter { +class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { var ssc: SparkStreamContext = null val batchDurationMillis = 1000 def testOp[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], - operation: RDS[U] => RDS[V], - expectedOutput: Seq[Seq[V]]) = { + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]]) { try { ssc = new SparkStreamContext("local", "test") ssc.setBatchDuration(Milliseconds(batchDurationMillis)) @@ -31,7 +31,7 @@ class RDSSuite extends FunSuite with BeforeAndAfter { val output = new ArrayBuffer[Seq[V]]() while(outputQueue.size > 0) { val rdd = outputQueue.take() - println("Collecting RDD " + rdd.id + ", " + rdd.getClass().getSimpleName() + ", " + rdd.splits.size) + logInfo("Collecting RDD " + rdd.id + ", " + rdd.getClass.getSimpleName + ", " + rdd.splits.size) output += (rdd.collect()) } assert(output.size === expectedOutput.size) @@ -47,19 +47,19 @@ class RDSSuite extends FunSuite with BeforeAndAfter { val inputData = Array(1 to 4, 5 to 8, 9 to 12) // map - testOp(inputData, (r: RDS[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) + testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) // flatMap - testOp(inputData, (r: RDS[Int]) => r.flatMap(x => Array(x, x * 2)), + testOp(inputData, (r: DStream[Int]) => r.flatMap(x => Array(x, x * 2)), inputData.map(_.flatMap(x => Array(x, x * 2))) ) } } -object RDSSuite { +object DStreamSuite { def main(args: Array[String]) { - val r = new RDSSuite() + val r = new DStreamSuite() val inputData = Array(1 to 4, 5 to 8, 9 to 12) - r.testOp(inputData, (r: RDS[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) + r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) } }
\ No newline at end of file |