diff options
Diffstat (limited to 'streaming')
6 files changed, 441 insertions, 119 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1d2ecdd341..7f181bcecd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.input.FixedLengthBinaryInputFormat -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ @@ -242,14 +242,33 @@ class StreamingContext private[streaming] ( private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement() /** + * Execute a block of code in a scope such that all new DStreams created in this body will + * be part of the same scope. For more detail, see the comments in `doCompute`. + * + * Note: Return statements are NOT allowed in the given body. + */ + private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body) + + /** + * Execute a block of code in a scope such that all new DStreams created in this body will + * be part of the same scope. For more detail, see the comments in `doCompute`. + * + * Note: Return statements are NOT allowed in the given body. + */ + private[streaming] def withNamedScope[U](name: String)(body: => U): U = { + RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) + } + + /** * Create an input stream with any arbitrary user implemented receiver. * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ @deprecated("Use receiverStream", "1.0.0") - def networkStream[T: ClassTag]( - receiver: Receiver[T]): ReceiverInputDStream[T] = { - receiverStream(receiver) + def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = { + withNamedScope("network stream") { + receiverStream(receiver) + } } /** @@ -257,9 +276,10 @@ class StreamingContext private[streaming] ( * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ - def receiverStream[T: ClassTag]( - receiver: Receiver[T]): ReceiverInputDStream[T] = { - new PluggableInputDStream[T](this, receiver) + def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = { + withNamedScope("receiver stream") { + new PluggableInputDStream[T](this, receiver) + } } /** @@ -279,7 +299,7 @@ class StreamingContext private[streaming] ( name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = { + ): ReceiverInputDStream[T] = withNamedScope("actor stream") { receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) } @@ -296,7 +316,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[String] = { + ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } @@ -334,7 +354,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[T] = { + ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") { new RawInputDStream[T](this, hostname, port, storageLevel) } @@ -408,7 +428,7 @@ class StreamingContext private[streaming] ( * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ - def textFileStream(directory: String): DStream[String] = { + def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } @@ -430,7 +450,7 @@ class StreamingContext private[streaming] ( @Experimental def binaryRecordsStream( directory: String, - recordLength: Int): DStream[Array[Byte]] = { + recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( @@ -477,7 +497,7 @@ class StreamingContext private[streaming] ( /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = { + def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope { new UnionDStream[T](streams.toArray) } @@ -488,7 +508,7 @@ class StreamingContext private[streaming] ( def transform[T: ClassTag]( dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] - ): DStream[T] = { + ): DStream[T] = withScope { new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 64de7526a6..5977481e1f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -25,12 +25,13 @@ import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.matching.Regex -import org.apache.spark.{Logging, SparkException} -import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD} +import org.apache.spark.{Logging, SparkContext, SparkException} +import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.ui.UIUtils import org.apache.spark.util.{CallSite, MetadataCleaner, Utils} /** @@ -73,7 +74,7 @@ abstract class DStream[T: ClassTag] ( def dependencies: List[DStream[_]] /** Method that generates a RDD for the given time */ - def compute (validTime: Time): Option[RDD[T]] + def compute(validTime: Time): Option[RDD[T]] // ======================================================================= // Methods and fields available on all DStreams @@ -111,6 +112,44 @@ abstract class DStream[T: ClassTag] ( /* Set the creation call site */ private[streaming] val creationSite = DStream.getCreationSite() + /** + * The base scope associated with the operation that created this DStream. + * + * This is the medium through which we pass the DStream operation name (e.g. updatedStateByKey) + * to the RDDs created by this DStream. Note that we never use this scope directly in RDDs. + * Instead, we instantiate a new scope during each call to `compute` based on this one. + * + * This is not defined if the DStream is created outside of one of the public DStream operations. + */ + protected[streaming] val baseScope: Option[String] = { + Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)) + } + + /** + * Make a scope that groups RDDs created in the same DStream operation in the same batch. + * + * Each DStream produces many scopes and each scope may be shared by other DStreams created + * in the same operation. Separate calls to the same DStream operation create separate scopes. + * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch. + */ + private def makeScope(time: Time): Option[RDDOperationScope] = { + baseScope.map { bsJson => + val formattedBatchTime = UIUtils.formatBatchTime( + time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) + val bs = RDDOperationScope.fromJson(bsJson) + val baseName = bs.name // e.g. countByWindow, "kafka stream [0]" + val scopeName = + if (baseName.length > 10) { + // If the operation name is too long, wrap the line + s"$baseName\n@ $formattedBatchTime" + } else { + s"$baseName @ $formattedBatchTime" + } + val scopeId = s"${bs.id}_${time.milliseconds}" + new RDDOperationScope(scopeName, id = scopeId) + } + } + /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { if (this.isInitialized) { @@ -295,28 +334,23 @@ abstract class DStream[T: ClassTag] ( * Get the RDD corresponding to the given time; either retrieve it from cache * or compute-and-cache it. */ - private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { + private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { // If RDD was already generated, then retrieve it from HashMap, // or else compute the RDD generatedRDDs.get(time).orElse { // Compute the RDD if time is valid (e.g. correct time in a sliding window) // of RDD generation, else generate nothing. if (isTimeValid(time)) { - // Set the thread-local property for call sites to this DStream's creation site - // such that RDDs generated by compute gets that as their creation site. - // Note that this `getOrCompute` may get called from another DStream which may have - // set its own call site. So we store its call site in a temporary variable, - // set this DStream's creation site, generate RDDs and then restore the previous call site. - val prevCallSite = ssc.sparkContext.getCallSite() - ssc.sparkContext.setCallSite(creationSite) - // Disable checks for existing output directories in jobs launched by the streaming - // scheduler, since we may need to write output to an existing directory during checkpoint - // recovery; see SPARK-4835 for more details. We need to have this call here because - // compute() might cause Spark jobs to be launched. - val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) { - compute(time) + + val rddOption = createRDDWithLocalProperties(time) { + // Disable checks for existing output directories in jobs launched by the streaming + // scheduler, since we may need to write output to an existing directory during checkpoint + // recovery; see SPARK-4835 for more details. We need to have this call here because + // compute() might cause Spark jobs to be launched. + PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + compute(time) + } } - ssc.sparkContext.setCallSite(prevCallSite) rddOption.foreach { case newRDD => // Register the generated RDD for caching and checkpointing @@ -338,6 +372,41 @@ abstract class DStream[T: ClassTag] ( } /** + * Wrap a body of code such that the call site and operation scope + * information are passed to the RDDs created in this body properly. + */ + protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = { + val scopeKey = SparkContext.RDD_SCOPE_KEY + val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY + // Pass this DStream's operation scope and creation site information to RDDs through + // thread-local properties in our SparkContext. Since this method may be called from another + // DStream, we need to temporarily store any old scope and creation site information to + // restore them later after setting our own. + val prevCallSite = ssc.sparkContext.getCallSite() + val prevScope = ssc.sparkContext.getLocalProperty(scopeKey) + val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey) + + try { + ssc.sparkContext.setCallSite(creationSite) + // Use the DStream's base scope for this RDD so we can (1) preserve the higher level + // DStream operation name, and (2) share this scope with other DStreams created in the + // same operation. Disallow nesting so that low-level Spark primitives do not show up. + // TODO: merge callsites with scopes so we can just reuse the code there + makeScope(time).foreach { s => + ssc.sparkContext.setLocalProperty(scopeKey, s.toJson) + ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true") + } + + body + } finally { + // Restore any state that was modified before returning + ssc.sparkContext.setCallSite(prevCallSite) + ssc.sparkContext.setLocalProperty(scopeKey, prevScope) + ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, prevScopeNoOverride) + } + } + + /** * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this @@ -456,7 +525,7 @@ abstract class DStream[T: ClassTag] ( // ======================================================================= /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[U: ClassTag](mapFunc: T => U): DStream[U] = { + def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } @@ -464,26 +533,31 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } /** Return a new DStream containing only the elements that satisfy a predicate. */ - def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope { + new FilteredDStream(this, filterFunc) + } /** * Return a new DStream in which each RDD is generated by applying glom() to each RDD of * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. */ - def glom(): DStream[Array[T]] = new GlommedDStream(this) - + def glom(): DStream[Array[T]] = ssc.withScope { + new GlommedDStream(this) + } /** * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the * returned DStream has exactly numPartitions partitions. */ - def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) + def repartition(numPartitions: Int): DStream[T] = ssc.withScope { + this.transform(_.repartition(numPartitions)) + } /** * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs @@ -493,7 +567,7 @@ abstract class DStream[T: ClassTag] ( def mapPartitions[U: ClassTag]( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false - ): DStream[U] = { + ): DStream[U] = ssc.withScope { new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) } @@ -501,14 +575,15 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD has a single element generated by reducing each RDD * of this DStream. */ - def reduce(reduceFunc: (T, T) => T): DStream[T] = + def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope { this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + } /** * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ - def count(): DStream[Long] = { + def count(): DStream[Long] = ssc.withScope { this.map(_ => (null, 1L)) .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) .reduceByKey(_ + _) @@ -522,15 +597,16 @@ abstract class DStream[T: ClassTag] ( * `numPartitions` not specified). */ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null) - : DStream[(T, Long)] = + : DStream[(T, Long)] = ssc.withScope { this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: RDD[T] => Unit): Unit = { + def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { this.foreachRDD(foreachFunc) } @@ -539,7 +615,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = { + def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope { this.foreachRDD(foreachFunc) } @@ -547,7 +623,7 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreachRDD(foreachFunc: RDD[T] => Unit) { + def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) } @@ -555,7 +631,7 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -566,7 +642,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -578,7 +654,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -596,7 +672,7 @@ abstract class DStream[T: ClassTag] ( */ def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] - ): DStream[V] = { + ): DStream[V] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -610,7 +686,7 @@ abstract class DStream[T: ClassTag] ( */ def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] - ): DStream[V] = { + ): DStream[V] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -628,7 +704,7 @@ abstract class DStream[T: ClassTag] ( * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ - def print() { + def print(): Unit = ssc.withScope { print(10) } @@ -636,7 +712,7 @@ abstract class DStream[T: ClassTag] ( * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ - def print(num: Int) { + def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) @@ -668,7 +744,7 @@ abstract class DStream[T: ClassTag] ( * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope { new WindowedDStream(this, windowDuration, slideDuration) } @@ -686,7 +762,7 @@ abstract class DStream[T: ClassTag] ( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration - ): DStream[T] = { + ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) } @@ -711,7 +787,7 @@ abstract class DStream[T: ClassTag] ( invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration - ): DStream[T] = { + ): DStream[T] = ssc.withScope { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) @@ -727,7 +803,9 @@ abstract class DStream[T: ClassTag] ( * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + def countByWindow( + windowDuration: Duration, + slideDuration: Duration): DStream[Long] = ssc.withScope { this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) } @@ -748,8 +826,7 @@ abstract class DStream[T: ClassTag] ( slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) - : DStream[(T, Long)] = - { + : DStream[(T, Long)] = ssc.withScope { this.map(x => (x, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, @@ -764,19 +841,21 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. */ - def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) + def union(that: DStream[T]): DStream[T] = ssc.withScope { + new UnionDStream[T](Array(this, that)) + } /** * Return all the RDDs defined by the Interval object (both end times included) */ - def slice(interval: Interval): Seq[RDD[T]] = { + def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope { slice(interval.beginTime, interval.endTime) } /** * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ - def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope { if (!isInitialized) { throw new SparkException(this + " has not been initialized") } @@ -810,7 +889,7 @@ abstract class DStream[T: ClassTag] ( * The file name at each batch interval is generated based on `prefix` and * `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsObjectFiles(prefix: String, suffix: String = "") { + def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) @@ -823,7 +902,7 @@ abstract class DStream[T: ClassTag] ( * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsTextFiles(prefix: String, suffix: String = "") { + def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 685a32e1d2..c109ceccc6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] ( override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => - val jobFunc = () => { + val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 9716adb628..d58c99a8ff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -17,10 +17,13 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Time, Duration, StreamingContext} - import scala.reflect.ClassTag +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDDOperationScope +import org.apache.spark.streaming.{Time, Duration, StreamingContext} +import org.apache.spark.util.Utils + /** * This is the abstract base class for all input streams. This class provides methods * start() and stop() which is called by Spark Streaming system to start and stop receiving data. @@ -44,10 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) /** This is an unique identifier for the input stream. */ val id = ssc.getNewInputStreamId() + /** A human-readable name of this InputDStream */ + private[streaming] def name: String = { + // e.g. FlumePollingDStream -> "Flume polling stream" + val newName = Utils.getFormattedClassName(this) + .replaceAll("InputDStream", "Stream") + .split("(?=[A-Z])") + .filter(_.nonEmpty) + .mkString(" ") + .toLowerCase + .capitalize + s"$newName [$id]" + } + /** - * The name of this InputDStream. By default, it's the class name with its id. + * The base scope associated with the operation that created this DStream. + * + * For InputDStreams, we use the name of this DStream as the scope name. + * If an outer scope is given, we assume that it includes an alternative name for this stream. */ - private[streaming] def name: String = s"${getClass.getSimpleName}-$id" + protected[streaming] override val baseScope: Option[String] = { + val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)) + .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" } + .getOrElse(name.toLowerCase) + Some(new RDDOperationScope(scopeName).toJson) + } /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 8a58571632..884a8e8b52 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): DStream[(K, Iterable[V])] = { + def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope { groupByKey(defaultPartitioner()) } @@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = { + def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope { groupByKey(defaultPartitioner(numPartitions)) } @@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` on each RDD. The supplied * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = { + def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) @@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the associative reduce function. Hash partitioning is used to generate the RDDs * with Spark's default number of partitions. */ - def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner()) } @@ -84,7 +84,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs * with `numPartitions` partitions. */ - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { + def reduceByKey( + reduceFunc: (V, V) => V, + numPartitions: Int): DStream[(K, V)] = ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } @@ -93,7 +95,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ - def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { + def reduceByKey( + reduceFunc: (V, V) => V, + partitioner: Partitioner): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) } @@ -104,11 +108,11 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information. */ def combineByKey[C: ClassTag]( - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiner: (C, C) => C, - partitioner: Partitioner, - mapSideCombine: Boolean = true): DStream[(K, C)] = { + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope { new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) } @@ -121,7 +125,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = { + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) } @@ -136,8 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * DStream's batching interval */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : DStream[(K, Iterable[V])] = - { + : DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -157,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, Iterable[V])] = { + ): DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -176,7 +179,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, Iterable[V])] = { + ): DStream[(K, Iterable[V])] = ssc.withScope { val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 @@ -198,7 +201,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) } @@ -217,7 +220,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) } @@ -238,7 +241,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -260,7 +263,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) self.reduceByKey(cleanedReduceFunc, partitioner) .window(windowDuration, slideDuration) @@ -294,8 +297,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration = self.slideDuration, numPartitions: Int = ssc.sc.defaultParallelism, filterFunc: ((K, V)) => Boolean = null - ): DStream[(K, V)] = { - + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions), filterFunc @@ -328,7 +330,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration, partitioner: Partitioner, filterFunc: ((K, V)) => Boolean - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) @@ -349,7 +351,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) } @@ -365,7 +367,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) } @@ -382,7 +384,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -406,7 +408,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) } @@ -425,7 +427,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -451,7 +453,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, Some(initialRDD)) } @@ -460,7 +462,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying a map function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope { new MapValuedDStream[K, V, U](self, mapValuesFunc) } @@ -470,7 +472,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] - ): DStream[(K, U)] = { + ): DStream[(K, U)] = ssc.withScope { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } @@ -479,7 +481,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W: ClassTag]( + other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { cogroup(other, defaultPartitioner()) } @@ -487,8 +490,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) - : DStream[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { cogroup(other, defaultPartitioner(numPartitions)) } @@ -499,7 +503,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Iterable[V], Iterable[W]))] = { + ): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) @@ -510,7 +514,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ - def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope { join[W](other, defaultPartitioner()) } @@ -518,7 +522,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + def join[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope { join[W](other, defaultPartitioner(numPartitions)) } @@ -529,7 +535,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, W))] = { + ): DStream[(K, (V, W))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) @@ -541,7 +547,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = ssc.withScope { leftOuterJoin[W](other, defaultPartitioner()) } @@ -553,7 +560,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (V, Option[W]))] = { + ): DStream[(K, (V, Option[W]))] = ssc.withScope { leftOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -565,7 +572,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, Option[W]))] = { + ): DStream[(K, (V, Option[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) @@ -577,7 +584,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = ssc.withScope { rightOuterJoin[W](other, defaultPartitioner()) } @@ -589,7 +597,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], W))] = { + ): DStream[(K, (Option[V], W))] = ssc.withScope { rightOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -601,7 +609,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], W))] = { + ): DStream[(K, (Option[V], W))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) @@ -613,7 +621,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ - def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = { + def fullOuterJoin[W: ClassTag]( + other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { fullOuterJoin[W](other, defaultPartitioner()) } @@ -625,7 +634,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], Option[W]))] = { + ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { fullOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -637,7 +646,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], Option[W]))] = { + ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner) @@ -651,7 +660,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]) { + )(implicit fm: ClassTag[F]): Unit = ssc.withScope { saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -667,7 +676,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration) - ) { + ): Unit = ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { @@ -684,7 +693,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]) { + )(implicit fm: ClassTag[F]): Unit = ssc.withScope { saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -700,7 +709,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = ssc.sparkContext.hadoopConfiguration - ) { + ): Unit = ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala new file mode 100644 index 0000000000..3929331020 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.{RDD, RDDOperationScope} +import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.streaming.ui.UIUtils + +/** + * Tests whether scope information is passed from DStream operations to RDDs correctly. + */ +class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { + private var ssc: StreamingContext = null + private val batchDuration: Duration = Seconds(1) + + override def beforeAll(): Unit = { + ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration) + } + + override def afterAll(): Unit = { + ssc.stop(stopSparkContext = true) + } + + before { assertPropertiesNotSet() } + after { assertPropertiesNotSet() } + + test("dstream without scope") { + val dummyStream = new DummyDStream(ssc) + dummyStream.initialize(Time(0)) + + // This DStream is not instantiated in any scope, so all RDDs + // created by this stream should similarly not have a scope + assert(dummyStream.baseScope === None) + assert(dummyStream.getOrCompute(Time(1000)).get.scope === None) + assert(dummyStream.getOrCompute(Time(2000)).get.scope === None) + assert(dummyStream.getOrCompute(Time(3000)).get.scope === None) + } + + test("input dstream without scope") { + val inputStream = new DummyInputDStream(ssc) + inputStream.initialize(Time(0)) + + val baseScope = inputStream.baseScope.map(RDDOperationScope.fromJson) + val scope1 = inputStream.getOrCompute(Time(1000)).get.scope + val scope2 = inputStream.getOrCompute(Time(2000)).get.scope + val scope3 = inputStream.getOrCompute(Time(3000)).get.scope + + // This DStream is not instantiated in any scope, so all RDDs + assertDefined(baseScope, scope1, scope2, scope3) + assert(baseScope.get.name.startsWith("dummy stream")) + assertScopeCorrect(baseScope.get, scope1.get, 1000) + assertScopeCorrect(baseScope.get, scope2.get, 2000) + assertScopeCorrect(baseScope.get, scope3.get, 3000) + } + + test("scoping simple operations") { + val inputStream = new DummyInputDStream(ssc) + val mappedStream = inputStream.map { i => i + 1 } + val filteredStream = mappedStream.filter { i => i % 2 == 0 } + filteredStream.initialize(Time(0)) + + val mappedScopeBase = mappedStream.baseScope.map(RDDOperationScope.fromJson) + val mappedScope1 = mappedStream.getOrCompute(Time(1000)).get.scope + val mappedScope2 = mappedStream.getOrCompute(Time(2000)).get.scope + val mappedScope3 = mappedStream.getOrCompute(Time(3000)).get.scope + val filteredScopeBase = filteredStream.baseScope.map(RDDOperationScope.fromJson) + val filteredScope1 = filteredStream.getOrCompute(Time(1000)).get.scope + val filteredScope2 = filteredStream.getOrCompute(Time(2000)).get.scope + val filteredScope3 = filteredStream.getOrCompute(Time(3000)).get.scope + + // These streams are defined in their respective scopes "map" and "filter", so all + // RDDs created by these streams should inherit the IDs and names of their parent + // DStream's base scopes + assertDefined(mappedScopeBase, mappedScope1, mappedScope2, mappedScope3) + assertDefined(filteredScopeBase, filteredScope1, filteredScope2, filteredScope3) + assert(mappedScopeBase.get.name === "map") + assert(filteredScopeBase.get.name === "filter") + assertScopeCorrect(mappedScopeBase.get, mappedScope1.get, 1000) + assertScopeCorrect(mappedScopeBase.get, mappedScope2.get, 2000) + assertScopeCorrect(mappedScopeBase.get, mappedScope3.get, 3000) + assertScopeCorrect(filteredScopeBase.get, filteredScope1.get, 1000) + assertScopeCorrect(filteredScopeBase.get, filteredScope2.get, 2000) + assertScopeCorrect(filteredScopeBase.get, filteredScope3.get, 3000) + } + + test("scoping nested operations") { + val inputStream = new DummyInputDStream(ssc) + val countStream = inputStream.countByWindow(Seconds(10), Seconds(1)) + countStream.initialize(Time(0)) + + val countScopeBase = countStream.baseScope.map(RDDOperationScope.fromJson) + val countScope1 = countStream.getOrCompute(Time(1000)).get.scope + val countScope2 = countStream.getOrCompute(Time(2000)).get.scope + val countScope3 = countStream.getOrCompute(Time(3000)).get.scope + + // Assert that all children RDDs inherit the DStream operation name correctly + assertDefined(countScopeBase, countScope1, countScope2, countScope3) + assert(countScopeBase.get.name === "countByWindow") + assertScopeCorrect(countScopeBase.get, countScope1.get, 1000) + assertScopeCorrect(countScopeBase.get, countScope2.get, 2000) + assertScopeCorrect(countScopeBase.get, countScope3.get, 3000) + + // All streams except the input stream should share the same scopes as `countStream` + def testStream(stream: DStream[_]): Unit = { + if (stream != inputStream) { + val myScopeBase = stream.baseScope.map(RDDOperationScope.fromJson) + val myScope1 = stream.getOrCompute(Time(1000)).get.scope + val myScope2 = stream.getOrCompute(Time(2000)).get.scope + val myScope3 = stream.getOrCompute(Time(3000)).get.scope + assertDefined(myScopeBase, myScope1, myScope2, myScope3) + assert(myScopeBase === countScopeBase) + assert(myScope1 === countScope1) + assert(myScope2 === countScope2) + assert(myScope3 === countScope3) + // Climb upwards to test the parent streams + stream.dependencies.foreach(testStream) + } + } + testStream(countStream) + } + + /** Assert that the RDD operation scope properties are not set in our SparkContext. */ + private def assertPropertiesNotSet(): Unit = { + assert(ssc != null) + assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) == null) + assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY) == null) + } + + /** Assert that the given RDD scope inherits the name and ID of the base scope correctly. */ + private def assertScopeCorrect( + baseScope: RDDOperationScope, + rddScope: RDDOperationScope, + batchTime: Long): Unit = { + assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime) + } + + /** Assert that the given RDD scope inherits the base name and ID correctly. */ + private def assertScopeCorrect( + baseScopeId: String, + baseScopeName: String, + rddScope: RDDOperationScope, + batchTime: Long): Unit = { + val formattedBatchTime = UIUtils.formatBatchTime( + batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) + assert(rddScope.id === s"${baseScopeId}_$batchTime") + assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime") + } + + /** Assert that all the specified options are defined. */ + private def assertDefined[T](options: Option[T]*): Unit = { + options.zipWithIndex.foreach { case (o, i) => assert(o.isDefined, s"Option $i was empty!") } + } + +} + +/** + * A dummy stream that does absolutely nothing. + */ +private class DummyDStream(ssc: StreamingContext) extends DStream[Int](ssc) { + override def dependencies: List[DStream[Int]] = List.empty + override def slideDuration: Duration = Seconds(1) + override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int]) +} + +/** + * A dummy input stream that does absolutely nothing. + */ +private class DummyInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) { + override def start(): Unit = { } + override def stop(): Unit = { } + override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int]) +} |