Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 63
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 32
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala | 9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 18
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 34
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala | 5
20 files changed, 182 insertions, 113 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 4960a85b97..ca0115f90e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.deploy.SparkHadoopUtil
@@ -35,14 +35,14 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
- val sparkHome = ssc.sc.sparkHome
+ val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
- val environment = ssc.sc.environment
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes()
- val delaySeconds = MetadataCleaner.getDelaySeconds
+ val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
+ val sparkConf = ssc.conf
def validate() {
assert(master != null, "Checkpoint.master is null")
@@ -58,11 +58,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
* Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
-class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
+class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
+ extends Logging
+{
val file = new Path(checkpointDir, "graph")
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
- val compressionCodec = CompressionCodec.createCodec()
+ val compressionCodec = CompressionCodec.createCodec(conf)
// The file to which we actually write - and then "move" to file
val writeFile = new Path(file.getParent, file.getName + ".next")
// The file to which existing checkpoint is backed up (i.e. "moved")
@@ -151,12 +153,12 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
private[streaming]
object CheckpointReader extends Logging {
- def read(path: String): Checkpoint = {
+ def read(conf: SparkConf, path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
new Path(path), new Path(path + ".bk"))
- val compressionCodec = CompressionCodec.createCodec()
+ val compressionCodec = CompressionCodec.createCodec(conf)
attempts.foreach(file => {
if (fs.exists(file)) {
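
A note on the pattern above: the Checkpoint now captures ssc.conf, and both CheckpointWriter and CheckpointReader.read take a SparkConf so that the compression codec comes from the application's configuration instead of global system properties. A minimal sketch of the new signatures follows; it assumes code living inside the org.apache.spark.streaming package, since both classes are private[streaming], and the checkpoint directory is illustrative.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{CheckpointReader, CheckpointWriter}

    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo")
    // Writer and reader now share the conf, so both resolve the same compression codec.
    val writer = new CheckpointWriter(conf, "/tmp/checkpoints", new Configuration())
    val recovered = CheckpointReader.read(conf, "/tmp/checkpoints")
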
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index a78d3965ee..00671ba520 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
@transient protected[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
- initLogging()
-
// =======================================================================
// Methods that should be implemented by subclasses of DStream
// =======================================================================
@@ -208,7 +206,7 @@ abstract class DStream[T: ClassTag] (
checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
)
- val metadataCleanerDelay = MetadataCleaner.getDelaySeconds
+ val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index daed7ff7c3..a09b891956 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -24,7 +24,6 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
final private[streaming] class DStreamGraph extends Serializable with Logging {
- initLogging()
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 80af96c060..56dbcbda23 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -108,8 +108,9 @@ extends Serializable {
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner) : DStream[(K, C)] = {
- new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): DStream[(K, C)] = {
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
}
/**
@@ -173,7 +174,13 @@ extends Serializable {
slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, Seq[V])] = {
- self.window(windowDuration, slideDuration).groupByKey(partitioner)
+ val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+ val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+ val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+ self.groupByKey(partitioner)
+ .window(windowDuration, slideDuration)
+ .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
}
/**
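
Two behavioural changes land in this file: combineByKey gains a mapSideCombine flag (defaulting to true), and groupByKeyAndWindow now groups each batch first, windows the grouped stream, and merges the per-batch groups with combineByKey rather than windowing raw records. A hedged sketch of the new flag; `pairs` is an assumed existing DStream[(String, Int)] and the values are illustrative.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.StreamingContext._  // implicit conversion to PairDStreamFunctions

    val combined = pairs.combineByKey[Seq[Int]](
      (v: Int) => Seq(v),                        // createCombiner
      (buf: Seq[Int], v: Int) => buf :+ v,       // mergeValue
      (b1: Seq[Int], b2: Seq[Int]) => b1 ++ b2,  // mergeCombiner
      new HashPartitioner(4),
      mapSideCombine = false)                    // new parameter; skip map-side merging
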
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8898fdcb7f..b3a7cf08b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -63,18 +63,27 @@ class StreamingContext private (
/**
* Create a StreamingContext using an existing SparkContext.
- * @param sparkContext Existing SparkContext
- * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkContext existing SparkContext
+ * @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
/**
+ * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ * @param batchDuration the time interval at which streaming data will be divided into batches
+ */
+ def this(conf: SparkConf, batchDuration: Duration) = {
+ this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
+ }
+
+ /**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your job, to display on the cluster web UI
- * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName a name for your job, to display on the cluster web UI
+ * @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(
master: String,
@@ -92,20 +101,20 @@ class StreamingContext private (
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
- def this(path: String) = this(null, CheckpointReader.read(path), null)
-
- initLogging()
+ def this(path: String) = this(null, CheckpointReader.read(new SparkConf(), path), null)
if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
- if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
- MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+ private val conf_ = Option(sc_).map(_.conf).getOrElse(cp_.sparkConf)
+
+ if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds(conf_) < 0) {
+ MetadataCleaner.setDelaySeconds(conf_, cp_.delaySeconds)
}
- if (MetadataCleaner.getDelaySeconds < 0) {
+ if (MetadataCleaner.getDelaySeconds(conf_) < 0) {
throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
}
@@ -114,12 +123,14 @@ class StreamingContext private (
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
+ new SparkContext(cp_.sparkConf)
} else {
sc_
}
}
+ protected[streaming] val conf = sc.conf
+
protected[streaming] val env = SparkEnv.get
protected[streaming] val graph: DStreamGraph = {
@@ -584,18 +595,36 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+
+ protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient for even 1 second batch intervals.
+ val sc = new SparkContext(conf)
+ if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
+ MetadataCleaner.setDelaySeconds(sc.conf, 3600)
+ }
+ sc
+ }
+
protected[streaming] def createNewSparkContext(
master: String,
appName: String,
sparkHome: String,
jars: Seq[String],
- environment: Map[String, String]): SparkContext = {
+ environment: Map[String, String]): SparkContext =
+ {
+ val sc = new SparkContext(master, appName, sparkHome, jars, environment)
// Set the default cleaner delay to an hour if not already set.
- // This should be sufficient for even 1 second interval.
- if (MetadataCleaner.getDelaySeconds < 0) {
- MetadataCleaner.setDelaySeconds(3600)
+ // This should be sufficient for even 1 second batch intervals.
+ if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
+ MetadataCleaner.setDelaySeconds(sc.conf, 3600)
}
- new SparkContext(master, appName, sparkHome, jars, environment)
+ sc
}
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
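
The net effect for applications: a StreamingContext can now be driven entirely by a SparkConf, and createNewSparkContext fills in the cleaner delay (one hour) when spark.cleaner.ttl is not set. A minimal usage sketch with illustrative master, app name, and batch interval:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("conf-based-streaming")

    // The new constructor builds the SparkContext internally from this conf.
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define input streams and output operations here ...
    ssc.start()
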
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index dfd6e27c3e..6c3467d405 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
@@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
+ * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
+ * information.
+ */
+ def combineByKey[C](createCombiner: JFunction[V, C],
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner,
+ mapSideCombine: Boolean
+ ): JavaPairDStream[K, C] = {
+ implicit val cm: ClassTag[C] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
+ }
+
+ /**
* Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index aad0d931e7..7dec4b3ad7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,26 +17,25 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Integer => JInt}
import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
+import java.lang.{Integer => JInt}
+import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import twitter4j.Status
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
import twitter4j.auth.Authorization
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
@@ -124,6 +123,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(sparkContext.sc, batchDuration))
/**
+ * Creates a StreamingContext using an existing SparkContext.
+ * @param conf A Spark application configuration
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(conf: SparkConf, batchDuration: Duration) =
+ this(new StreamingContext(conf, batchDuration))
+
+ /**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
@@ -707,5 +714,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Sstops the execution of the streams.
*/
def stop() = ssc.stop()
+}
+object JavaStreamingContext {
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}
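
The jarOfClass helpers added here and on the StreamingContext companion make it easier to ship the application JAR. A hedged sketch assuming, as the .toArray conversion in the Java wrapper suggests, that the Scala helper returns a sequence of JAR paths acceptable to SparkConf.setJars; the master URL and names are illustrative.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object JarDemo {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setMaster("spark://host:7077")
          .setAppName("jar-of-class-demo")
          .setJars(StreamingContext.jarOfClass(JarDemo.getClass))  // ship the JAR containing this class
        val ssc = new StreamingContext(conf, Seconds(1))
      }
    }
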
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 5add20871e..27d474c0a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
*/
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
- initLogging()
-
lazy protected val env = SparkEnv.get
lazy protected val actor = env.actorSystem.actorOf(
@@ -176,8 +174,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.driver.host", "localhost")
- val port = System.getProperty("spark.driver.port", "7077").toInt
+ val ip = env.conf.get("spark.driver.host", "localhost")
+ val port = env.conf.get("spark.driver.port", "7077").toInt
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -214,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
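
Receivers now pull spark.driver.host, spark.driver.port, and spark.streaming.blockInterval from the SparkConf held by SparkEnv rather than from JVM system properties, so those values follow the application's configuration. A small illustrative sketch of setting the block interval on the conf the context is built from:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("receiver-config-demo")
      .set("spark.streaming.blockInterval", "100")  // milliseconds; the receiver's default stays 200

    val ssc = new StreamingContext(conf, Seconds(1))
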
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index e6e0022097..84e69f277b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true
+ ) extends DStream[(K,C)] (parent.ssc) {
override def dependencies = List(parent)
@@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case Some(rdd) => Some(rdd.combineByKey[C](
+ createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 73d959331a..89c43ff935 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.Duration
import scala.reflect.ClassTag
@@ -51,6 +51,14 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
- Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
+ val rddsInWindow = parent.slice(currentWindow)
+ val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+ logDebug("Using partition aware union for windowing at " + validTime)
+ new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
+ } else {
+ logDebug("Using normal union for windowing at " + validTime)
+ new UnionRDD(ssc.sc,rddsInWindow)
+ }
+ Some(windowRDD)
}
}
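
WindowedDStream now unions the RDDs in a window with PartitionerAwareUnionRDD whenever they all share the same partitioner, which preserves that partitioner instead of discarding it as a plain UnionRDD would. A sketch of the kind of pipeline that benefits; `pairs` is an assumed existing DStream[(String, Int)].

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._

    val partitioner = new HashPartitioner(4)
    // Every batch RDD produced below carries the same partitioner, so the window
    // can take the partitioner-aware branch added above.
    val counts = pairs.reduceByKey(_ + _, partitioner)
    val windowed = counts.window(Seconds(30), Seconds(10))
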
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 921a33a4cb..5f8be93a98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -36,8 +36,6 @@ private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
- initLogging()
-
val ssc = jobScheduler.ssc
val graph = ssc.graph
val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
@@ -48,14 +46,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
}))
val clock = {
- val clockClass = System.getProperty(
+ val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
Class.forName(clockClass).newInstance().asInstanceOf[Clock]
}
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
+ new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
@@ -67,7 +65,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
startFirstTime()
}
}
-
+
def stop() {
timer.stop()
if (checkpointWriter != null) checkpointWriter.stop()
@@ -106,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccfbed..9304fc1a93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,15 +25,13 @@ import org.apache.spark.streaming._
/**
* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
- * the jobs and runs them using a thread pool. Number of threads
+ * the jobs and runs them using a thread pool. Number of threads
*/
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
- initLogging()
-
val jobSets = new ConcurrentHashMap[Time, JobSet]
- val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
val generator = new JobGenerator(this)
val listenerBus = new StreamingListenerBus()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 4a3993e3e3..1559f7a9f7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
private[streaming]
object MasterFailureTest extends Logging {
- initLogging()
@volatile var killed = false
@volatile var killCount = 0
@@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
*/
private[streaming]
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
- initLogging()
override def run() {
try {
@@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
private[streaming]
class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
extends Thread with Logging {
- initLogging()
override def run() {
val localTestDir = Files.createTempDir()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index fc8655a083..6585d494a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
import java.net.ServerSocket
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkConf, Logging}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import scala.io.Source
import java.io.IOException
@@ -42,7 +42,7 @@ object RawTextSender extends Logging {
// Repeat the input data multiple times to fill in a buffer
val lines = Source.fromFile(file).getLines().toArray
val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
- val ser = new KryoSerializer().newInstance()
+ val ser = new KryoSerializer(new SparkConf()).newInstance()
val serStream = ser.serializeStream(bufferStream)
var i = 0
while (bufferStream.position < blockSize) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f5b7..d53d433693 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -25,6 +25,7 @@ import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
@@ -62,8 +63,11 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
@@ -101,7 +105,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("hello", "world"),
Arrays.asList("goodnight", "moon"));
- List<List<Integer>> expected = Arrays.asList(
+ List<List<Integer>> expected = Arrays.asList(
Arrays.asList(5,5),
Arrays.asList(9,4));
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index b35ca00b53..ee6b433d1f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -23,9 +23,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import util.ManualClock
+import org.apache.spark.{SparkContext, SparkConf}
class BasicOperationsSuite extends TestSuiteBase {
-
test("map") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
@@ -375,7 +375,11 @@ class BasicOperationsSuite extends TestSuiteBase {
}
test("slice") {
- val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val conf2 = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("BasicOperationsSuite")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1))
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4e25c9566c..8dc80ac2ed 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -57,7 +57,7 @@ class CheckpointSuite extends TestSuiteBase {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
@@ -132,8 +132,9 @@ class CheckpointSuite extends TestSuiteBase {
assert(!stateStream.generatedRDDs.isEmpty,
"No restored RDDs in state stream after recovery from second failure")
- // Adjust manual clock time as if it is being restarted after a delay
- System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
+ // Adjust manual clock time as if it is being restarted after a delay; this is a hack because
+ // we modify the conf object, but it works for this one property
+ ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
@@ -196,10 +197,6 @@ class CheckpointSuite extends TestSuiteBase {
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
- // Disable manual clock as FileInputDStream does not work with manual clock
- val clockProperty = System.getProperty("spark.streaming.clock")
- System.clearProperty("spark.streaming.clock")
-
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -296,10 +293,6 @@ class CheckpointSuite extends TestSuiteBase {
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
-
- // Enable manual clock back again for other tests
- if (clockProperty != null)
- System.setProperty("spark.streaming.clock", clockProperty)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 5fa14ad7c4..5185954521 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -56,9 +56,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
ssc.registerOutputStream(outputStream)
@@ -101,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("flume input stream") {
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
@@ -150,11 +150,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
- System.clearProperty("spark.streaming.clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -195,7 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
// Enable manual clock back again for other tests
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
@@ -206,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -250,7 +250,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val topics = Map("my-topic" -> 1)
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
@@ -273,7 +273,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
MultiThreadTestReceiver.haveAllThreadsFinished = false
// set up the network stream using the test receiver
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.networkStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index e969e91d13..33464bc3a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -28,7 +28,7 @@ import java.io.{ObjectInputStream, IOException}
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.rdd.RDD
/**
@@ -133,20 +133,26 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
+ //// A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
+ val conf = new SparkConf()
+ .setMaster(master)
+ .setAppName(framework)
+ .set("spark.cleaner.ttl", "3600")
+
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
+ //if (useManualClock) {
+ // System.setProperty(
+ // "spark.streaming.clock",
+ // "org.apache.spark.streaming.util.ManualClock"
+ // )
+ //} else {
+ // System.clearProperty("spark.streaming.clock")
+ //}
if (useManualClock) {
- System.setProperty(
- "spark.streaming.clock",
- "org.apache.spark.streaming.util.ManualClock"
- )
- } else {
- System.clearProperty("spark.streaming.clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
}
// Default after function for any streaming test suite. Override this
@@ -169,9 +175,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
@@ -195,9 +201,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
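
With this change, suites configure Spark through the shared `conf` that TestSuiteBase passes into a fresh SparkContext, instead of mutating system properties, which is exactly what the CheckpointSuite and InputStreamsSuite hunks above now do. A hedged sketch of a suite in that style; the test body and names are illustrative.

    import org.apache.spark.streaming.{DStream, Seconds}

    class MyOperationSuite extends TestSuiteBase {
      test("map doubles elements") {
        // Tweak the shared conf before setupStreams builds the SparkContext.
        conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
        val input = Seq(Seq(1, 2), Seq(3, 4))
        val expected = Seq(Seq(2, 4), Seq(6, 8))
        testOperation(input, (s: DStream[Int]) => s.map(_ * 2), expected)
      }
    }
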
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 6b4aaefcdf..c39abfc21b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
@@ -225,9 +224,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.groupByKeyAndWindow(windowDuration, slideDuration)
- .map(x => (x._1, x._2.toSet))
- .persist()
+ s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}