-rw-r--r--  core/src/main/scala/spark/RDD.scala                              | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/QueueInputDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala         | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala      | 2
4 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3fe8e8a4bf..d28f3593fe 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def getStorageLevel = storageLevel
 
-  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
     if (!level.useDisk && level.replication < 2) {
       throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
     }
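
The new default matters because of the guard shown above: checkpointing requires either a disk-backed level or replication of at least 2, and DISK_AND_MEMORY_DESER_2 (the _2 suffix denotes replication factor 2) satisfies both. A minimal sketch of how the guard behaves, assuming an existing SparkContext named sc; the memory-only constant MEMORY_ONLY_DESER is an assumed name for an unreplicated in-memory level of this vintage:

    val rdd = sc.parallelize(1 to 1000)

    // Passes: the new default is disk-backed and replicated twice.
    rdd.checkpoint()

    // Also passes: disk-backed, so the replication < 2 clause never fires.
    rdd.checkpoint(StorageLevel.DISK_AND_MEMORY_DESER)

    // Throws: an assumed memory-only, unreplicated level trips the guard.
    rdd.checkpoint(StorageLevel.MEMORY_ONLY_DESER)
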
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index bab48ff954..f6b53fe2f2 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
       buffer ++= queue
     }
     if (buffer.size > 0) {
-      Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      if (oneAtATime) {
+        Some(buffer.first)
+      } else {
+        Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      }
     } else if (defaultRDD != null) {
       Some(defaultRDD)
     } else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
     }
   }
-}
\ No newline at end of file
+}
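
The behavioral change here: with oneAtATime set, each batch interval now consumes a single RDD from the queue (buffer.first) instead of unioning everything queued so far, which makes queue-backed streams usable for step-by-step testing. A hedged usage sketch, assuming the constructor takes the (ssc, queue, oneAtATime, defaultRDD) arguments this file references:

    import scala.collection.mutable.Queue

    val queue = new Queue[RDD[Int]]()
    queue += ssc.sc.parallelize(1 to 10)
    queue += ssc.sc.parallelize(11 to 20)

    // oneAtATime = true: batch 1 computes over 1..10, batch 2 over 11..20.
    // oneAtATime = false: a single batch would union both queued RDDs.
    val stream = new QueueInputDStream(ssc, queue, true, null)
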
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 12e52bf56c..b4b8e34ec8 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -26,7 +26,6 @@ extends Logging {
   val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
 
   def start() {
-
     val zeroTime = Time(timer.start())
     outputStreams.foreach(_.initialize(zeroTime))
     inputStreams.par.foreach(_.start())
@@ -41,6 +40,7 @@ extends Logging {
   def generateRDDs (time: Time) {
     println("\n-----------------------------------------------------\n")
+    SparkEnv.set(ssc.env)
     logInfo("Generating RDDs for time " + time)
     outputStreams.foreach(outputStream => {
       outputStream.generateJob(time) match {
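
SparkEnv.set(ssc.env) is needed because generateRDDs runs on the RecurringTimer's thread, and SparkEnv lives in a thread-local that new threads do not inherit. An illustrative sketch of the pattern, assuming SparkEnv.get returns the current thread's environment as in this era's API:

    val env = SparkEnv.get  // captured on the thread that owns it

    new Thread(new Runnable {
      def run() {
        SparkEnv.set(env)   // re-install before creating RDDs or jobs
        // ... Spark calls on this thread now see the right environment ...
      }
    }).start()
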
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index f313d8c162..9d3561b4a0 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -7,7 +7,7 @@ import spark.SparkContext._
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
-    parent: DStream[(K, V)],
+    @transient parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
     partitioner: Partitioner,
     rememberPartitioner: Boolean
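
Marking parent @transient keeps the upstream DStream (and through it the whole lineage graph) out of any serialized closure that captures this StateDStream; only the driver needs the parent reference. A minimal illustration with a hypothetical Wrapper class, relying only on standard Java serialization semantics:

    // `big` is skipped when a Wrapper is serialized (it deserializes as null);
    // `small` travels with the object as usual.
    class Wrapper(@transient val big: AnyRef, val small: Int) extends Serializable

The trade-off is the same for parent: the field is not available after deserialization, which is fine as long as it is only used on the driver, where it remains populated.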