aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2012-09-07 20:16:21 +0000
committer Tathagata Das <tathagata.das1565@gmail.com> 2012-09-07 20:16:21 +0000
commit b5750726ff3306e0ea5741141f6cae0eb3449902 (patch)
tree 437943f76aecdac1f478c576a94df8311f2cef14 /streaming
parent b8e9e8ea7873dbcb43abd4bb2821ef64f6b45686 (diff)
download spark-b5750726ff3306e0ea5741141f6cae0eb3449902.tar.gz
spark-b5750726ff3306e0ea5741141f6cae0eb3449902.tar.bz2
spark-b5750726ff3306e0ea5741141f6cae0eb3449902.zip
Fixed bugs in streaming Scheduler and optimized QueueInputDStream.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/QueueInputDStream.scala | 8
-rw-r--r-- streaming/src/main/scala/spark/streaming/Scheduler.scala | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/StateDStream.scala | 2
3 files changed, 8 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index bab48ff954..f6b53fe2f2 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
buffer ++= queue
}
if (buffer.size > 0) {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ if (oneAtATime) {
+ Some(buffer.first)
+ } else {
+ Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ }
} else if (defaultRDD != null) {
Some(defaultRDD)
} else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
}
}
-} \ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 12e52bf56c..b4b8e34ec8 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -26,7 +26,6 @@ extends Logging {
val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
def start() {
-
val zeroTime = Time(timer.start())
outputStreams.foreach(_.initialize(zeroTime))
inputStreams.par.foreach(_.start())
@@ -41,6 +40,7 @@ extends Logging {
def generateRDDs (time: Time) {
println("\n-----------------------------------------------------\n")
+ SparkEnv.set(ssc.env)
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index f313d8c162..9d3561b4a0 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -7,7 +7,7 @@ import spark.SparkContext._
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
- parent: DStream[(K, V)],
+ @transient parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean