author    Prashant Sharma <prashant.s@imaginea.com>  2013-12-24 18:30:31 +0530
committer Prashant Sharma <scrapcodes@gmail.com>     2013-12-25 00:09:36 +0530
commit    2573add94cf920a88f74d80d8ea94218d812704d (patch)
tree      9eb07c85cadbaea90a8e9742687adda924342b42 /streaming
parent    0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
spark-544, introducing SparkConf and related configuration overhaul.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                  |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala                   |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala            |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala |  6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala             |  6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala           | 18
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala               | 11
7 files changed, 27 insertions(+), 25 deletions(-)
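
Across all seven files this commit applies one migration pattern: JVM-global System.getProperty lookups become reads from a per-context SparkConf. A minimal before/after sketch of that pattern, using a plain Scala Map as a stand-in for SparkConf (illustrative only; the real class carries more machinery):

    // Before this patch: configuration read from mutable, JVM-wide system properties.
    val concurrentJobsOld =
      System.getProperty("spark.streaming.concurrentJobs", "1").toInt

    // After (sketch): the same lookup against an instance-scoped map, mirroring
    // the conf.getOrElse(key, default) calls this patch introduces.
    val conf = Map("spark.streaming.concurrentJobs" -> "4")
    val concurrentJobsNew =
      conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt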
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb5..b8e1427a21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -34,7 +34,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
- val sparkHome = ssc.sc.sparkHome
+ val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val environment = ssc.sc.environment
val graph = ssc.graph
@@ -42,6 +42,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds
+ val sparkConf = ssc.sc.conf
def validate() {
assert(master != null, "Checkpoint.master is null")
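
Storing sparkConf in the Checkpoint means the checkpoint carries everything needed to rebuild the driver's SparkContext, instead of assuming the recovering JVM happens to have the same system properties set. A self-contained sketch of that snapshot pattern (hypothetical class names, not Spark's):

    // A serializable snapshot: all settings travel with the checkpoint, so a
    // recovering process sees exactly the configuration it was written with.
    class ConfSnapshot(private val settings: Map[String, String]) extends Serializable {
      def getOrElse(key: String, default: String): String =
        settings.getOrElse(key, default)
    }

    class CheckpointSketch(val master: String,
                           val appName: String,
                           val conf: ConfSnapshot) extends Serializable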
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e33e6..1d23713c80 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -26,7 +26,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val concurrentJobs = ssc.sc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
@@ -34,7 +34,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
null
}
- val clockClass = System.getProperty(
+ val clockClass = ssc.sc.conf.getOrElse(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
@@ -73,7 +73,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
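
Reading spark.streaming.concurrentJobs, the clock class, and the manual-clock jump from the context's conf rather than System.getProperty makes each setting per-StreamingContext: two schedulers in one JVM can now disagree. A sketch of that isolation, again with plain maps standing in for SparkConf:

    val confA = Map("spark.streaming.concurrentJobs" -> "4")
    val confB = Map.empty[String, String]

    // Each scheduler consults its own conf; with System.getProperty, both
    // would have been forced to share one JVM-wide value.
    val jobsA = confA.getOrElse("spark.streaming.concurrentJobs", "1").toInt  // 4
    val jobsB = confB.getOrElse("spark.streaming.concurrentJobs", "1").toInt  // 1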
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index d2c4fdee65..76744223e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -115,7 +115,7 @@ class StreamingContext private (
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
+ new SparkContext(cp_.sparkConf, cp_.environment)
} else {
sc_
}
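
Recovery now rebuilds the SparkContext from the single saved cp_.sparkConf instead of replaying five positional fields. A sketch of that consolidation, with illustrative types rather than Spark's actual constructors:

    // One config value replaces a growing list of positional constructor arguments.
    case class ConfSketch(master: String,
                          appName: String,
                          sparkHome: Option[String] = None,
                          jars: Seq[String] = Nil)

    class ContextSketch(val conf: ConfSketch)   // after: single-argument construction
    // before: class ContextSketch(master: String, appName: String,
    //                             sparkHome: String, jars: Seq[String], ...)

    val restored = new ContextSketch(ConfSketch("local[2]", "recovered-app"))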
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d5ae8aef92..8bf761b8cb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,8 +175,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.driver.host", "localhost")
- val port = System.getProperty("spark.driver.port", "7077").toInt
+ val ip = env.conf.getOrElse("spark.driver.host", "localhost")
+ val port = env.conf.getOrElse("spark.driver.port", "7077").toInt
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -213,7 +213,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
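
The receiver derives the tracker's Akka URL from spark.driver.host and spark.driver.port, now read from its environment's conf. A runnable sketch of that lookup-and-format step, with a plain map standing in for env.conf:

    def trackerUrl(conf: Map[String, String]): String = {
      // Defaults mirror the patch: localhost and port 7077.
      val ip   = conf.getOrElse("spark.driver.host", "localhost")
      val port = conf.getOrElse("spark.driver.port", "7077")
      "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
    }

    println(trackerUrl(Map("spark.driver.host" -> "10.0.0.5")))
    // prints akka.tcp://spark@10.0.0.5:7077/user/NetworkInputTracker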
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e81287b44e..315bd5443c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.util.ManualClock
*/
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
before {
FileUtils.deleteDirectory(new File(checkpointDir))
@@ -69,7 +69,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
@@ -135,13 +135,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Restart stream computation from the new checkpoint file to see whether that file has
// correct checkpoint data
+ conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
// Adjust manual clock time as if it is being restarted after a delay
- System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
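
Note that the manualClock.jump setting moves above the StreamingContext construction: a conf value is captured when the consuming component is built, whereas a system property could be mutated at any point afterward. A small sketch of why that ordering matters (assumed names, not Spark's internals):

    val conf = scala.collection.mutable.Map[String, String]()

    // Write the override first ...
    conf("spark.streaming.manualClock.jump") = "3500"

    // ... because the consumer captures the value the moment it is constructed.
    val jump = conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
    val restartedClockTime = 1000L + jump   // checkpointed time plus simulated delay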
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82decef..da8f135dd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -53,7 +53,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
override def checkpointDir = "checkpoint"
before {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
after {
@@ -68,7 +68,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -113,7 +113,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("flume input stream") {
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
@@ -162,11 +162,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
- System.clearProperty("spark.streaming.clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -207,7 +207,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
// Enable manual clock back again for other tests
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
@@ -218,7 +218,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -262,7 +262,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val topics = Map("my-topic" -> 1)
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
@@ -285,7 +285,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
MultiThreadTestReceiver.haveAllThreadsFinished = false
// set up the network stream using the test receiver
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.networkStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
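
Every test in this suite now builds its context as new StreamingContext(new SparkContext(conf), batchDuration), so per-test settings such as the clock class flow through one conf value instead of scattered property writes. A self-contained sketch of the resulting test shape (hypothetical helper; maps stand in for SparkConf and the context):

    // Loan pattern: build a per-test conf, run the body, always tear down, so
    // nothing leaks between tests the way JVM-global properties did.
    def withConf[A](overrides: (String, String)*)(body: Map[String, String] => A): A = {
      val base = Map("spark.master" -> "local[2]", "spark.app.name" -> "test")
      val conf = base ++ overrides
      try body(conf)   // stand-in for exercising a real StreamingContext built from conf
      finally ()       // stand-in for ssc.stop()
    }

    withConf("spark.streaming.clock" -> "org.apache.spark.streaming.util.ManualClock") { conf =>
      assert(conf("spark.streaming.clock").endsWith("ManualClock"))
    }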
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 2f34e812a1..d1cab0c609 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -28,7 +28,7 @@ import java.io.{ObjectInputStream, IOException}
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.rdd.RDD
/**
@@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
+ def conf = new SparkConf().setMasterUrl(master).setAppName(framework).set("spark.cleaner.ttl", "3600")
/**
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
@@ -139,9 +140,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
@@ -165,9 +166,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
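
The new conf helper chains setMasterUrl, setAppName, and set in builder style. A self-contained sketch of that fluent-setter pattern (illustrative class; the setter names follow this patch, and released Spark later renamed setMasterUrl to setMaster):

    // Fluent-builder sketch: each setter records a key and returns `this`,
    // which is what lets the new `conf` helper chain three calls on one line.
    class ConfBuilder {
      private var settings = Map.empty[String, String]
      def set(key: String, value: String): ConfBuilder = { settings += key -> value; this }
      def setMasterUrl(url: String): ConfBuilder = set("spark.master", url)
      def setAppName(name: String): ConfBuilder = set("spark.app.name", name)
      def getOrElse(key: String, default: String): String = settings.getOrElse(key, default)
    }

    val c = new ConfBuilder()
      .setMasterUrl("local[2]")
      .setAppName("test")
      .set("spark.cleaner.ttl", "3600")
    assert(c.getOrElse("spark.cleaner.ttl", "600") == "3600")

Because conf is declared with def rather than val, each access builds a fresh configuration, so a setting applied in one test cannot leak into the next.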