Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala          |  48
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala           | 177
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala    |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala      |  32
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 111
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala         | 190
6 files changed, 441 insertions, 119 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1d2ecdd341..7f181bcecd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
@@ -242,14 +242,33 @@ class StreamingContext private[streaming] (
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
/**
+ * Execute a block of code in a scope such that all new DStreams created in this body will
+ * be part of the same scope. For more detail, see the comments in `DStream.createRDDWithLocalProperties`.
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)
+
+ /**
+ * Execute a block of code in a scope with the given name, such that all new DStreams created
+ * in this body will be part of the same scope. For more detail, see the comments in `DStream.createRDDWithLocalProperties`.
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+ RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@deprecated("Use receiverStream", "1.0.0")
- def networkStream[T: ClassTag](
- receiver: Receiver[T]): ReceiverInputDStream[T] = {
- receiverStream(receiver)
+ def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ withNamedScope("network stream") {
+ receiverStream(receiver)
+ }
}
/**
@@ -257,9 +276,10 @@ class StreamingContext private[streaming] (
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
- def receiverStream[T: ClassTag](
- receiver: Receiver[T]): ReceiverInputDStream[T] = {
- new PluggableInputDStream[T](this, receiver)
+ def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ withNamedScope("receiver stream") {
+ new PluggableInputDStream[T](this, receiver)
+ }
}
/**
@@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
- ): ReceiverInputDStream[T] = {
+ ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[String] = {
+ ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[T] = {
+ ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
new RawInputDStream[T](this, hostname, port, storageLevel)
}
@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
- def textFileStream(directory: String): DStream[String] = {
+ def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
@@ -430,7 +450,7 @@ class StreamingContext private[streaming] (
@Experimental
def binaryRecordsStream(
directory: String,
- recordLength: Int): DStream[Array[Byte]] = {
+ recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
- def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
+ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
new UnionDStream[T](streams.toArray)
}
@@ -488,7 +508,7 @@ class StreamingContext private[streaming] (
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
- ): DStream[T] = {
+ ): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 64de7526a6..5977481e1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -25,12 +25,13 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex
-import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
+import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
/**
@@ -73,7 +74,7 @@ abstract class DStream[T: ClassTag] (
def dependencies: List[DStream[_]]
/** Method that generates a RDD for the given time */
- def compute (validTime: Time): Option[RDD[T]]
+ def compute(validTime: Time): Option[RDD[T]]
// =======================================================================
// Methods and fields available on all DStreams
@@ -111,6 +112,44 @@ abstract class DStream[T: ClassTag] (
/* Set the creation call site */
private[streaming] val creationSite = DStream.getCreationSite()
+ /**
+ * The base scope associated with the operation that created this DStream.
+ *
+ * This is the medium through which we pass the DStream operation name (e.g. updateStateByKey)
+ * to the RDDs created by this DStream. Note that we never use this scope directly in RDDs.
+ * Instead, we instantiate a new scope during each call to `compute` based on this one.
+ *
+ * This is not defined if the DStream is created outside of one of the public DStream operations.
+ */
+ protected[streaming] val baseScope: Option[String] = {
+ Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+ }
+
+ /**
+ * Make a scope that groups RDDs created in the same DStream operation in the same batch.
+ *
+ * Each DStream produces many scopes and each scope may be shared by other DStreams created
+ * in the same operation. Separate calls to the same DStream operation create separate scopes.
+ * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
+ */
+ private def makeScope(time: Time): Option[RDDOperationScope] = {
+ baseScope.map { bsJson =>
+ val formattedBatchTime = UIUtils.formatBatchTime(
+ time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+ val bs = RDDOperationScope.fromJson(bsJson)
+ val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
+ val scopeName =
+ if (baseName.length > 10) {
+ // If the operation name is too long, wrap the line
+ s"$baseName\n@ $formattedBatchTime"
+ } else {
+ s"$baseName @ $formattedBatchTime"
+ }
+ val scopeId = s"${bs.id}_${time.milliseconds}"
+ new RDDOperationScope(scopeName, id = scopeId)
+ }
+ }
+
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
@@ -295,28 +334,23 @@ abstract class DStream[T: ClassTag] (
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
*/
- private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+ private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
- // Set the thread-local property for call sites to this DStream's creation site
- // such that RDDs generated by compute gets that as their creation site.
- // Note that this `getOrCompute` may get called from another DStream which may have
- // set its own call site. So we store its call site in a temporary variable,
- // set this DStream's creation site, generate RDDs and then restore the previous call site.
- val prevCallSite = ssc.sparkContext.getCallSite()
- ssc.sparkContext.setCallSite(creationSite)
- // Disable checks for existing output directories in jobs launched by the streaming
- // scheduler, since we may need to write output to an existing directory during checkpoint
- // recovery; see SPARK-4835 for more details. We need to have this call here because
- // compute() might cause Spark jobs to be launched.
- val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
- compute(time)
+
+ val rddOption = createRDDWithLocalProperties(time) {
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details. We need to have this call here because
+ // compute() might cause Spark jobs to be launched.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ compute(time)
+ }
}
- ssc.sparkContext.setCallSite(prevCallSite)
rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
@@ -338,6 +372,41 @@ abstract class DStream[T: ClassTag] (
}
/**
+ * Wrap a body of code such that the call site and operation scope
+ * information are passed to the RDDs created in this body properly.
+ */
+ protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+ val scopeKey = SparkContext.RDD_SCOPE_KEY
+ val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
+ // Pass this DStream's operation scope and creation site information to RDDs through
+ // thread-local properties in our SparkContext. Since this method may be called from another
+ // DStream, we need to temporarily store any old scope and creation site information to
+ // restore them later after setting our own.
+ val prevCallSite = ssc.sparkContext.getCallSite()
+ val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
+ val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
+
+ try {
+ ssc.sparkContext.setCallSite(creationSite)
+ // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
+ // DStream operation name, and (2) share this scope with other DStreams created in the
+ // same operation. Disallow nesting so that low-level Spark primitives do not show up.
+ // TODO: merge callsites with scopes so we can just reuse the code there
+ makeScope(time).foreach { s =>
+ ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ }
+
+ body
+ } finally {
+ // Restore any state that was modified before returning
+ ssc.sparkContext.setCallSite(prevCallSite)
+ ssc.sparkContext.setLocalProperty(scopeKey, prevScope)
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, prevScopeNoOverride)
+ }
+ }
+
+ /**
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
@@ -456,7 +525,7 @@ abstract class DStream[T: ClassTag] (
// =======================================================================
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
+ def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
@@ -464,26 +533,31 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
/** Return a new DStream containing only the elements that satisfy a predicate. */
- def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
+ new FilteredDStream(this, filterFunc)
+ }
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): DStream[Array[T]] = new GlommedDStream(this)
-
+ def glom(): DStream[Array[T]] = ssc.withScope {
+ new GlommedDStream(this)
+ }
/**
* Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
* returned DStream has exactly numPartitions partitions.
*/
- def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+ def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
+ this.transform(_.repartition(numPartitions))
+ }
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
@@ -493,7 +567,7 @@ abstract class DStream[T: ClassTag] (
def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
- ): DStream[U] = {
+ ): DStream[U] = ssc.withScope {
new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}
@@ -501,14 +575,15 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
- def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ }
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = {
+ def count(): DStream[Long] = ssc.withScope {
this.map(_ => (null, 1L))
.transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
.reduceByKey(_ + _)
@@ -522,15 +597,16 @@ abstract class DStream[T: ClassTag] (
* `numPartitions` not specified).
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
- : DStream[(T, Long)] =
+ : DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+ }
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: RDD[T] => Unit): Unit = {
+ def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
this.foreachRDD(foreachFunc)
}
@@ -539,7 +615,7 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
+ def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
this.foreachRDD(foreachFunc)
}
@@ -547,7 +623,7 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
}
@@ -555,7 +631,7 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -566,7 +642,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -578,7 +654,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -596,7 +672,7 @@ abstract class DStream[T: ClassTag] (
*/
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
- ): DStream[V] = {
+ ): DStream[V] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -610,7 +686,7 @@ abstract class DStream[T: ClassTag] (
*/
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
- ): DStream[V] = {
+ ): DStream[V] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
@@ -628,7 +704,7 @@ abstract class DStream[T: ClassTag] (
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
- def print() {
+ def print(): Unit = ssc.withScope {
print(10)
}
@@ -636,7 +712,7 @@ abstract class DStream[T: ClassTag] (
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
- def print(num: Int) {
+ def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
@@ -668,7 +744,7 @@ abstract class DStream[T: ClassTag] (
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
@@ -686,7 +762,7 @@ abstract class DStream[T: ClassTag] (
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[T] = {
+ ): DStream[T] = ssc.withScope {
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
@@ -711,7 +787,7 @@ abstract class DStream[T: ClassTag] (
invReduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[T] = {
+ ): DStream[T] = ssc.withScope {
this.map(x => (1, x))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
@@ -727,7 +803,9 @@ abstract class DStream[T: ClassTag] (
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ def countByWindow(
+ windowDuration: Duration,
+ slideDuration: Duration): DStream[Long] = ssc.withScope {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
@@ -748,8 +826,7 @@ abstract class DStream[T: ClassTag] (
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
- : DStream[(T, Long)] =
- {
+ : DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
@@ -764,19 +841,21 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
- def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
+ def union(that: DStream[T]): DStream[T] = ssc.withScope {
+ new UnionDStream[T](Array(this, that))
+ }
/**
* Return all the RDDs defined by the Interval object (both end times included)
*/
- def slice(interval: Interval): Seq[RDD[T]] = {
+ def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope {
slice(interval.beginTime, interval.endTime)
}
/**
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
- def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
@@ -810,7 +889,7 @@ abstract class DStream[T: ClassTag] (
* The file name at each batch interval is generated based on `prefix` and
* `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsObjectFiles(prefix: String, suffix: String = "") {
+ def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
@@ -823,7 +902,7 @@ abstract class DStream[T: ClassTag] (
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsTextFiles(prefix: String, suffix: String = "") {
+ def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
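The per-batch scope names produced by `makeScope` above follow a simple rule. The sketch below reproduces it with plain strings, since RDDOperationScope is private[spark]; the base name, id, batch time and formatted time are illustrative values only.

// Sketch of the naming rule in DStream.makeScope (illustrative values, not patch code).
val baseName = "countByWindow"        // name of the DStream operation's base scope
val baseId = "2"                      // id of that base scope
val batchTimeMs = 1000L               // batch time in milliseconds
val formattedBatchTime = "15:26:40"   // what UIUtils.formatBatchTime would render

// Long operation names are wrapped onto a second line so the DAG viz stays readable.
val scopeName =
  if (baseName.length > 10) s"$baseName\n@ $formattedBatchTime"
  else s"$baseName @ $formattedBatchTime"

// Each batch gets its own scope id, derived from the base scope id and the batch time.
val scopeId = s"${baseId}_$batchTimeMs"

assert(scopeName == "countByWindow\n@ 15:26:40")
assert(scopeId == "2_1000")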
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 685a32e1d2..c109ceccc6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
- val jobFunc = () => {
+ val jobFunc = () => createRDDWithLocalProperties(time) {
ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 9716adb628..d58c99a8ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,10 +17,13 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
-
import scala.reflect.ClassTag
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.util.Utils
+
/**
* This is the abstract base class for all input streams. This class provides methods
* start() and stop() which is called by Spark Streaming system to start and stop receiving data.
@@ -44,10 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is an unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
+ /** A human-readable name of this InputDStream */
+ private[streaming] def name: String = {
+ // e.g. FlumePollingDStream -> "Flume polling stream"
+ val newName = Utils.getFormattedClassName(this)
+ .replaceAll("InputDStream", "Stream")
+ .split("(?=[A-Z])")
+ .filter(_.nonEmpty)
+ .mkString(" ")
+ .toLowerCase
+ .capitalize
+ s"$newName [$id]"
+ }
+
/**
- * The name of this InputDStream. By default, it's the class name with its id.
+ * The base scope associated with the operation that created this DStream.
+ *
+ * For InputDStreams, we use the name of this DStream as the scope name.
+ * If an outer scope is given, we assume that it includes an alternative name for this stream.
*/
- private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
+ protected[streaming] override val baseScope: Option[String] = {
+ val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+ .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" }
+ .getOrElse(name.toLowerCase)
+ Some(new RDDOperationScope(scopeName).toJson)
+ }
/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
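The human-readable name derived in `InputDStream.name` above can be checked with a small standalone sketch. The class name and id are illustrative; the real code obtains the class name via Utils.getFormattedClassName.

// Standalone sketch of the renaming rule in InputDStream.name.
def readableName(className: String, id: Int): String = {
  val newName = className
    .replaceAll("InputDStream", "Stream")
    .split("(?=[A-Z])")
    .filter(_.nonEmpty)
    .mkString(" ")
    .toLowerCase
    .capitalize
  s"$newName [$id]"
}

println(readableName("FlumePollingInputDStream", 0)) // "Flume polling stream [0]"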
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 8a58571632..884a8e8b52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): DStream[(K, Iterable[V])] = {
+ def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKey(defaultPartitioner())
}
@@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKey(defaultPartitioner(numPartitions))
}
@@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
@@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
- def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
reduceByKey(reduceFunc, defaultPartitioner())
}
@@ -84,7 +84,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
+ def reduceByKey(
+ reduceFunc: (V, V) => V,
+ numPartitions: Int): DStream[(K, V)] = ssc.withScope {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
@@ -93,7 +95,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
- def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+ def reduceByKey(
+ reduceFunc: (V, V) => V,
+ partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
@@ -104,11 +108,11 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
*/
def combineByKey[C: ClassTag](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner,
- mapSideCombine: Boolean = true): DStream[(K, C)] = {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
mapSideCombine)
}
@@ -121,7 +125,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -136,8 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : DStream[(K, Iterable[V])] =
- {
+ : DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -157,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, Iterable[V])] = {
+ ): DStream[(K, Iterable[V])] = ssc.withScope {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
@@ -176,7 +179,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, Iterable[V])] = {
+ ): DStream[(K, Iterable[V])] = ssc.withScope {
val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
@@ -198,7 +201,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -217,7 +220,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
@@ -238,7 +241,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
defaultPartitioner(numPartitions))
}
@@ -260,7 +263,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
self.reduceByKey(cleanedReduceFunc, partitioner)
.window(windowDuration, slideDuration)
@@ -294,8 +297,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
- ): DStream[(K, V)] = {
-
+ ): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowDuration,
slideDuration, defaultPartitioner(numPartitions), filterFunc
@@ -328,7 +330,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
slideDuration: Duration,
partitioner: Partitioner,
filterFunc: ((K, V)) => Boolean
- ): DStream[(K, V)] = {
+ ): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
@@ -349,7 +351,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
@@ -365,7 +367,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
@@ -382,7 +384,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
@@ -406,7 +408,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
@@ -425,7 +427,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner,
initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
@@ -451,7 +453,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
partitioner: Partitioner,
rememberPartitioner: Boolean,
initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = {
+ ): DStream[(K, S)] = ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
rememberPartitioner, Some(initialRDD))
}
@@ -460,7 +462,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying a map function to the value of each key-value pairs in
* 'this' DStream without changing the key.
*/
- def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
@@ -470,7 +472,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
*/
def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
- ): DStream[(K, U)] = {
+ ): DStream[(K, U)] = ssc.withScope {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
@@ -479,7 +481,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
cogroup(other, defaultPartitioner())
}
@@ -487,8 +490,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
- : DStream[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -499,7 +503,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Iterable[V], Iterable[W]))] = {
+ ): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
@@ -510,7 +514,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
- def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
join[W](other, defaultPartitioner())
}
@@ -518,7 +522,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
join[W](other, defaultPartitioner(numPartitions))
}
@@ -529,7 +535,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (V, W))] = {
+ ): DStream[(K, (V, W))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
@@ -541,7 +547,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = ssc.withScope {
leftOuterJoin[W](other, defaultPartitioner())
}
@@ -553,7 +560,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (V, Option[W]))] = {
+ ): DStream[(K, (V, Option[W]))] = ssc.withScope {
leftOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -565,7 +572,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def leftOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (V, Option[W]))] = {
+ ): DStream[(K, (V, Option[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
@@ -577,7 +584,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = ssc.withScope {
rightOuterJoin[W](other, defaultPartitioner())
}
@@ -589,7 +597,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (Option[V], W))] = {
+ ): DStream[(K, (Option[V], W))] = ssc.withScope {
rightOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -601,7 +609,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def rightOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Option[V], W))] = {
+ ): DStream[(K, (Option[V], W))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
@@ -613,7 +621,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
- def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+ def fullOuterJoin[W: ClassTag](
+ other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
fullOuterJoin[W](other, defaultPartitioner())
}
@@ -625,7 +634,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
- ): DStream[(K, (Option[V], Option[W]))] = {
+ ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
fullOuterJoin[W](other, defaultPartitioner(numPartitions))
}
@@ -637,7 +646,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Option[V], Option[W]))] = {
+ ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
@@ -651,7 +660,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassTag[F]) {
+ )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -667,7 +676,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
- ) {
+ ): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
@@ -684,7 +693,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
- )(implicit fm: ClassTag[F]) {
+ )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -700,7 +709,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = ssc.sparkContext.hadoopConfiguration
- ) {
+ ): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
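As a usage-level sketch (assuming a StreamingContext `ssc` with a one-second batch interval and the hypothetical socket source from the earlier example), composite pair operations such as reduceByKeyAndWindow are implemented in terms of reduceByKey, window and map, but because withScope disallows nesting, each batch's RDDs all appear under a single "reduceByKeyAndWindow" scope in the UI.

// Assumes the hypothetical ssc and socket endpoint from the earlier sketch.
val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

// Internally this calls reduceByKey and window, but nesting is disallowed, so every
// RDD generated per batch is grouped under one "reduceByKeyAndWindow" scope.
val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
windowedCounts.print()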
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
new file mode 100644
index 0000000000..3929331020
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.ui.UIUtils
+
+/**
+ * Tests whether scope information is passed from DStream operations to RDDs correctly.
+ */
+class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+ private var ssc: StreamingContext = null
+ private val batchDuration: Duration = Seconds(1)
+
+ override def beforeAll(): Unit = {
+ ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+ }
+
+ override def afterAll(): Unit = {
+ ssc.stop(stopSparkContext = true)
+ }
+
+ before { assertPropertiesNotSet() }
+ after { assertPropertiesNotSet() }
+
+ test("dstream without scope") {
+ val dummyStream = new DummyDStream(ssc)
+ dummyStream.initialize(Time(0))
+
+ // This DStream is not instantiated in any scope, so all RDDs
+ // created by this stream should similarly not have a scope
+ assert(dummyStream.baseScope === None)
+ assert(dummyStream.getOrCompute(Time(1000)).get.scope === None)
+ assert(dummyStream.getOrCompute(Time(2000)).get.scope === None)
+ assert(dummyStream.getOrCompute(Time(3000)).get.scope === None)
+ }
+
+ test("input dstream without scope") {
+ val inputStream = new DummyInputDStream(ssc)
+ inputStream.initialize(Time(0))
+
+ val baseScope = inputStream.baseScope.map(RDDOperationScope.fromJson)
+ val scope1 = inputStream.getOrCompute(Time(1000)).get.scope
+ val scope2 = inputStream.getOrCompute(Time(2000)).get.scope
+ val scope3 = inputStream.getOrCompute(Time(3000)).get.scope
+
+ // Input streams define a base scope from their names, so their RDDs are scoped even though this stream was not created inside any scope
+ assertDefined(baseScope, scope1, scope2, scope3)
+ assert(baseScope.get.name.startsWith("dummy stream"))
+ assertScopeCorrect(baseScope.get, scope1.get, 1000)
+ assertScopeCorrect(baseScope.get, scope2.get, 2000)
+ assertScopeCorrect(baseScope.get, scope3.get, 3000)
+ }
+
+ test("scoping simple operations") {
+ val inputStream = new DummyInputDStream(ssc)
+ val mappedStream = inputStream.map { i => i + 1 }
+ val filteredStream = mappedStream.filter { i => i % 2 == 0 }
+ filteredStream.initialize(Time(0))
+
+ val mappedScopeBase = mappedStream.baseScope.map(RDDOperationScope.fromJson)
+ val mappedScope1 = mappedStream.getOrCompute(Time(1000)).get.scope
+ val mappedScope2 = mappedStream.getOrCompute(Time(2000)).get.scope
+ val mappedScope3 = mappedStream.getOrCompute(Time(3000)).get.scope
+ val filteredScopeBase = filteredStream.baseScope.map(RDDOperationScope.fromJson)
+ val filteredScope1 = filteredStream.getOrCompute(Time(1000)).get.scope
+ val filteredScope2 = filteredStream.getOrCompute(Time(2000)).get.scope
+ val filteredScope3 = filteredStream.getOrCompute(Time(3000)).get.scope
+
+ // These streams are defined in their respective scopes "map" and "filter", so all
+ // RDDs created by these streams should inherit the IDs and names of their parent
+ // DStream's base scopes
+ assertDefined(mappedScopeBase, mappedScope1, mappedScope2, mappedScope3)
+ assertDefined(filteredScopeBase, filteredScope1, filteredScope2, filteredScope3)
+ assert(mappedScopeBase.get.name === "map")
+ assert(filteredScopeBase.get.name === "filter")
+ assertScopeCorrect(mappedScopeBase.get, mappedScope1.get, 1000)
+ assertScopeCorrect(mappedScopeBase.get, mappedScope2.get, 2000)
+ assertScopeCorrect(mappedScopeBase.get, mappedScope3.get, 3000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope1.get, 1000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope2.get, 2000)
+ assertScopeCorrect(filteredScopeBase.get, filteredScope3.get, 3000)
+ }
+
+ test("scoping nested operations") {
+ val inputStream = new DummyInputDStream(ssc)
+ val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
+ countStream.initialize(Time(0))
+
+ val countScopeBase = countStream.baseScope.map(RDDOperationScope.fromJson)
+ val countScope1 = countStream.getOrCompute(Time(1000)).get.scope
+ val countScope2 = countStream.getOrCompute(Time(2000)).get.scope
+ val countScope3 = countStream.getOrCompute(Time(3000)).get.scope
+
+ // Assert that all children RDDs inherit the DStream operation name correctly
+ assertDefined(countScopeBase, countScope1, countScope2, countScope3)
+ assert(countScopeBase.get.name === "countByWindow")
+ assertScopeCorrect(countScopeBase.get, countScope1.get, 1000)
+ assertScopeCorrect(countScopeBase.get, countScope2.get, 2000)
+ assertScopeCorrect(countScopeBase.get, countScope3.get, 3000)
+
+ // All streams except the input stream should share the same scopes as `countStream`
+ def testStream(stream: DStream[_]): Unit = {
+ if (stream != inputStream) {
+ val myScopeBase = stream.baseScope.map(RDDOperationScope.fromJson)
+ val myScope1 = stream.getOrCompute(Time(1000)).get.scope
+ val myScope2 = stream.getOrCompute(Time(2000)).get.scope
+ val myScope3 = stream.getOrCompute(Time(3000)).get.scope
+ assertDefined(myScopeBase, myScope1, myScope2, myScope3)
+ assert(myScopeBase === countScopeBase)
+ assert(myScope1 === countScope1)
+ assert(myScope2 === countScope2)
+ assert(myScope3 === countScope3)
+ // Climb upwards to test the parent streams
+ stream.dependencies.foreach(testStream)
+ }
+ }
+ testStream(countStream)
+ }
+
+ /** Assert that the RDD operation scope properties are not set in our SparkContext. */
+ private def assertPropertiesNotSet(): Unit = {
+ assert(ssc != null)
+ assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) == null)
+ assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY) == null)
+ }
+
+ /** Assert that the given RDD scope inherits the name and ID of the base scope correctly. */
+ private def assertScopeCorrect(
+ baseScope: RDDOperationScope,
+ rddScope: RDDOperationScope,
+ batchTime: Long): Unit = {
+ assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
+ }
+
+ /** Assert that the given RDD scope inherits the base name and ID correctly. */
+ private def assertScopeCorrect(
+ baseScopeId: String,
+ baseScopeName: String,
+ rddScope: RDDOperationScope,
+ batchTime: Long): Unit = {
+ val formattedBatchTime = UIUtils.formatBatchTime(
+ batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+ assert(rddScope.id === s"${baseScopeId}_$batchTime")
+ assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+ }
+
+ /** Assert that all the specified options are defined. */
+ private def assertDefined[T](options: Option[T]*): Unit = {
+ options.zipWithIndex.foreach { case (o, i) => assert(o.isDefined, s"Option $i was empty!") }
+ }
+
+}
+
+/**
+ * A dummy stream that does absolutely nothing.
+ */
+private class DummyDStream(ssc: StreamingContext) extends DStream[Int](ssc) {
+ override def dependencies: List[DStream[Int]] = List.empty
+ override def slideDuration: Duration = Seconds(1)
+ override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}
+
+/**
+ * A dummy input stream that does absolutely nothing.
+ */
+private class DummyInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) {
+ override def start(): Unit = { }
+ override def stop(): Unit = { }
+ override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}