aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-01-20 12:09:45 -0700
committerseanm <sean.mcnamara@webtrends.com>2013-01-20 12:09:45 -0700
commitc0694291c81ad775918421941a80a00ca9593a38 (patch)
tree6b59ef12a7b98c6c9306b71aa3d0cd678fc401b3
parentea739251eb763b756a282534268e765b8d4b70f0 (diff)
downloadspark-c0694291c81ad775918421941a80a00ca9593a38.tar.gz
spark-c0694291c81ad775918421941a80a00ca9593a38.tar.bz2
spark-c0694291c81ad775918421941a80a00ca9593a38.zip
Splitting StreamingContext.queueStream into two methods
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala22
1 files changed, 18 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..3cec35cb37 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -283,17 +283,31 @@ class StreamingContext private (
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
- * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
- oneAtATime: Boolean = true,
- defaultRDD: RDD[T] = null
+ oneAtATime: Boolean = true
+ ): DStream[T] = {
+ queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+ }
+
+ /**
+ * Creates an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T: ClassManifest](
+ queue: Queue[RDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: RDD[T]
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
registerInputStream(inputStream)