author     Tathagata Das <tathagata.das1565@gmail.com>   2013-12-18 23:39:28 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-12-18 23:39:28 -0800
commit     ec71b445ad0440e84c4b4909e4faf75aba0f13d7 (patch)
tree       3594700c37b6baf2c4297ad7819e4a223f835bfc /streaming
parent     e93b391d75a1c2e17ad93caff39e8fc34d640935 (diff)
Minor changes.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala               10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala   8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala             7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala                  14
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala          4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala          4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala    17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala          2
9 files changed, 36 insertions(+), 32 deletions(-)
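The patch renames StreamingContext.addListener to addStreamingListener, exposes the same method on JavaStreamingContext, adds scaladoc to the scheduler classes, and simplifies StatsReportListener. A minimal sketch of the renamed registration API, assuming an already-constructed StreamingContext named ssc:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // React to batch-completion events; the trait's other callbacks
    // keep their default no-op implementations.
    val listener = new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        println("Batch completed: " + batchCompleted.batchInfo.batchTime)
      }
    }
    ssc.addStreamingListener(listener)   // formerly ssc.addListener(listener)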
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index fedbbde80c..41da028a3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -513,7 +513,10 @@ class StreamingContext private (
graph.addOutputStream(outputStream)
}
- def addListener(streamingListener: StreamingListener) {
+ /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
+ def addStreamingListener(streamingListener: StreamingListener) {
scheduler.listenerBus.addListener(streamingListener)
}
@@ -532,20 +535,19 @@ class StreamingContext private (
* Start the execution of the streams.
*/
def start() {
-
validate()
+ // Get the network input streams
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true
case _ => false
}).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
+ // Start the network input tracker (must start before receivers)
if (networkInputStreams.length > 0) {
- // Start the network input tracker (must start before receivers)
networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
networkInputTracker.start()
}
-
Thread.sleep(1000)
// Start the scheduler
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87491..78d318cf27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.remember(duration)
}
+ /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
+ def addStreamingListener(streamingListener: StreamingListener) {
+ ssc.addStreamingListener(streamingListener)
+ }
+
/**
* Starts the execution of the streams.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 798598ad50..88e4af59b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -19,6 +19,9 @@ package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.Time
+/**
+ * Class having information on completed batches.
+ */
case class BatchInfo(
batchTime: Time,
submissionTime: Long,
@@ -32,7 +35,3 @@ case class BatchInfo(
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
}
-
-
-
-
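For reference, totalDelay above combines a completed batch's scheduling and processing delays. A hypothetical worked example; the field names beyond batchTime and submissionTime are assumptions about the timestamps the two delays are derived from, not shown in this diff:

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.scheduler.BatchInfo

    // processingStartTime and processingEndTime are assumed field names.
    val info = BatchInfo(
      batchTime = Time(1000),
      submissionTime = 1000L,
      processingStartTime = Some(1400L),  // scheduling delay: 1400 - 1000 = 400 ms
      processingEndTime = Some(2000L)     // processing delay: 2000 - 1400 = 600 ms
    )
    // info.totalDelay == Some(1000): 400 ms scheduling + 600 ms processing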
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index bca5e1f1a5..7341bfbc99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -17,9 +17,11 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.streaming.Time
+/**
+ * Class representing a Spark computation. It may contain multiple Spark jobs.
+ */
private[streaming]
class Job(val time: Time, func: () => _) {
var id: String = _
@@ -36,12 +38,4 @@ class Job(val time: Time, func: () => _) {
}
override def toString = id
-}
-/*
-private[streaming]
-object Job {
- val id = new AtomicLong(0)
-
- def getNewId() = id.getAndIncrement()
-}
-*/
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 5d3ce9c398..1cd0b9b0a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -22,6 +22,10 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+/**
+ * This class generates jobs from DStreams as well as drives checkpointing and cleaning
+ * up DStream metadata.
+ */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 69930f3b6c..33c5322358 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -23,7 +23,9 @@ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import scala.collection.mutable.HashSet
import org.apache.spark.streaming._
-
+/**
+ * This class drives the generation of Spark jobs from the DStreams.
+ */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 5647ffab8d..36225e190c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,19 +50,13 @@ trait StreamingListener {
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
*/
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
-
- import org.apache.spark
-
+ // Queue containing latest completed batches
val batchInfos = new Queue[BatchInfo]()
override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
- addToQueue(batchStarted.batchInfo)
- printStats()
- }
-
- def addToQueue(newPoint: BatchInfo) {
- batchInfos.enqueue(newPoint)
+ batchInfos.enqueue(batchStarted.batchInfo)
if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+ printStats()
}
def printStats() {
@@ -71,10 +65,11 @@ class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
}
def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
- spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+ org.apache.spark.scheduler.StatsReportListener.showMillisDistribution(
+ heading, extractDistribution(getMetric))
}
def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
}
-}
\ No newline at end of file
+}
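Since StatsReportListener is itself a StreamingListener, it can be attached through the renamed method; a short sketch, again assuming a StreamingContext named ssc:

    import org.apache.spark.streaming.scheduler.StatsReportListener

    // Prints delay distributions over the last 10 completed batches
    // each time a batch finishes.
    ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 10))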
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 324e491914..110a20f282 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -78,4 +78,4 @@ private[spark] class StreamingListenerBus() extends Logging {
}
return true
}
-}
\ No newline at end of file
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 826c839932..16410a21e3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -34,7 +34,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
test("basic BatchInfo generation") {
val ssc = setupStreams(input, operation)
val collector = new BatchInfoCollector
- ssc.addListener(collector)
+ ssc.addStreamingListener(collector)
runStreams(ssc, input.size, input.size)
val batchInfos = collector.batchInfos
batchInfos should have size 4
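The BatchInfoCollector helper used by the test is not shown in this diff; a hypothetical stand-in that would behave the same way, collecting one BatchInfo per completed batch:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted}

    // Accumulates BatchInfo objects as batches complete, so a test can
    // assert on how many batches ran and inspect their recorded delays.
    class BatchInfoCollector extends StreamingListener {
      val batchInfos = new ArrayBuffer[BatchInfo]()
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        batchInfos += batchCompleted.batchInfo
      }
    }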