author     Mubarak Seyed <mubarak.seyed@gmail.com>  2014-09-23 15:09:12 -0700
committer  Andrew Or <andrewor14@gmail.com>         2014-09-23 15:09:12 -0700
commit     729952a5efce755387c76cdf29280ee6f49fdb72 (patch)
tree       034ef81a4f69c3329a93b213ebc2092e6046f035 /streaming
parent     b3fef50e22fb3fe499f627179d17836a92dcb33a (diff)
[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
This is a refactored version of the original PR https://github.com/apache/spark/pull/1723 by mubarak.

Please take a look andrewor14, mubarak

Author: Mubarak Seyed <mubarak.seyed@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2464 from tdas/streaming-callsite and squashes the following commits:

dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI
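
For context, this is the kind of change visible in any ordinary streaming application: with this patch, stages generated by a DStream are labeled in the Stages UI with the user's file and line number rather than an internal Spark location. A minimal sketch, assuming a hypothetical MyApp.scala (the reported site would then read like "socketTextStream at MyApp.scala:10"):

    // Hypothetical example app; it only illustrates where the reported call site points.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    object MyApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("MyApp")
        val ssc = new StreamingContext(conf, Milliseconds(500))
        // The Stages UI should now attribute this DStream's stages to this line.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.map(_.length).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }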
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala      |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala       | 96
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 45
3 files changed, 107 insertions(+), 38 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index f63560dcb5..5a8eef1372 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -35,10 +35,9 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
+import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.MetadataCleaner
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -448,6 +447,7 @@ class StreamingContext private[streaming] (
throw new SparkException("StreamingContext has already been stopped")
}
validate()
+ sparkContext.setCallSite(DStream.getCreationSite())
scheduler.start()
state = Started
}
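
The call site stored here comes from DStream.getCreationSite() (added below in DStream.scala), which filters the current stack trace down to the first frame that belongs to user code. A simplified sketch of that style of extraction, not Spark's actual Utils.getCallSite:

    // Simplified sketch: walk the current stack and report the first frame that the
    // supplied predicate does NOT exclude, formatted as "method at File.scala:line".
    def firstUserFrame(exclude: String => Boolean): Option[String] = {
      Thread.currentThread.getStackTrace
        .drop(1) // skip the getStackTrace frame itself
        .find(frame => !exclude(frame.getClassName))
        .map(frame => s"${frame.getMethodName} at ${frame.getFileName}:${frame.getLineNumber}")
    }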
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index e05db236ad..65f7ccd318 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -23,6 +23,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.deprecated
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
+import scala.util.matching.Regex
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.{BlockRDD, RDD}
@@ -30,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{CallSite, MetadataCleaner}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -106,6 +107,9 @@ abstract class DStream[T: ClassTag] (
/** Return the StreamingContext associated with this DStream */
def context = ssc
+ /* The call site where this DStream was created, captured at construction time */
+ private[streaming] val creationSite = DStream.getCreationSite()
+
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
@@ -272,43 +276,41 @@ abstract class DStream[T: ClassTag] (
}
/**
- * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
- * method that should not be called directly.
+ * Get the RDD corresponding to the given time; either retrieve it from cache
+ * or compute-and-cache it.
*/
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
- // If this DStream was not initialized (i.e., zeroTime not set), then do it
- // If RDD was already generated, then retrieve it from HashMap
- generatedRDDs.get(time) match {
-
- // If an RDD was already generated and is being reused, then
- // probably all RDDs in this DStream will be reused and hence should be cached
- case Some(oldRDD) => Some(oldRDD)
-
- // if RDD was not generated, and if the time is valid
- // (based on sliding time of this DStream), then generate the RDD
- case None => {
- if (isTimeValid(time)) {
- compute(time) match {
- case Some(newRDD) =>
- if (storageLevel != StorageLevel.NONE) {
- newRDD.persist(storageLevel)
- logInfo("Persisting RDD " + newRDD.id + " for time " +
- time + " to " + storageLevel + " at time " + time)
- }
- if (checkpointDuration != null &&
- (time - zeroTime).isMultipleOf(checkpointDuration)) {
- newRDD.checkpoint()
- logInfo("Marking RDD " + newRDD.id + " for time " + time +
- " for checkpointing at time " + time)
- }
- generatedRDDs.put(time, newRDD)
- Some(newRDD)
- case None =>
- None
+ // If an RDD was already generated for this time, then retrieve it from the HashMap;
+ // otherwise compute the RDD
+ generatedRDDs.get(time).orElse {
+ // Compute the RDD if the time is valid (e.g. a correct time within the sliding
+ // window of RDD generation); otherwise generate nothing.
+ if (isTimeValid(time)) {
+ // Set the thread-local property for call sites to this DStream's creation site
+ // such that RDDs generated by compute get that as their creation site.
+ // Note that this `getOrCompute` may get called from another DStream which may have
+ // set its own call site. So we store its call site in a temporary variable,
+ // set this DStream's creation site, generate RDDs and then restore the previous call site.
+ val prevCallSite = ssc.sparkContext.getCallSite()
+ ssc.sparkContext.setCallSite(creationSite)
+ val rddOption = compute(time)
+ ssc.sparkContext.setCallSite(prevCallSite)
+
+ rddOption.foreach { newRDD =>
+ // Register the generated RDD for caching and checkpointing
+ if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
- } else {
- None
+ if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ newRDD.checkpoint()
+ logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
+ }
+ generatedRDDs.put(time, newRDD)
}
+ rddOption
+ } else {
+ None
}
}
}
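
The save/set/restore sequence around compute() above is the usual pattern for scoping a thread-local override. A generic, self-contained sketch of the same idea (a hypothetical helper, not Spark code):

    // Hypothetical helper: run `body` with the thread-local `tl` set to `value`,
    // restoring the previous value afterwards, even if `body` throws.
    def withThreadLocal[A, T](tl: ThreadLocal[A], value: A)(body: => T): T = {
      val prev = tl.get()
      tl.set(value)
      try body
      finally tl.set(prev)
    }

Note that the patch restores prevCallSite without a try/finally, so an exception thrown by compute(time) would propagate with this DStream's creation site still installed on the thread.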
@@ -799,3 +801,29 @@ abstract class DStream[T: ClassTag] (
this
}
}
+
+private[streaming] object DStream {
+
+ /** Get the creation site of a DStream from the stack trace at the point where the DStream is created. */
+ def getCreationSite(): CallSite = {
+ val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
+ val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
+ val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
+ val SCALA_CLASS_REGEX = """^scala""".r
+
+ /** Filtering function that excludes non-user classes for a streaming application */
+ def streamingExclusionFunction(className: String): Boolean = {
+ def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+ val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
+ val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
+ val isSparkStreamingTestClass = doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)
+ val isScalaClass = doesMatch(SCALA_CLASS_REGEX)
+
+ // If the class is a Spark example class or a Spark Streaming test class, then it is
+ // considered a streaming application class and is not excluded. Otherwise, exclude
+ // any Spark or Scala class, as the rest would be streaming application classes.
+ (isSparkClass || isScalaClass) && !isSparkExampleClass && !isSparkStreamingTestClass
+ }
+ org.apache.spark.util.Utils.getCallSite(streamingExclusionFunction)
+ }
+}
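
A few spot checks make the predicate's intent concrete. This is a hypothetical snippet: it assumes the local streamingExclusionFunction has been lifted out of getCreationSite so it can be called directly (true means the frame is excluded from the reported call site):

    assert(streamingExclusionFunction("org.apache.spark.streaming.dstream.DStream"))            // Spark internals
    assert(streamingExclusionFunction("scala.collection.immutable.List"))                       // Scala library
    assert(!streamingExclusionFunction("org.apache.spark.examples.streaming.NetworkWordCount")) // example app
    assert(!streamingExclusionFunction("org.apache.spark.streaming.testPackage$"))              // streaming test class
    assert(!streamingExclusionFunction("com.mycompany.MyStreamingApp"))                         // user application code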
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a3cabd6be0..ebf83748ff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -19,13 +19,16 @@ package org.apache.spark.streaming
import java.util.concurrent.atomic.AtomicInteger
+import scala.language.postfixOps
+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.{MetadataCleaner, Utils}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.Utils
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
@@ -257,6 +260,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
}
+ test("DStream and generated RDD creation sites") {
+ testPackage.test()
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => (1 to i))
val inputStream = new TestInputStream(s, input, 1)
@@ -293,3 +300,37 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
object TestReceiver {
val counter = new AtomicInteger(1)
}
+
+/** Streaming application for testing DStream and RDD creation sites */
+package object testPackage extends Assertions {
+ def test() {
+ val conf = new SparkConf().setMaster("local").setAppName("CreationSite test")
+ val ssc = new StreamingContext(conf, Milliseconds(100))
+ try {
+ val inputStream = ssc.receiverStream(new TestReceiver)
+
+ // Verify creation site of DStream
+ val creationSite = inputStream.creationSite
+ assert(creationSite.shortForm.contains("receiverStream") &&
+ creationSite.shortForm.contains("StreamingContextSuite")
+ )
+ assert(creationSite.longForm.contains("testPackage"))
+
+ // Verify creation site of generated RDDs
+ var rddGenerated = false
+ var rddCreationSiteCorrect = true
+
+ inputStream.foreachRDD { rdd =>
+ rddCreationSiteCorrect = rdd.creationSite == creationSite
+ rddGenerated = true
+ }
+ ssc.start()
+
+ eventually(timeout(10000 millis), interval(10 millis)) {
+ assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+}