diff options
author | Matei Zaharia <matei@databricks.com> | 2014-01-08 17:32:15 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-01-09 00:07:29 -0800 |
commit | a01f3401e32ca4324884d13c9fad53c6c87bb5f0 (patch) | |
tree | 2ffaae5b5e997fba0deccfc9a4c4abc6402e054b /streaming | |
parent | dceedb466032eca38f943cb97a8dca27496ca914 (diff) | |
download | spark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.tar.gz spark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.tar.bz2 spark-a01f3401e32ca4324884d13c9fad53c6c87bb5f0.zip |
Use typed getters for configuration settings
Diffstat (limited to 'streaming')
4 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 27d474c0a0..d41f726f83 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging private class NetworkReceiverActor extends Actor { logInfo("Attempting to register with tracker") val ip = env.conf.get("spark.driver.host", "localhost") - val port = env.conf.get("spark.driver.port", "7077").toInt + val port = env.conf.getInt("spark.driver.port", 7077) val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) val tracker = env.actorSystem.actorSelection(url) val timeout = 5.seconds @@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() - val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong + val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) val blockStorageLevel = storageLevel val blocksForPushing = new ArrayBlockingQueue[Block](1000) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index 7341bfbc99..c8ee93bf5b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) { } override def toString = id -}
\ No newline at end of file +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 5f8be93a98..3c624e8199 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds - val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong + val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0) clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 9304fc1a93..30c070c274 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -31,7 +31,7 @@ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { val jobSets = new ConcurrentHashMap[Time, JobSet] - val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt + val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) val executor = Executors.newFixedThreadPool(numConcurrentJobs) val generator = new JobGenerator(this) val listenerBus = new StreamingListenerBus() |