From eef261c89286ddbcdcc03684c1a5d0b94d6da321 Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Thu, 24 Oct 2013 12:12:11 -0700 Subject: fixing comments on PR --- .../main/scala/org/apache/spark/SparkContext.scala | 16 ++++---------- .../cluster/CoarseGrainedSchedulerBackend.scala | 6 +++--- .../scheduler/cluster/SimrSchedulerBackend.scala | 25 ++++++++++------------ 3 files changed, 18 insertions(+), 29 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3b39c97260..c9bc01cba5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -57,20 +57,13 @@ import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend} -import org.apache.spark.scheduler.local.LocalScheduler import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalScheduler +import org.apache.spark.scheduler.StageInfo import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} -import org.apache.spark.scheduler.StageInfo -import org.apache.spark.storage.RDDInfo -import org.apache.spark.storage.StorageStatus -import scala.Some -import org.apache.spark.scheduler.StageInfo -import org.apache.spark.storage.RDDInfo -import org.apache.spark.storage.StorageStatus /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -133,7 +126,7 @@ class SparkContext( val startTime = System.currentTimeMillis() // Add each JAR given through the constructor - if (jars != null && jars != Seq(null)) { + if (jars != null) { jars.foreach { addJar(_) } } @@ -164,7 +157,7 @@ class SparkContext( val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster val MESOS_REGEX = """mesos://(.*)""".r - //Regular expression for connection to Simr cluster + // Regular expression for connection to Simr cluster val SIMR_REGEX = """simr://(.*)""".r master match { @@ -694,8 +687,7 @@ class SparkContext( */ def addJar(path: String) { if (path == null) { - logWarning("null specified as parameter to addJar", - new SparkException("null specified as parameter to addJar")) + logWarning("null specified as parameter to addJar") } else { var key = "" if (path.contains("\\")) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 80a9b4667d..70f3f88401 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -182,7 +182,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac if (driverActor != null) { logInfo("Shutting down all executors") val future = driverActor.ask(StopExecutors)(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } } catch { case e: Exception => @@ -194,7 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } } catch { case e: Exception => @@ -217,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac def removeExecutor(executorId: String, reason: String) { try { val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } catch { case e: Exception => throw new SparkException("Error notifying standalone scheduler's driver actor", e) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index ae56244979..d78bdbaa7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -1,5 +1,3 @@ -package org.apache.spark.scheduler.cluster - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,20 +15,21 @@ package org.apache.spark.scheduler.cluster * limitations under the License. */ +package org.apache.spark.scheduler.cluster -import org.apache.spark.{Logging, SparkContext} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.{Logging, SparkContext} private[spark] class SimrSchedulerBackend( - scheduler: ClusterScheduler, - sc: SparkContext, - driverFilePath: String) + scheduler: ClusterScheduler, + sc: SparkContext, + driverFilePath: String) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { - val tmpPath = new Path(driverFilePath + "_tmp"); - val filePath = new Path(driverFilePath); + val tmpPath = new Path(driverFilePath + "_tmp") + val filePath = new Path(driverFilePath) val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt @@ -44,8 +43,8 @@ private[spark] class SimrSchedulerBackend( val conf = new Configuration() val fs = FileSystem.get(conf) - logInfo("Writing to HDFS file: " + driverFilePath); - logInfo("Writing AKKA address: " + driverUrl); + logInfo("Writing to HDFS file: " + driverFilePath) + logInfo("Writing Akka address: " + driverUrl) // Create temporary file to prevent race condition where executors get empty driverUrl file val temp = fs.create(tmpPath, true) @@ -54,16 +53,14 @@ private[spark] class SimrSchedulerBackend( temp.close() // "Atomic" rename - fs.rename(tmpPath, filePath); + fs.rename(tmpPath, filePath) } override def stop() { val conf = new Configuration() val fs = FileSystem.get(conf) - fs.delete(new Path(driverFilePath), false); + fs.delete(new Path(driverFilePath), false) super.stopExecutors() super.stop() } } - - -- cgit v1.2.3