aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-01-08 17:32:15 -0500
committerMatei Zaharia <matei@databricks.com>2014-01-09 00:07:29 -0800
commita01f3401e32ca4324884d13c9fad53c6c87bb5f0 (patch)
tree2ffaae5b5e997fba0deccfc9a4c4abc6402e054b /streaming
parentdceedb466032eca38f943cb97a8dca27496ca914 (diff)
downloadspark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.tar.gz
spark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.tar.bz2
spark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.zip
Use typed getters for configuration settings
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala2
4 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 27d474c0a0..d41f726f83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
val ip = env.conf.get("spark.driver.host", "localhost")
- val port = env.conf.get("spark.driver.port", "7077").toInt
+ val port = env.conf.getInt("spark.driver.port", 7077)
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7341bfbc99..c8ee93bf5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) {
}
override def toString = id
-} \ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 5f8be93a98..3c624e8199 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9304fc1a93..30c070c274 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
val jobSets = new ConcurrentHashMap[Time, JobSet]
- val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
+ val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
val generator = new JobGenerator(this)
val listenerBus = new StreamingListenerBus()