6 files changed, 52 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index d024d38aa9..14f708a3f8 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -28,7 +28,9 @@ class NewHadoopRDD[K, V](
     @transient conf: Configuration)
   extends RDD[(K, V)](sc) {
 
-  private val serializableConf = new SerializableWritable(conf)
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  // private val serializableConf = new SerializableWritable(conf)
 
   private val jobtrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -41,7 +43,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(serializableConf.value, jobId)
+    val jobContext = new JobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -54,9 +56,9 @@ class NewHadoopRDD[K, V](
 
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopSplit]
-    val conf = serializableConf.value
+    val conf = confBroadcast.value.value
    val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(serializableConf.value, attemptId)
+    val context = new TaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
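Note on the NewHadoopRDD change: a Hadoop `Configuration` is not `Serializable`, and wrapping it in `SerializableWritable` per task copies roughly 10 KB into every serialized closure. Broadcasting the wrapped conf ships it to each node once, and tasks unwrap it with two `.value` calls (first the `Broadcast`, then the `SerializableWritable`). A minimal standalone sketch of the same pattern against the old `spark` package API; the object name and config key are invented for illustration:

```scala
import org.apache.hadoop.conf.Configuration
import spark.{SerializableWritable, SparkContext}

object BroadcastConfSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "BroadcastConfSketch")
    val conf = new Configuration()
    conf.set("example.key", "example.value") // hypothetical key

    // Broadcast once (~10 KB) instead of serializing the conf into every task.
    val confBroadcast = sc.broadcast(new SerializableWritable(conf))

    val values = sc.parallelize(1 to 4, 4).map { _ =>
      // confBroadcast.value unwraps the Broadcast; .value again unwraps
      // the SerializableWritable to get a usable Configuration.
      confBroadcast.value.value.get("example.key")
    }.collect()
    values.foreach(println)
  }
}
```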
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3973ca1520..7e8098c346 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -284,9 +284,8 @@ extends Logging with Serializable {
 }
 
 
-abstract class InputDStream[T: ClassManifest] (
-  @transient ssc: StreamingContext)
-extends DStream[T](ssc) {
+abstract class InputDStream[T: ClassManifest] (@transient ssc: StreamingContext)
+  extends DStream[T](ssc) {
 
   override def dependencies = List()
 
@@ -303,9 +302,9 @@ extends DStream[T](ssc) {
  */
 
 class MappedDStream[T: ClassManifest, U: ClassManifest] (
-    parent: DStream[T],
-    mapFunc: T => U)
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    mapFunc: T => U
+  ) extends DStream[U](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -322,9 +321,9 @@ extends DStream[U](parent.ssc) {
  */
 
 class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
-    parent: DStream[T],
-    flatMapFunc: T => Traversable[U])
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    flatMapFunc: T => Traversable[U]
+  ) extends DStream[U](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -340,8 +339,10 @@ extends DStream[U](parent.ssc) {
  * TODO
  */
 
-class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean)
-extends DStream[T](parent.ssc) {
+class FilteredDStream[T: ClassManifest](
+    @transient parent: DStream[T],
+    filterFunc: T => Boolean
+  ) extends DStream[T](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -358,9 +359,9 @@ extends DStream[T](parent.ssc) {
  */
 
 class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
-    parent: DStream[T],
-    mapPartFunc: Iterator[T] => Iterator[U])
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    mapPartFunc: Iterator[T] => Iterator[U]
+  ) extends DStream[U](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -376,7 +377,8 @@ extends DStream[U](parent.ssc) {
  * TODO
  */
 
-class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) {
+class GlommedDStream[T: ClassManifest](@transient parent: DStream[T])
+  extends DStream[Array[T]](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -393,7 +395,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array
  */
 
 class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
-    parent: DStream[(K,V)],
+    @transient parent: DStream[(K,V)],
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
@@ -418,7 +420,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
  * TODO
  */
 
-class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnifiedDStream[T: ClassManifest](@transient parents: Array[DStream[T]])
   extends DStream[T](parents(0).ssc) {
 
   if (parents.length == 0) {
@@ -457,7 +459,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
  */
 
 class PerElementForEachDStream[T: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     foreachFunc: T => Unit
   ) extends DStream[Unit](parent.ssc) {
 
@@ -488,7 +490,7 @@ class PerElementForEachDStream[T: ClassManifest] (
  */
 
 class PerRDDForEachDStream[T: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     foreachFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {
 
@@ -516,7 +518,7 @@ class PerRDDForEachDStream[T: ClassManifest] (
  */
 
 class TransformedDStream[T: ClassManifest, U: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     transformFunc: (RDD[T], Time) => RDD[U]
   ) extends DStream[U](parent.ssc) {
 
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index de30297c7d..b794159b09 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
       buffer ++= queue
     }
     if (buffer.size > 0) {
-      Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      if (oneAtATime) {
+        Some(buffer.first)
+      } else {
+        Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      }
     } else if (defaultRDD != null) {
       Some(defaultRDD)
     } else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
     }
   }
 
-}
\ No newline at end of file
+}
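Note on the DStream changes: the recurring edit marks every operator's `parent` reference `@transient`, so Java serialization of a downstream DStream (for checkpointing, or when one is captured in a task closure) no longer drags the entire upstream operator graph along; the graph is still declared through `dependencies`. Separately, `QueueInputDStream` now honors its `oneAtATime` flag, emitting one queued RDD per batch instead of always unioning the whole buffer. A self-contained sketch of what `@transient` does under Java serialization, with invented toy classes (not Spark code):

```scala
import java.io._

// The transient parent link is dropped during serialization, so writing
// out a node does not pull in its whole upstream chain.
class Node(@transient val parent: Node, val name: String) extends Serializable

object TransientSketch {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]) {
    val chain = new Node(new Node(null, "upstream"), "downstream")
    val copy = roundTrip(chain)
    println(copy.name)           // "downstream" survives the round trip
    println(copy.parent == null) // true: the @transient field was skipped
  }
}
```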
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 00136685d5..d2e907378d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -26,7 +26,6 @@ extends Logging {
   val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
 
   def start() {
-
     val zeroTime = Time(timer.start())
     outputStreams.foreach(_.initialize(zeroTime))
     inputStreams.par.foreach(_.start())
@@ -40,6 +39,7 @@ extends Logging {
   }
 
   def generateRDDs (time: Time) {
+    SparkEnv.set(ssc.env)
     logInfo("\n-----------------------------------------------------\n")
     logInfo("Generating RDDs for time " + time)
     outputStreams.foreach(outputStream => {
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 4cb780c006..c40f70c91d 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -8,7 +8,7 @@ import spark.SparkContext._
 import spark.storage.StorageLevel
 
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
-    parent: DStream[(K, V)],
+    @transient parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
     partitioner: Partitioner,
     rememberPartitioner: Boolean
@@ -26,14 +26,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
   override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
     generatedRDDs.get(time) match {
       case Some(oldRDD) => {
-        if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+        if (checkpointInterval != null && time > zeroTime && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
           val r = oldRDD
           val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
           val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
             override val partitioner = oldRDD.partitioner
           }
           generatedRDDs.update(time, checkpointedRDD)
-          logInfo("Updated RDD of time " + time + " with its checkpointed version")
+          logInfo("Checkpointed RDD " + oldRDD.id + " of time " + time + " with its new RDD " + checkpointedRDD.id)
           Some(checkpointedRDD)
         } else {
           Some(oldRDD)
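Two fixes here. In `Scheduler`, `generateRDDs` fires on the `RecurringTimer` thread, which does not inherit the thread-local `SparkEnv`, so the handler installs the StreamingContext's env with `SparkEnv.set(ssc.env)` before generating jobs. In `StateDStream`, the checkpoint condition gains a `time > zeroTime` guard: at `time == zeroTime` the elapsed time is zero, and zero is a multiple of every interval, so without the guard the very first batch would be checkpointed immediately. A minimal sketch of that guard with an invented stand-in for `spark.streaming.Time`:

```scala
// Invented, minimal stand-in for spark.streaming.Time, just enough to show
// why the extra `time > zeroTime` guard matters.
case class T(millis: Long) {
  def -(that: T) = T(millis - that.millis)
  def >(that: T) = millis > that.millis
  def isMultipleOf(that: T) = that.millis > 0 && millis % that.millis == 0
}

object CheckpointGuardSketch {
  def shouldCheckpoint(time: T, zeroTime: T, interval: T): Boolean =
    time > zeroTime && (time - zeroTime).isMultipleOf(interval)

  def main(args: Array[String]) {
    val zero = T(0)
    val interval = T(10000)
    println(shouldCheckpoint(T(0), zero, interval))     // false: 0 ms elapsed is guarded out
    println(shouldCheckpoint(T(10000), zero, interval)) // true: one full interval elapsed
    println(shouldCheckpoint(T(15000), zero, interval)) // false: not on an interval boundary
  }
}
```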
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index be3188c5ed..3ba07d0448 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,11 +1,24 @@
 package spark.streaming.examples
 
 import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
 import spark.storage.StorageLevel
 import spark.streaming._
 import spark.streaming.StreamingContext._
 
+import WordCount2_ExtraFunctions._
+
 object TopKWordCountRaw {
+  def moreWarmup(sc: SparkContext) {
+    (0 until 40).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .collect()
+    }
+  }
+
   def main(args: Array[String]) {
     if (args.length != 7) {
       System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
@@ -20,16 +33,12 @@ object TopKWordCountRaw {
     ssc.setBatchDuration(Milliseconds(batchMs))
 
     // Make sure some tasks have started on each node
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
+    moreWarmup(ssc.sc)
 
     val rawStreams = (1 to streams).map(_ =>
       ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
     val union = new UnifiedDStream(rawStreams)
 
-    import WordCount2_ExtraFunctions._
-
     val windowedCounts = union.mapPartitions(splitAndCountPartitions)
       .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
     windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
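Note on the example change: the three `parallelize(1 to 1000, 1000).count()` rounds ensured each node had run a task, but left the benchmark's own code paths cold. `moreWarmup` instead runs the same `splitAndCountPartitions`/`reduceByKey` pipeline as the measured job 40 times, so JIT compilation and shuffle setup costs are paid before the streaming window starts. A hedged sketch of the same idea as a reusable helper (names invented, scaled down; mirrors `moreWarmup` above):

```scala
import spark.SparkContext
import spark.SparkContext._

object WarmupSketch {
  // Run a job shaped like the real workload a few times before measuring,
  // so classloading, JIT, and shuffle machinery warm up outside the window.
  def warmup(sc: SparkContext, rounds: Int) {
    (0 until rounds).foreach { _ =>
      sc.parallelize(1 to 1000000, 100)
        .map(i => ((i % 1331).toString, 1L))
        .reduceByKey(_ + _, 10)
        .collect()
    }
  }
}
```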