aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/QueueInputDStream.scala8
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/StateDStream.scala2
3 files changed, 8 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index de30297c7d..b794159b09 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
buffer ++= queue
}
if (buffer.size > 0) {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ if (oneAtATime) {
+ Some(buffer.first)
+ } else {
+ Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ }
} else if (defaultRDD != null) {
Some(defaultRDD)
} else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
}
}
-} \ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 00136685d5..d2e907378d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -26,7 +26,6 @@ extends Logging {
val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
def start() {
-
val zeroTime = Time(timer.start())
outputStreams.foreach(_.initialize(zeroTime))
inputStreams.par.foreach(_.start())
@@ -40,6 +39,7 @@ extends Logging {
}
def generateRDDs (time: Time) {
+ SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 4cb780c006..72b71d5fab 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -8,7 +8,7 @@ import spark.SparkContext._
import spark.storage.StorageLevel
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
- parent: DStream[(K, V)],
+ @transient parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean