diff options
Diffstat (limited to 'streaming')
3 files changed, 8 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala index de30297c7d..b794159b09 100644 --- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala @@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest]( buffer ++= queue } if (buffer.size > 0) { - Some(new UnionRDD(ssc.sc, buffer.toSeq)) + if (oneAtATime) { + Some(buffer.first) + } else { + Some(new UnionRDD(ssc.sc, buffer.toSeq)) + } } else if (defaultRDD != null) { Some(defaultRDD) } else { @@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest]( } } -}
\ No newline at end of file +} diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 00136685d5..d2e907378d 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -26,7 +26,6 @@ extends Logging { val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_)) def start() { - val zeroTime = Time(timer.start()) outputStreams.foreach(_.initialize(zeroTime)) inputStreams.par.foreach(_.start()) @@ -40,6 +39,7 @@ extends Logging { } def generateRDDs (time: Time) { + SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") logInfo("Generating RDDs for time " + time) outputStreams.foreach(outputStream => { diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index 4cb780c006..72b71d5fab 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -8,7 +8,7 @@ import spark.SparkContext._ import spark.storage.StorageLevel class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( - parent: DStream[(K, V)], + @transient parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean |