author     walker <walker@transwarp01.(none)>  2014-01-09 01:22:26 +0800
committer  walker <walker@transwarp01.(none)>  2014-01-09 01:22:26 +0800
commit     d942f95d7ee39879ac326abf61b730f023506e4e (patch)
tree       de011c7319cb121bfeb94473095d089982953f17 /core/src/main
parent     2ad315e80fb297f1e6cf77ec6fcdcf25750b4a3d (diff)
parent     bb6a39a6872fb8e7118c5a402b7b9d67459e9a64 (diff)
Merge remote branch 'upstream/master'
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileClient.java | 5
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java | 2
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServer.java | 8
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java | 4
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java | 6
-rwxr-xr-x  core/src/main/java/org/apache/spark/network/netty/PathResolver.java | 52
-rw-r--r--  core/src/main/resources/org/apache/spark/log4j-defaults.properties (renamed from core/src/main/resources/org/apache/spark/default-log4j.properties) | 0
-rw-r--r--  core/src/main/scala/org/apache/spark/Logging.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 13
20 files changed, 100 insertions, 81 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 46d61503bc..d2d778b756 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
class FileClient {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
+
private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
@@ -39,7 +40,7 @@ class FileClient {
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min
- public FileClient(FileClientHandler handler, int connectTimeout) {
+ FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}
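
Note: the hunks above replace a per-instance logger with a shared static final one and narrow the constructor to package-private. A minimal Scala sketch of the same one-logger-per-class pattern (the companion object plays the role of Java's static; the class body is illustrative):

    import org.slf4j.{Logger, LoggerFactory}

    class FileClient(connectTimeout: Int) {
      // The companion's logger is shared by every instance of the class.
      def connect(): Unit =
        FileClient.log.debug("connecting, timeout = {} ms", connectTimeout)
    }

    object FileClient {
      // One logger per class, created once at load time -- the Scala
      // equivalent of Java's `private static final Logger`.
      private val log: Logger = LoggerFactory.getLogger(classOf[FileClient])
    }
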
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index fb61be1c12..264cf97d02 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private final FileClientHandler fhandler;
- public FileClientChannelInitializer(FileClientHandler handler) {
+ FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index aea7534459..c93425e278 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
*/
class FileServer {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
- private Thread blockingThread = null;
- public FileServer(PathResolver pResolver, int port) {
+ FileServer(PathResolver pResolver, int port) {
InetSocketAddress addr = new InetSocketAddress(port);
// Configure the server.
@@ -70,7 +69,8 @@ class FileServer {
* Start the file server asynchronously in a new thread.
*/
public void start() {
- blockingThread = new Thread() {
+ Thread blockingThread = new Thread() {
+ @Override
public void run() {
try {
channelFuture.channel().closeFuture().sync();
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 3f15ff898f..46efec8f8d 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
- PathResolver pResolver;
+ private final PathResolver pResolver;
- public FileServerChannelInitializer(PathResolver pResolver) {
+ FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index e2d9391b4c..3ac045f944 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
class FileServerHandler extends SimpleChannelInboundHandler<String> {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
private final PathResolver pResolver;
- public FileServerHandler(PathResolver pResolver){
+ FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.flush();
return;
}
- int len = new Long(length).intValue();
+ int len = (int) length;
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.write(new DefaultFileRegion(new FileInputStream(file)
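
Note: the removed new Long(length).intValue() allocated a boxed wrapper only to unwrap it again; a plain primitive cast performs the same narrowing with no allocation. A before/after sketch in Scala (values illustrative; like the cast, toInt silently truncates anything above Int.MaxValue, so it assumes the length was already bounds-checked):

    val length: Long = 4096L

    // Before: box into java.lang.Long, then immediately unbox.
    val lenOld: Int = new java.lang.Long(length).intValue()

    // After: a direct numeric narrowing, equivalent to Java's (int) length.
    val lenNew: Int = length.toInt
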
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 9f7ced44cf..7ad8d03efb 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -1,26 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
- /** Get the file segment in which the given block resides. */
- public FileSegment getBlockLocation(BlockId blockId);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
+
+public interface PathResolver {
+ /** Get the file segment in which the given block resides. */
+ FileSegment getBlockLocation(BlockId blockId);
+}
diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index d72dbadc39..d72dbadc39 100644
--- a/core/src/main/resources/org/apache/spark/default-log4j.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index d519fc5a29..4a34989e50 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -104,13 +104,15 @@ trait Logging {
// If Log4j doesn't seem initialized, load a default properties file
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
- val defaultLogProps = "org/apache/spark/default-log4j.properties"
+ val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
val classLoader = this.getClass.getClassLoader
Option(classLoader.getResource(defaultLogProps)) match {
- case Some(url) => PropertyConfigurator.configure(url)
- case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+ case Some(url) =>
+ PropertyConfigurator.configure(url)
+ log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+ case None =>
+ System.err.println(s"Spark was unable to load $defaultLogProps")
}
- log.info(s"Using Spark's default log4j profile: $defaultLogProps")
}
Logging.initialized = true
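
Note: the reordering above matters because the old code logged "Using Spark's default log4j profile" even when the resource failed to load. A minimal sketch of the load-from-classpath pattern, assuming log4j 1.2 (resource name as in the diff, everything else illustrative):

    import org.apache.log4j.{LogManager, PropertyConfigurator}

    def initDefaultLogging(): Unit = {
      // Only configure if no appender has been installed yet.
      if (!LogManager.getRootLogger.getAllAppenders.hasMoreElements) {
        val resource = "org/apache/spark/log4j-defaults.properties"
        Option(getClass.getClassLoader.getResource(resource)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            println(s"Using default log4j profile: $resource") // only on success
          case None =>
            System.err.println(s"Unable to load $resource")
        }
      }
    }
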
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdae167aef..77b8ca1cce 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: Either[ActorRef, ActorSelection] = _
+ var trackerActor: ActorRef = _
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
- /*
- The difference between ActorRef and ActorSelection is well explained here:
- http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
- In spark a map output tracker can be either started on Driver where it is created which
- is an ActorRef or it can be on executor from where it is looked up which is an
- actorSelection.
- */
- val future = trackerActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = trackerActor.ask(message)(timeout)
Await.result(future, timeout)
} catch {
case e: Exception =>
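
Note: with trackerActor now a plain ActorRef, the Left/Right dispatch above collapses into a single ask. A sketch of that pattern against the Akka 2.2-era API (the timeout value and names are illustrative):

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.ActorRef
    import akka.pattern.ask

    def askTracker(tracker: ActorRef, message: Any): Any = {
      val timeout: FiniteDuration = 30.seconds
      // akka.pattern.ask enriches ActorRef with ask; the FiniteDuration is
      // implicitly lifted to an akka.util.Timeout.
      val future = tracker.ask(message)(timeout)
      Await.result(future, timeout)
    }
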
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 98343e9532..2de32231e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -24,7 +24,7 @@ import com.typesafe.config.ConfigFactory
*
* @param loadDefaults whether to load values from the system properties and classpath
*/
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
@@ -67,7 +67,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
- set("spark.jars", jars.mkString(","))
+ for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+ set("spark.jars", jars.filter(_ != null).mkString(","))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -164,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
+ /** Get a parameter as a boolean, falling back to a default if not set */
+ def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+ getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+ }
+
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
@@ -171,6 +177,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
+ /** Get all akka conf variables set on this SparkConf */
+ def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
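
Note: illustrative usage of the two accessors added above, getBoolean and getAkkaConf (key names beyond those in the diff are made up for the example):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.deploy.spreadOut", "false")
      .set("akka.loglevel", "DEBUG")

    conf.getBoolean("spark.deploy.spreadOut", true)  // false (explicitly set)
    conf.getBoolean("spark.some.other.flag", true)   // true  (falls back to default)
    conf.getAkkaConf                                 // Seq(("akka.loglevel", "DEBUG"))
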
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e80e43af6d..0e47f4e442 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,6 +116,10 @@ class SparkContext(
throw new SparkException("An application must be set in your configuration")
}
+ if (conf.get("spark.logConf", "false").toBoolean) {
+ logInfo("Spark configuration:\n" + conf.toDebugString)
+ }
+
// Set Spark driver host and port system properties
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f0a7..2e36ccb9a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,11 +17,10 @@
package org.apache.spark
-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await
import akka.actor._
-import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
conf)
- def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+ def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
- Left(actorSystem.actorOf(Props(newActor), name = name))
+ actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
- logInfo("Connecting to " + name + ": " + url)
- Right(actorSystem.actorSelection(url))
+ val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ logInfo(s"Connecting to $name: $url")
+ Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
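
Note: the lookup branch above now resolves the ActorSelection to a concrete ActorRef up front, so executors fail fast if the driver actor is unreachable instead of discovering it on the first ask. A sketch of that resolution step (host, port, and actor name are placeholders):

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.{ActorRef, ActorSystem}

    def lookupDriverActor(system: ActorSystem, name: String): ActorRef = {
      val url = s"akka.tcp://spark@driver-host:7077/user/$name"
      val timeout: FiniteDuration = 30.seconds
      // resolveOne completes with the ActorRef, or fails (ActorNotFound)
      // if nothing answers at that path within the timeout.
      Await.result(system.actorSelection(url).resolveOne(timeout), timeout)
    }
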
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a0db..e38459b883 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
private[spark] class ApplicationDescription(
val name: String,
- val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */
+ val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ef649fd80c..28ebbdc66b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,7 +48,7 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
"dummy-spark-home", "ignored")
val listener = new TestListener
val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c7de..3e26379166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
- val appUiUrl: String)
+ val appUiUrl: String,
+ defaultCores: Int)
extends Serializable {
@transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
}
}
- def coresLeft: Int = desc.maxCores - coresGranted
+ private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+
+ def coresLeft: Int = myMaxCores - coresGranted
private var _retryCount = 0
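
Note: taken together with the ApplicationDescription change above, maxCores now flows through as an Option — the scheduler backend (later in this diff) produces None when spark.cores.max is unset, and only the master's default fills the gap. A schematic sketch with simplified types:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()

    // Producer side: unset means None, not a sentinel Int.MaxValue.
    val maxCores: Option[Int] = conf.getOption("spark.cores.max").map(_.toInt)

    // Consumer side (as in coresLeft above): the default applies only here.
    val defaultCores = Int.MaxValue   // the master's fallback, see Master.scala below
    val coresGranted = 0
    val coresLeft = maxCores.getOrElse(defaultCores) - coresGranted
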
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cfcca..6617b7100f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+ // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+ val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ if (defaultCores < 1) {
+ throw new SparkException("spark.deploy.defaultCores must be positive")
+ }
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +432,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(
+ now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
}
def registerApplication(app: ApplicationInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9858717d13..73fc37444e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
override def start() {
super.start()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b5afe8cd23..51a29ed8ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
- conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = driverActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new SparkException("BlockManagerMaster returned null")
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index dca98c6c05..729ba2c550 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
val conf = new SparkConf()
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..3f009a8998 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark.util
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
- val akkaConf = ConfigFactory.parseString(
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+ ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin)
+ """.stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
@@ -103,4 +105,9 @@ private[spark] object AkkaUtils {
def askTimeout(conf: SparkConf): FiniteDuration = {
Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
}
+
+ /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+ def lookupTimeout(conf: SparkConf): FiniteDuration = {
+ Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+ }
}
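
Note: the parseMap/withFallback combination above gives user-supplied akka.* settings precedence over Spark's built-in block — keys present in the first Config shadow the fallback. A minimal sketch of that precedence, assuming Typesafe Config (keys illustrative):

    import scala.collection.JavaConversions.mapAsJavaMap
    import com.typesafe.config.ConfigFactory

    val userOverrides = Map("akka.loglevel" -> "DEBUG")

    val merged = ConfigFactory.parseMap(userOverrides).withFallback(
      ConfigFactory.parseString(
        """
          |akka.daemonic = on
          |akka.loglevel = "ERROR"
        """.stripMargin))

    merged.getString("akka.loglevel")  // "DEBUG" -- the override wins
    merged.getString("akka.daemonic")  // "on"    -- filled from the fallback
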