author     Tathagata Das <tathagata.das1565@gmail.com>   2014-01-09 19:31:36 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-01-09 19:31:36 -0800
commit     38d75e18fa48bd230e2ff9478e87274fb0cceb9f (patch)
tree       5b6d15445a3376995068e9537482ca65078fc341 /streaming
parent     4a5558ca9921ce89b3996e9ead13b07123fc7a2d (diff)
parent     4b074fac054848ebd3397a3cce0a3e7871d3860c (diff)
download   spark-38d75e18fa48bd230e2ff9478e87274fb0cceb9f.tar.gz
           spark-38d75e18fa48bd230e2ff9478e87274fb0cceb9f.tar.bz2
           spark-38d75e18fa48bd230e2ff9478e87274fb0cceb9f.zip
Merge remote-tracking branch 'apache/master' into driver-test
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala                 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala        2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala        2
4 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 27d474c0a0..d41f726f83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
val ip = env.conf.get("spark.driver.host", "localhost")
- val port = env.conf.get("spark.driver.port", "7077").toInt
+ val port = env.conf.getInt("spark.driver.port", 7077)
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
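
Note (not part of the patch): the two hunks above replace string-valued lookups followed by manual .toInt/.toLong conversion with SparkConf's typed getters. A minimal sketch of the two styles, assuming only a bare SparkConf with no values set; the ConfGetterSketch object and its printed values are illustrative, not from the Spark sources:

    import org.apache.spark.SparkConf

    // Illustrative only: compares the old and new ways of reading typed settings.
    object ConfGetterSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false) // do not load system properties

        // Old style from this diff: fetch as a String, convert by hand.
        val oldPort     = conf.get("spark.driver.port", "7077").toInt
        val oldInterval = conf.get("spark.streaming.blockInterval", "200").toLong

        // New style from this diff: typed getter with a typed default.
        val newPort     = conf.getInt("spark.driver.port", 7077)
        val newInterval = conf.getLong("spark.streaming.blockInterval", 200)

        // With the keys unset, both styles fall back to the same defaults.
        println(s"port: $oldPort / $newPort, blockInterval: $oldInterval / $newInterval")
      }
    }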
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7341bfbc99..c8ee93bf5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) {
}
override def toString = id
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 917b4c57f6..2fa6853ae0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -110,7 +110,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
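
Illustrative note (not part of the patch): the hunk above reads spark.streaming.manualClock.jump with getLong and starts the restored clock at the checkpoint time plus that jump. A minimal sketch of that arithmetic, using a hypothetical FakeManualClock stand-in, since Spark's ManualClock is an internal streaming utility:

    import org.apache.spark.SparkConf

    // Hypothetical stand-in for the internal ManualClock, just enough for the example.
    class FakeManualClock {
      private var time: Long = 0L
      def setTime(t: Long): Unit = { time = t }
      def currentTime(): Long = time
    }

    object ManualClockJumpSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set("spark.streaming.manualClock.jump", "5000")
        val lastTime = 100000L                 // stands in for checkpointTime.milliseconds
        val jumpTime = conf.getLong("spark.streaming.manualClock.jump", 0)

        val clock = new FakeManualClock
        clock.setTime(lastTime + jumpTime)     // same arithmetic as in the hunk above
        println(clock.currentTime())           // prints 105000
      }
    }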
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9304fc1a93..30c070c274 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
val jobSets = new ConcurrentHashMap[Time, JobSet]
- val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
+ val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
val generator = new JobGenerator(this)
val listenerBus = new StreamingListenerBus()
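
Illustrative note (not part of the patch): in the hunk above, spark.streaming.concurrentJobs (default 1) sizes the fixed thread pool that runs streaming jobs, so it bounds how many jobs execute at once. A minimal, self-contained sketch of that pattern; the SimpleJobRunner object and its fake jobs are hypothetical:

    import java.util.concurrent.{Executors, TimeUnit}

    // Illustrative only: a fixed-size pool bounds how many "jobs" run concurrently,
    // mirroring JobScheduler's use of numConcurrentJobs.
    object SimpleJobRunner {
      def main(args: Array[String]): Unit = {
        val numConcurrentJobs = 1 // same default as spark.streaming.concurrentJobs above
        val executor = Executors.newFixedThreadPool(numConcurrentJobs)

        (1 to 3).foreach { i =>
          executor.submit(new Runnable {
            override def run(): Unit =
              println(s"job $i on ${Thread.currentThread().getName}")
          })
        }

        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)
      }
    }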