author     Andrew Or <andrew@databricks.com>  2015-05-18 14:33:33 -0700
committer  Andrew Or <andrew@databricks.com>  2015-05-18 14:33:33 -0700
commit     b93c97d79b42a06b48d2a8d98beccc636442541e (patch)
tree       3fd6faf28faf59fc1d6dfccae180ce7c212958f6 /streaming
parent     fcf90b75ccf222bd2f1939addc3f8f052d2bd3ff (diff)
[SPARK-7501] [STREAMING] DAG visualization: show DStream operations
This is similar to #5999, but for streaming. Roughly 200 lines are tests. One thing to note here is that we already do some scoping for call sites, so this patch adds the new RDD operation scoping logic in the same place. Also, this patch adds a `try finally` block to set the relevant variables in a safer way.

tdas zsxwing

------------------------
**Before**

<img src="https://cloud.githubusercontent.com/assets/2133137/7625996/d88211b8-f9b4-11e4-90b9-e11baa52d6d7.png" width="450px"/>

--------------------------
**After**

<img src="https://cloud.githubusercontent.com/assets/2133137/7625997/e0878f8c-f9b4-11e4-8df3-7dd611b13c87.png" width="650px"/>

Author: Andrew Or <andrew@databricks.com>

Closes #6034 from andrewor14/dag-viz-streaming and squashes the following commits:

932a64a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e685df9 [Andrew Or] Rename createRDDWith
84d0656 [Andrew Or] Review feedback
697c086 [Andrew Or] Fix tests
53b9936 [Andrew Or] Set scopes for foreachRDD properly
1881802 [Andrew Or] Refactor DStream scope names again
af4ba8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
fd07d22 [Andrew Or] Make MQTT lower case
f6de871 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
0ca1801 [Andrew Or] Remove a few unnecessary withScopes on aliases
fa4e5fb [Andrew Or] Pass in input stream name rather than defining it from within
1af0b0e [Andrew Or] Fix style
074c00b [Andrew Or] Review comments
d25a324 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e4a93ac [Andrew Or] Fix tests?
25416dc [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
9113183 [Andrew Or] Add tests for DStream scopes
b3806ab [Andrew Or] Fix test
bb80bbb [Andrew Or] Fix MIMA?
5c30360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
5703939 [Andrew Or] Rename operations that create InputDStreams
7c4513d [Andrew Or] Group RDDs by DStream operations and batches
bf0ab6e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
05c2676 [Andrew Or] Wrap many more methods in withScope
c121047 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
65ef3e9 [Andrew Or] Fix NPE
a0d3263 [Andrew Or] Scope streaming operations instead of RDD operations
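For illustration only (not part of this patch), here is a minimal sketch of a streaming program and the scope names its operations would be grouped under after this change; the stream id and exact labels are assumptions that depend on the application:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ScopeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ScopeDemo")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each operation below now runs inside a named scope, so the RDDs it
    // produces are grouped per operation and per batch in the DAG visualization.
    val lines  = ssc.socketTextStream("localhost", 9999)    // "socket text stream [0]"
    val words  = lines.flatMap(_.split(" "))                 // "flatMap"
    val counts = words.map(w => (w, 1L)).reduceByKey(_ + _)  // "map", "reduceByKey"
    counts.print()                                           // "print"

    ssc.start()
    ssc.awaitTermination()
  }
}
```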
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              |  48
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala               | 177
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala        |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala          |  32
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala  | 111
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala             | 190
6 files changed, 441 insertions(+), 119 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1d2ecdd341..7f181bcecd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
@@ -242,14 +242,33 @@ class StreamingContext private[streaming] (
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
/**
+ * Execute a block of code in a scope such that all new DStreams created in this body will
+ * be part of the same scope. For more detail, see the comments in `doCompute`.
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)
+
+ /**
+ * Execute a block of code in a scope such that all new DStreams created in this body will
+ * be part of the same scope. For more detail, see the comments in `doCompute`.
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+ RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@deprecated("Use receiverStream", "1.0.0")
- def networkStream[T: ClassTag](
- receiver: Receiver[T]): ReceiverInputDStream[T] = {
- receiverStream(receiver)
+ def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ withNamedScope("network stream") {
+ receiverStream(receiver)
+ }
}
/**
@@ -257,9 +276,10 @@ class StreamingContext private[streaming] (
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
- def receiverStream[T: ClassTag](
- receiver: Receiver[T]): ReceiverInputDStream[T] = {
- new PluggableInputDStream[T](this, receiver)
+ def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ withNamedScope("receiver stream") {
+ new PluggableInputDStream[T](this, receiver)
+ }
}
/**
@@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
- ): ReceiverInputDStream[T] = {
+ ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[String] = {
+ ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[T] = {
+ ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
new RawInputDStream[T](this, hostname, port, storageLevel)
}
@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
- def textFileStream(directory: String): DStream[String] = {
+ def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
@@ -430,7 +450,7 @@ class StreamingContext private[streaming] (
@Experimental
def binaryRecordsStream(
directory: String,
- recordLength: Int): DStream[Array[Byte]] = {
+ recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
- def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
+ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
new UnionDStream[T](streams.toArray)
}
@@ -488,7 +508,7 @@ class StreamingContext private[streaming] (
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
- ): DStream[T] = {
+ ): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
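As a rough sketch of how the named scopes above interact with the `InputDStream` and `DStream` changes later in this diff (the directory path is hypothetical and the batch-time suffix is only indicative):

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Assumes a StreamingContext supplied by the caller.
def monitoredFiles(ssc: StreamingContext): DStream[String] = {
  // textFileStream runs inside withNamedScope("text file stream"), so the input
  // stream's base scope is named "text file stream [<id>]"; every RDD it computes
  // is then placed in a per-batch scope like "text file stream [0] @ <batch time>".
  ssc.textFileStream("/data/in")
}
```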
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 64de7526a6..5977481e1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -25,12 +25,13 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex
-import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
+import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
/**
@@ -73,7 +74,7 @@ abstract class DStream[T: ClassTag] (
def dependencies: List[DStream[_]]
/** Method that generates a RDD for the given time */
- def compute (validTime: Time): Option[RDD[T]]
+ def compute(validTime: Time): Option[RDD[T]]
// =======================================================================
// Methods and fields available on all DStreams
@@ -111,6 +112,44 @@ abstract class DStream[T: ClassTag] (
/* Set the creation call site */
private[streaming] val creationSite = DStream.getCreationSite()
+ /**
+ * The base scope associated with the operation that created this DStream.
+ *
+ * This is the medium through which we pass the DStream operation name (e.g. updateStateByKey)
+ * to the RDDs created by this DStream. Note that we never use this scope directly in RDDs.
+ * Instead, we instantiate a new scope during each call to `compute` based on this one.
+ *
+ * This is not defined if the DStream is created outside of one of the public DStream operations.
+ */
+ protected[streaming] val baseScope: Option[String] = {
+ Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+ }
+
+ /**
+ * Make a scope that groups RDDs created in the same DStream operation in the same batch.
+ *
+ * Each DStream produces many scopes and each scope may be shared by other DStreams created
+ * in the same operation. Separate calls to the same DStream operation create separate scopes.
+ * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
+ */
+ private def makeScope(time: Time): Option[RDDOperationScope] = {
+ baseScope.map { bsJson =>
+ val formattedBatchTime = UIUtils.formatBatchTime(
+ time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+ val bs = RDDOperationScope.fromJson(bsJson)
+ val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
+ val scopeName =
+ if (baseName.length > 10) {
+ // If the operation name is too long, wrap the line
+ s"$baseName\n@ $formattedBatchTime"
+ } else {
+ s"$baseName @ $formattedBatchTime"
+ }
+ val scopeId = s"${bs.id}_${time.milliseconds}"
+ new RDDOperationScope(scopeName, id = scopeId)
+ }
+ }
+
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
@@ -295,28 +334,23 @@ abstract class DStream[T: ClassTag] (
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
*/
- private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+ private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
- // Set the thread-local property for call sites to this DStream's creation site
- // such that RDDs generated by compute gets that as their creation site.
- // Note that this `getOrCompute` may get called from another DStream which may have
- // set its own call site. So we store its call site in a temporary variable,
- // set this DStream's creation site, generate RDDs and then restore the previous call site.
- val prevCallSite = ssc.sparkContext.getCallSite()
- ssc.sparkContext.setCallSite(creationSite)
- // Disable checks for existing output directories in jobs launched by the streaming
- // scheduler, since we may need to write output to an existing directory during checkpoint
- // recovery; see SPARK-4835 for more details. We need to have this call here because
- // compute() might cause Spark jobs to be launched.
- val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
- compute(time)
+
+ val rddOption = createRDDWithLocalProperties(time) {
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details. We need to have this call here because
+ // compute() might cause Spark jobs to be launched.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ compute(time)
+ }
}
- ssc.sparkContext.setCallSite(prevCallSite)
rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
@@ -338,6 +372,41 @@ abstract class DStream[T: ClassTag] (
}
/**
+ * Wrap a body of code such that the call site and operation scope
+ * information are passed to the RDDs created in this body properly.
+ */
+ protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+ val scopeKey = SparkContext.RDD_SCOPE_KEY
+ val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
+ // Pass this DStream's operation scope and creation site information to RDDs through
+ // thread-local properties in our SparkContext. Since this method may be called from another
+ // DStream, we need to temporarily store any old scope and creation site information to
+ // restore them later after setting our own.
+ val prevCallSite = ssc.sparkContext.getCallSite()
+ val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
+ val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
+
+ try {
+ ssc.sparkContext.setCallSite(creationSite)
+ // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
+ // DStream operation name, and (2) share this scope with other DStreams created in the
+ // same operation. Disallow nesting so that low-level Spark primitives do not show up.
+ // TODO: merge callsites with scopes so we can just reuse the code there
+ makeScope(time).foreach { s =>
+ ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ }
+
+ body
+ } finally {
+ // Restore any state that was modified before returning
+ ssc.sparkContext.setCallSite(prevCallSite)
+ ssc.sparkContext.setLocalProperty(scopeKey, prevScope)
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, prevScopeNoOverride)
+ }
+ }
+
+ /**
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
@@ -456,7 +525,7 @@ abstract class DStream[T: ClassTag] (
// =======================================================================
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
+ def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
@@ -464,26 +533,31 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
/** Return a new DStream containing only the elements that satisfy a predicate. */
- def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
+ new FilteredDStream(this, filterFunc)
+ }
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): DStream[Array[T]] = new GlommedDStream(this)
-
+ def glom(): DStream[Array[T]] = ssc.withScope {
+ new GlommedDStream(this)
+ }
/**
* Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
* returned DStream has exactly numPartitions partitions.
*/
- def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+ def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
+ this.transform(_.repartition(numPartitions))
+ }
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
@@ -493,7 +567,7 @@ abstract class DStream[T: ClassTag] (
def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
- ): DStream[U] = {
+ ): DStream[U] = ssc.withScope {
new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}
@@ -501,14 +575,15 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
- def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ }
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = {
+ def count(): DStream[Long] = ssc.withScope {
this.map(_ => (null, 1L))
.transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
.reduceByKey(_ + _)
@@ -522,15 +597,16 @@ abstract class DStream[T: ClassTag] (
* `numPartitions` not specified).
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
- : DStream[(T, Long)] =
+ : DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+ }
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: RDD[T] => Unit): Unit = {
+ def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
this.foreachRDD(foreachFunc)
}
@@ -539,7 +615,7 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
+ def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
this.foreachRDD(foreachFunc)
}
@@ -547,7 +623,7 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
}
@@ -555,7 +631,7 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -566,7 +642,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -578,7 +654,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -596,7 +672,7 @@ abstract class DStream[T: ClassTag] (
*/
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
- ): DStream[V] = {
+ ): DStream[V] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -610,7 +686,7 @@ abstract class DStream[T: ClassTag] (
*/
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
- ): DStream[V] = {
+ ): DStream[V] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -628,7 +704,7 @@ abstract class DStream[T: ClassTag] (
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
- def print() {
+ def print(): Unit = ssc.withScope {
print(10)
}
@@ -636,7 +712,7 @@ abstract class DStream[T: ClassTag] (
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
- def print(num: Int) {
+ def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
@@ -668,7 +744,7 @@ abstract class DStream[T: ClassTag] (
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
@@ -686,7 +762,7 @@ abstract class DStream[T: ClassTag] (
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[T] = {
+ ): DStream[T] = ssc.withScope {
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
@@ -711,7 +787,7 @@ abstract class DStream[T: ClassTag] (
invReduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[T] = {
+ ): DStream[T] = ssc.withScope {
this.map(x => (1, x))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
@@ -727,7 +803,9 @@ abstract class DStream[T: ClassTag] (
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ def countByWindow(
+ windowDuration: Duration,
+ slideDuration: Duration): DStream[Long] = ssc.withScope {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
@@ -748,8 +826,7 @@ abstract class DStream[T: ClassTag] (
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
- : DStream[(T, Long)] =
- {
+ : DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
@@ -764,19 +841,21 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
- def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
+ def union(that: DStream[T]): DStream[T] = ssc.withScope {
+ new UnionDStream[T](Array(this, that))
+ }
/**
* Return all the RDDs defined by the Interval object (both end times included)
*/
- def slice(interval: Interval): Seq[RDD[T]] = {
+ def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope {
slice(interval.beginTime, interval.endTime)
}
/**
* Return all the RDDs between 'fromTime' and 'toTime' (both included)
*/
- def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
@@ -810,7 +889,7 @@ abstract class DStream[T: ClassTag] (
* The file name at each batch interval is generated based on `prefix` and
* `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsObjectFiles(prefix: String, suffix: String = "") {
+ def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
@@ -823,7 +902,7 @@ abstract class DStream[T: ClassTag] (
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsTextFiles(prefix: String, suffix: String = "") {
+ def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
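The `createRDDWithLocalProperties` helper added above follows a save-and-restore pattern for thread-local `SparkContext` properties. A generic sketch of that pattern, with a caller-supplied key rather than the real scope and call-site keys:

```scala
import org.apache.spark.SparkContext

// Save-and-restore sketch for one thread-local SparkContext property.
// createRDDWithLocalProperties applies the same idea to the call site,
// the RDD scope key, and the scope no-override key at once.
def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
  val previous = sc.getLocalProperty(key)  // may be null if the caller set nothing
  try {
    sc.setLocalProperty(key, value)
    body
  } finally {
    sc.setLocalProperty(key, previous)     // always restore, even if body throws
  }
}
```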
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 685a32e1d2..c109ceccc6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
- val jobFunc = () => {
+ val jobFunc = () => createRDDWithLocalProperties(time) {
ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 9716adb628..d58c99a8ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,10 +17,13 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
-
import scala.reflect.ClassTag
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.util.Utils
+
/**
* This is the abstract base class for all input streams. This class provides methods
* start() and stop(), which are called by the Spark Streaming system to start and stop receiving data.
@@ -44,10 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
+ /** A human-readable name of this InputDStream */
+ private[streaming] def name: String = {
+ // e.g. FlumePollingInputDStream -> "Flume polling stream"
+ val newName = Utils.getFormattedClassName(this)
+ .replaceAll("InputDStream", "Stream")
+ .split("(?=[A-Z])")
+ .filter(_.nonEmpty)
+ .mkString(" ")
+ .toLowerCase
+ .capitalize
+ s"$newName [$id]"
+ }
+
/**
- * The name of this InputDStream. By default, it's the class name with its id.
+ * The base scope associated with the operation that created this DStream.
+ *
+ * For InputDStreams, we use the name of this DStream as the scope name.
+ * If an outer scope is given, we assume that it includes an alternative name for this stream.
*/
- private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
+ protected[streaming] override val baseScope: Option[String] = {
+ val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+ .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" }
+ .getOrElse(name.toLowerCase)
+ Some(new RDDOperationScope(scopeName).toJson)
+ }
/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
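The `name` helper above derives a display name from the class name. The string transformation on its own can be sketched like this, passing the class name in directly rather than obtaining it via `Utils.getFormattedClassName`:

```scala
// Sketch of the display-name derivation used by InputDStream.name.
def prettyStreamName(className: String, id: Int): String = {
  val base = className
    .replaceAll("InputDStream", "Stream")  // FlumePollingInputDStream -> FlumePollingStream
    .split("(?=[A-Z])")                    // split before each capital letter
    .filter(_.nonEmpty)
    .mkString(" ")                         // "Flume Polling Stream"
    .toLowerCase                           // "flume polling stream"
    .capitalize                            // "Flume polling stream"
  s"$base [$id]"
}

// prettyStreamName("FlumePollingInputDStream", 0) == "Flume polling stream [0]"
```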
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 8a58571632..884a8e8b52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): DStream[(K, Iterable[V])] = {
+ def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKey(defaultPartitioner())
}
@@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKey(defaultPartitioner(numPartitions))
}
@@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
@@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
- def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
reduceByKey(reduceFunc, defaultPartitioner())
}
@@ -84,7 +84,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
+ def reduceByKey(
+ reduceFunc: (V, V) => V,
+ numPartitions: Int): DStream[(K, V)] = ssc.withScope {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
@@ -93,7 +95,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
- def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+ def reduceByKey(
+ reduceFunc: (V, V) => V,
+ partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
@@ -104,11 +108,11 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
*/
def combineByKey[C: ClassTag](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner,
- mapSideCombine: Boolean = true): DStream[(K, C)] = {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
mapSideCombine)
}
@@ -121,7 +125,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -136,8 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : DStream[(K, Iterable[V])] =
- {
+ : DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -157,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, Iterable[V])] = {
+ ): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
@@ -176,7 +179,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, Iterable[V])] = {
+ ): DStream[(K, Iterable[V])] = ssc.withScope {
val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
@@ -198,7 +201,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -217,7 +220,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
@@ -238,7 +241,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
defaultPartitioner(numPartitions))
}
@@ -260,7 +263,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
self.reduceByKey(cleanedReduceFunc, partitioner)
.window(windowDuration, slideDuration)
@@ -294,8 +297,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
- ): DStream[(K, V)] = {
-
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowDuration,
slideDuration, defaultPartitioner(numPartitions), filterFunc
@@ -328,7 +330,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
slideDuration: Duration,
partitioner: Partitioner,
filterFunc: ((K, V)) => Boolean
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
@@ -349,7 +351,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
@@ -365,7 +367,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
@@ -382,7 +384,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
@@ -406,7 +408,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
@@ -425,7 +427,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner,
initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
@@ -451,7 +453,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
partitioner: Partitioner,
rememberPartitioner: Boolean,
initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
rememberPartitioner, Some(initialRDD))
}
@@ -460,7 +462,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying a map function to the value of each key-value pairs in
* 'this' DStream without changing the key.
*/
- def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
@@ -470,7 +472,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
*/
def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
- ): DStream[(K, U)] = {
+ ): DStream[(K, U)] = ssc.withScope {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
@@ -479,7 +481,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
cogroup(other, defaultPartitioner())
}
@@ -487,8 +490,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
- : DStream[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -499,7 +503,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Iterable[V], Iterable[W]))] = {
+ ): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
@@ -510,7 +514,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
- def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
join[W](other, defaultPartitioner())
}
@@ -518,7 +522,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
join[W](other, defaultPartitioner(numPartitions))
}
@@ -529,7 +535,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (V, W))] = {
+ ): DStream[(K, (V, W))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
@@ -541,7 +547,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = ssc.withScope {
leftOuterJoin[W](other, defaultPartitioner())
}
@@ -553,7 +560,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (V, Option[W]))] = {
+ ): DStream[(K, (V, Option[W]))] = ssc.withScope {
leftOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -565,7 +572,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (V, Option[W]))] = {
+ ): DStream[(K, (V, Option[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
@@ -577,7 +584,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = ssc.withScope {
rightOuterJoin[W](other, defaultPartitioner())
}
@@ -589,7 +597,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (Option[V], W))] = {
+ ): DStream[(K, (Option[V], W))] = ssc.withScope {
rightOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -601,7 +609,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Option[V], W))] = {
+ ): DStream[(K, (Option[V], W))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
@@ -613,7 +621,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+ def fullOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
fullOuterJoin[W](other, defaultPartitioner())
}
@@ -625,7 +634,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (Option[V], Option[W]))] = {
+ ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
fullOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -637,7 +646,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Option[V], Option[W]))] = {
+ ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
@@ -651,7 +660,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassTag[F]) {
+ )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -667,7 +676,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
- ) {
+ ): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
@@ -684,7 +693,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassTag[F]) {
+ )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -700,7 +709,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = ssc.sparkContext.hadoopConfiguration
- ) {
+ ): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
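One consequence worth noting before the new test suite: nested `withScope` calls inside an outer DStream operation do not replace the outer scope, and scope overriding is disallowed when the RDDs are created, so composite operations keep a single scope. A brief sketch, assuming an existing `DStream[Int]`:

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// countByWindow is built from map and reduceByWindow internally, but all RDDs
// produced for a given batch are grouped under a single "countByWindow" scope
// in the DAG visualization, as exercised by the DStreamScopeSuite below.
def windowedCounts(stream: DStream[Int]): DStream[Long] =
  stream.countByWindow(Seconds(10), Seconds(1))
```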
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
new file mode 100644
index 0000000000..3929331020
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.ui.UIUtils
+
+/**
+ * Tests whether scope information is passed from DStream operations to RDDs correctly.
+ */
+class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+ private var ssc: StreamingContext = null
+ private val batchDuration: Duration = Seconds(1)
+
+ override def beforeAll(): Unit = {
+ ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+ }
+
+ override def afterAll(): Unit = {
+ ssc.stop(stopSparkContext = true)
+ }
+
+ before { assertPropertiesNotSet() }
+ after { assertPropertiesNotSet() }
+
+ test("dstream without scope") {
+ val dummyStream = new DummyDStream(ssc)
+ dummyStream.initialize(Time(0))
+
+ // This DStream is not instantiated in any scope, so all RDDs
+ // created by this stream should similarly not have a scope
+ assert(dummyStream.baseScope === None)
+ assert(dummyStream.getOrCompute(Time(1000)).get.scope === None)
+ assert(dummyStream.getOrCompute(Time(2000)).get.scope === None)
+ assert(dummyStream.getOrCompute(Time(3000)).get.scope === None)
+ }
+
+ test("input dstream without scope") {
+ val inputStream = new DummyInputDStream(ssc)
+ inputStream.initialize(Time(0))
+
+ val baseScope = inputStream.baseScope.map(RDDOperationScope.fromJson)
+ val scope1 = inputStream.getOrCompute(Time(1000)).get.scope
+ val scope2 = inputStream.getOrCompute(Time(2000)).get.scope
+ val scope3 = inputStream.getOrCompute(Time(3000)).get.scope
+
+ // Unlike plain DStreams, input streams define their own base scope from their
+ // names, so these scopes should all be defined even without an enclosing operation
+ assertDefined(baseScope, scope1, scope2, scope3)
+ assert(baseScope.get.name.startsWith("dummy stream"))
+ assertScopeCorrect(baseScope.get, scope1.get, 1000)
+ assertScopeCorrect(baseScope.get, scope2.get, 2000)
+ assertScopeCorrect(baseScope.get, scope3.get, 3000)
+ }
+
+ test("scoping simple operations") {
+ val inputStream = new DummyInputDStream(ssc)
+ val mappedStream = inputStream.map { i => i + 1 }
+ val filteredStream = mappedStream.filter { i => i % 2 == 0 }
+ filteredStream.initialize(Time(0))
+
+ val mappedScopeBase = mappedStream.baseScope.map(RDDOperationScope.fromJson)
+ val mappedScope1 = mappedStream.getOrCompute(Time(1000)).get.scope
+ val mappedScope2 = mappedStream.getOrCompute(Time(2000)).get.scope
+ val mappedScope3 = mappedStream.getOrCompute(Time(3000)).get.scope
+ val filteredScopeBase = filteredStream.baseScope.map(RDDOperationScope.fromJson)
+ val filteredScope1 = filteredStream.getOrCompute(Time(1000)).get.scope
+ val filteredScope2 = filteredStream.getOrCompute(Time(2000)).get.scope
+ val filteredScope3 = filteredStream.getOrCompute(Time(3000)).get.scope
+
+ // These streams are defined in their respective scopes "map" and "filter", so all
+ // RDDs created by these streams should inherit the IDs and names of their parent
+ // DStream's base scopes
+ assertDefined(mappedScopeBase, mappedScope1, mappedScope2, mappedScope3)
+ assertDefined(filteredScopeBase, filteredScope1, filteredScope2, filteredScope3)
+ assert(mappedScopeBase.get.name === "map")
+ assert(filteredScopeBase.get.name === "filter")
+ assertScopeCorrect(mappedScopeBase.get, mappedScope1.get, 1000)
+ assertScopeCorrect(mappedScopeBase.get, mappedScope2.get, 2000)
+ assertScopeCorrect(mappedScopeBase.get, mappedScope3.get, 3000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope1.get, 1000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope2.get, 2000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope3.get, 3000)
+ }
+
+ test("scoping nested operations") {
+ val inputStream = new DummyInputDStream(ssc)
+ val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
+ countStream.initialize(Time(0))
+
+ val countScopeBase = countStream.baseScope.map(RDDOperationScope.fromJson)
+ val countScope1 = countStream.getOrCompute(Time(1000)).get.scope
+ val countScope2 = countStream.getOrCompute(Time(2000)).get.scope
+ val countScope3 = countStream.getOrCompute(Time(3000)).get.scope
+
+ // Assert that all children RDDs inherit the DStream operation name correctly
+ assertDefined(countScopeBase, countScope1, countScope2, countScope3)
+ assert(countScopeBase.get.name === "countByWindow")
+ assertScopeCorrect(countScopeBase.get, countScope1.get, 1000)
+ assertScopeCorrect(countScopeBase.get, countScope2.get, 2000)
+ assertScopeCorrect(countScopeBase.get, countScope3.get, 3000)
+
+ // All streams except the input stream should share the same scopes as `countStream`
+ def testStream(stream: DStream[_]): Unit = {
+ if (stream != inputStream) {
+ val myScopeBase = stream.baseScope.map(RDDOperationScope.fromJson)
+ val myScope1 = stream.getOrCompute(Time(1000)).get.scope
+ val myScope2 = stream.getOrCompute(Time(2000)).get.scope
+ val myScope3 = stream.getOrCompute(Time(3000)).get.scope
+ assertDefined(myScopeBase, myScope1, myScope2, myScope3)
+ assert(myScopeBase === countScopeBase)
+ assert(myScope1 === countScope1)
+ assert(myScope2 === countScope2)
+ assert(myScope3 === countScope3)
+ // Climb upwards to test the parent streams
+ stream.dependencies.foreach(testStream)
+ }
+ }
+ testStream(countStream)
+ }
+
+ /** Assert that the RDD operation scope properties are not set in our SparkContext. */
+ private def assertPropertiesNotSet(): Unit = {
+ assert(ssc != null)
+ assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) == null)
+ assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY) == null)
+ }
+
+ /** Assert that the given RDD scope inherits the name and ID of the base scope correctly. */
+ private def assertScopeCorrect(
+ baseScope: RDDOperationScope,
+ rddScope: RDDOperationScope,
+ batchTime: Long): Unit = {
+ assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
+ }
+
+ /** Assert that the given RDD scope inherits the base name and ID correctly. */
+ private def assertScopeCorrect(
+ baseScopeId: String,
+ baseScopeName: String,
+ rddScope: RDDOperationScope,
+ batchTime: Long): Unit = {
+ val formattedBatchTime = UIUtils.formatBatchTime(
+ batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+ assert(rddScope.id === s"${baseScopeId}_$batchTime")
+ assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+ }
+
+ /** Assert that all the specified options are defined. */
+ private def assertDefined[T](options: Option[T]*): Unit = {
+ options.zipWithIndex.foreach { case (o, i) => assert(o.isDefined, s"Option $i was empty!") }
+ }
+
+}
+
+/**
+ * A dummy stream that does absolutely nothing.
+ */
+private class DummyDStream(ssc: StreamingContext) extends DStream[Int](ssc) {
+ override def dependencies: List[DStream[Int]] = List.empty
+ override def slideDuration: Duration = Seconds(1)
+ override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}
+
+/**
+ * A dummy input stream that does absolutely nothing.
+ */
+private class DummyInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) {
+ override def start(): Unit = { }
+ override def stop(): Unit = { }
+ override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}