diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-01-20 12:09:45 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-01-20 12:09:45 -0700 |
commit | c0694291c81ad775918421941a80a00ca9593a38 (patch) | |
tree | 6b59ef12a7b98c6c9306b71aa3d0cd678fc401b3 | |
parent | ea739251eb763b756a282534268e765b8d4b70f0 (diff) | |
download | spark-c0694291c81ad775918421941a80a00ca9593a38.tar.gz spark-c0694291c81ad775918421941a80a00ca9593a38.tar.bz2 spark-c0694291c81ad775918421941a80a00ca9593a38.zip |
Splitting StreamingContext.queueStream into two methods
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..3cec35cb37 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -283,17 +283,31 @@ class StreamingContext private ( } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval - * @param defaultRDD Default RDD is returned by the DStream when the queue is empty * @tparam T Type of objects in the RDD */ def queueStream[T: ClassManifest]( queue: Queue[RDD[T]], - oneAtATime: Boolean = true, - defaultRDD: RDD[T] = null + oneAtATime: Boolean = true + ): DStream[T] = { + queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) + } + + /** + * Creates an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassManifest]( + queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] ): DStream[T] = { val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) registerInputStream(inputStream) |