From e2c68642c64345434e2034082cf9b299491e9e9f Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Wed, 1 Jan 2014 22:03:39 -0500
Subject: Miscellaneous fixes from code review.

Also replaced SparkConf.getOrElse with just a "get" that takes a default
value, and added getInt, getLong, etc to make code that uses this simpler
later on.
---
 .../org/apache/spark/streaming/dstream/NetworkInputDStream.scala  | 6 +++---
 .../scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 4 ++--
 .../scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a230845b92..27d474c0a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -174,8 +174,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   /** A helper actor that communicates with the NetworkInputTracker */
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
-    val ip = env.conf.getOrElse("spark.driver.host", "localhost")
-    val port = env.conf.getOrElse("spark.driver.port", "7077").toInt
+    val ip = env.conf.get("spark.driver.host", "localhost")
+    val port = env.conf.get("spark.driver.port", "7077").toInt
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)

     val clock = new SystemClock()
-    val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval", "200").toLong
+    val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 844180c81a..5f8be93a98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,7 +46,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }))
   val clock = {
-    val clockClass = ssc.sc.conf.getOrElse(
+    val clockClass = ssc.sc.conf.get(
       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
     Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   }
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
+      val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 651cdaaa6d..9304fc1a93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {

   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
--
cgit v1.2.3
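
As context for the API change described in the commit message, a minimal
sketch of the old and new SparkConf accessors. The get-with-default calls
mirror the diff above; the getInt/getLong signatures are an assumption based
on the commit message, which only names the helpers:

    import org.apache.spark.SparkConf

    object ConfAccessorSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()

        // Before this commit: a string-typed getOrElse, with the caller
        // doing the numeric conversion:
        //   conf.getOrElse("spark.streaming.blockInterval", "200").toLong

        // After this commit: "get" takes the default value directly.
        val blockInterval = conf.get("spark.streaming.blockInterval", "200").toLong

        // Typed helpers (getInt, getLong, etc.) skip the manual conversion.
        // Assumed signatures: getInt(key: String, default: Int): Int, and
        // likewise getLong(key: String, default: Long): Long.
        val numConcurrentJobs = conf.getInt("spark.streaming.concurrentJobs", 1)
        val jumpTime = conf.getLong("spark.streaming.manualClock.jump", 0L)

        println(s"blockInterval=$blockInterval, concurrentJobs=$numConcurrentJobs, jump=$jumpTime")
      }
    }

Taking the default as a second argument keeps call sites shorter than the
getOrElse form, and the typed variants remove the repeated .toInt/.toLong
conversions seen throughout the diff.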