author     Ali Ghodsi <alig@cs.berkeley.edu>  2013-10-24 12:12:11 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>  2013-10-25 16:48:33 -0700
commit     eef261c89286ddbcdcc03684c1a5d0b94d6da321 (patch)
tree       5334546b7671eb2cd3bd8ba18701f2c3b2f1115a /core
parent     05a0df2b9e0e4c3d032404187c0adf6d6d881860 (diff)
Addressing review comments on PR
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                     | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala           | 25
3 files changed, 18 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3b39c97260..c9bc01cba5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,20 +57,13 @@ import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
TimeStampedHashMap, Utils}
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -133,7 +126,7 @@ class SparkContext(
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
- if (jars != null && jars != Seq(null)) {
+ if (jars != null) {
jars.foreach { addJar(_) }
}
@@ -164,7 +157,7 @@ class SparkContext(
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster
val MESOS_REGEX = """mesos://(.*)""".r
- //Regular expression for connection to Simr cluster
+ // Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
master match {
@@ -694,8 +687,7 @@ class SparkContext(
*/
def addJar(path: String) {
if (path == null) {
- logWarning("null specified as parameter to addJar",
- new SparkException("null specified as parameter to addJar"))
+ logWarning("null specified as parameter to addJar")
} else {
var key = ""
if (path.contains("\\")) {
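Note on the SIMR_REGEX hunk above: these regexes serve as Scala pattern extractors when SparkContext dispatches on the master URL. A minimal, self-contained sketch of that dispatch style follows; the object and method names are illustrative, not Spark's actual code.

    // Minimal sketch of regex-extractor dispatch on a master URL.
    // Object and method names here are illustrative only.
    object MasterUrlDemo {
      val SPARK_REGEX = """spark://(.*)""".r
      val MESOS_REGEX = """mesos://(.*)""".r
      val SIMR_REGEX  = """simr://(.*)""".r

      def describe(master: String): String = master match {
        case SPARK_REGEX(url) => "standalone cluster at " + url
        case MESOS_REGEX(url) => "Mesos cluster at " + url
        case SIMR_REGEX(url)  => "SIMR driver file at " + url
        case other            => "unrecognized master URL: " + other
      }

      def main(args: Array[String]): Unit = {
        println(describe("simr://hdfs://namenode:9000/driver_url"))
      }
    }

Each extractor binds its capture group (the part after the scheme) when the pattern matches, which is why the commit can add SIMR support with just one new regex and one new match arm.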
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 80a9b4667d..70f3f88401 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -182,7 +182,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
if (driverActor != null) {
logInfo("Shutting down all executors")
val future = driverActor.ask(StopExecutors)(timeout)
- Await.result(future, timeout)
+ Await.ready(future, timeout)
}
} catch {
case e: Exception =>
@@ -194,7 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
- Await.result(future, timeout)
+ Await.ready(future, timeout)
}
} catch {
case e: Exception =>
@@ -217,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
def removeExecutor(executorId: String, reason: String) {
try {
val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
- Await.result(future, timeout)
+ Await.ready(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
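The switch from Await.result to Await.ready in the three hunks above matters because the reply value of these ask calls is unused: Await.ready only blocks until the future completes and returns the completed future itself, whereas Await.result would additionally extract the value and rethrow any failure the future carries. A minimal standalone sketch of the difference (not Spark code):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object AwaitDemo {
      def main(args: Array[String]): Unit = {
        val failed: Future[String] = Future.failed(new RuntimeException("boom"))

        // Await.ready blocks until the future completes (here: with a failure)
        // and returns the completed future; the failure is not rethrown.
        val completed = Await.ready(failed, 1.second)
        println(completed.value) // Some(Failure(java.lang.RuntimeException: boom))

        // Await.result would extract the value and rethrow the failure:
        // Await.result(failed, 1.second) // throws RuntimeException("boom")
      }
    }

Both calls still throw TimeoutException if the future does not complete within the given duration, so the surrounding try/catch blocks remain necessary.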
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ae56244979..d78bdbaa7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -1,5 +1,3 @@
-package org.apache.spark.scheduler.cluster
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,20 +15,21 @@ package org.apache.spark.scheduler.cluster
* limitations under the License.
*/
+package org.apache.spark.scheduler.cluster
-import org.apache.spark.{Logging, SparkContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.{Logging, SparkContext}
private[spark] class SimrSchedulerBackend(
- scheduler: ClusterScheduler,
- sc: SparkContext,
- driverFilePath: String)
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ driverFilePath: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
- val tmpPath = new Path(driverFilePath + "_tmp");
- val filePath = new Path(driverFilePath);
+ val tmpPath = new Path(driverFilePath + "_tmp")
+ val filePath = new Path(driverFilePath)
val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
@@ -44,8 +43,8 @@ private[spark] class SimrSchedulerBackend(
val conf = new Configuration()
val fs = FileSystem.get(conf)
- logInfo("Writing to HDFS file: " + driverFilePath);
- logInfo("Writing AKKA address: " + driverUrl);
+ logInfo("Writing to HDFS file: " + driverFilePath)
+ logInfo("Writing Akka address: " + driverUrl)
// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
@@ -54,16 +53,14 @@ private[spark] class SimrSchedulerBackend(
temp.close()
// "Atomic" rename
- fs.rename(tmpPath, filePath);
+ fs.rename(tmpPath, filePath)
}
override def stop() {
val conf = new Configuration()
val fs = FileSystem.get(conf)
- fs.delete(new Path(driverFilePath), false);
+ fs.delete(new Path(driverFilePath), false)
super.stopExecutors()
super.stop()
}
}
-
-
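The write-to-temp-then-rename sequence in SimrSchedulerBackend.start() is the standard pattern for publishing a file atomically, so executors polling the driverUrl file never observe a partially written one. A minimal sketch against the Hadoop FileSystem API, assuming illustrative paths and contents (not Spark's actual values):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object AtomicPublishDemo {
      // Write `contents` to `target` via a temp file so readers either see
      // the old file or the complete new one, never a partial write.
      def publish(fs: FileSystem, target: Path, contents: String): Unit = {
        val tmp = new Path(target + "_tmp")
        val out = fs.create(tmp, true) // overwrite any stale temp file
        try out.writeUTF(contents) finally out.close()
        fs.rename(tmp, target) // rename is atomic on HDFS within a directory
      }

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.get(new Configuration())
        publish(fs, new Path("/tmp/driver_url"), "akka://driver@host:port")
      }
    }

The rename is only "atomic" in the quoted sense the code comment uses: HDFS guarantees the metadata operation itself is atomic, but the overall create-write-rename sequence can still leave a stale temp file behind on failure, which is why the create call overwrites any existing temp file.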