aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClient.java5
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java2
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServer.java8
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java4
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java6
-rwxr-xr-xcore/src/main/java/org/apache/spark/network/netty/PathResolver.java52
-rw-r--r--core/src/main/resources/org/apache/spark/log4j-defaults.properties (renamed from core/src/main/resources/org/apache/spark/default-log4j.properties)0
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/Serializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala22
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala10
23 files changed, 135 insertions, 70 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 46d61503bc..d2d778b756 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
class FileClient {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
+
private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
@@ -39,7 +40,7 @@ class FileClient {
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min
- public FileClient(FileClientHandler handler, int connectTimeout) {
+ FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index fb61be1c12..264cf97d02 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private final FileClientHandler fhandler;
- public FileClientChannelInitializer(FileClientHandler handler) {
+ FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index aea7534459..c93425e278 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
*/
class FileServer {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
- private Thread blockingThread = null;
- public FileServer(PathResolver pResolver, int port) {
+ FileServer(PathResolver pResolver, int port) {
InetSocketAddress addr = new InetSocketAddress(port);
// Configure the server.
@@ -70,7 +69,8 @@ class FileServer {
* Start the file server asynchronously in a new thread.
*/
public void start() {
- blockingThread = new Thread() {
+ Thread blockingThread = new Thread() {
+ @Override
public void run() {
try {
channelFuture.channel().closeFuture().sync();
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 3f15ff898f..46efec8f8d 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
- PathResolver pResolver;
+ private final PathResolver pResolver;
- public FileServerChannelInitializer(PathResolver pResolver) {
+ FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index e2d9391b4c..3ac045f944 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
class FileServerHandler extends SimpleChannelInboundHandler<String> {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
private final PathResolver pResolver;
- public FileServerHandler(PathResolver pResolver){
+ FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.flush();
return;
}
- int len = new Long(length).intValue();
+ int len = (int) length;
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.write(new DefaultFileRegion(new FileInputStream(file)
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 9f7ced44cf..7ad8d03efb 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -1,26 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
- /** Get the file segment in which the given block resides. */
- public FileSegment getBlockLocation(BlockId blockId);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
+
+public interface PathResolver {
+ /** Get the file segment in which the given block resides. */
+ FileSegment getBlockLocation(BlockId blockId);
+}
diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index d72dbadc39..d72dbadc39 100644
--- a/core/src/main/resources/org/apache/spark/default-log4j.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index d519fc5a29..4a34989e50 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -104,13 +104,15 @@ trait Logging {
// If Log4j doesn't seem initialized, load a default properties file
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
- val defaultLogProps = "org/apache/spark/default-log4j.properties"
+ val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
val classLoader = this.getClass.getClassLoader
Option(classLoader.getResource(defaultLogProps)) match {
- case Some(url) => PropertyConfigurator.configure(url)
- case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+ case Some(url) =>
+ PropertyConfigurator.configure(url)
+ log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+ case None =>
+ System.err.println(s"Spark was unable to load $defaultLogProps")
}
- log.info(s"Using Spark's default log4j profile: $defaultLogProps")
}
Logging.initialized = true
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 98343e9532..2de32231e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -24,7 +24,7 @@ import com.typesafe.config.ConfigFactory
*
* @param loadDefaults whether to load values from the system properties and classpath
*/
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
@@ -67,7 +67,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
- set("spark.jars", jars.mkString(","))
+ for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+ set("spark.jars", jars.filter(_ != null).mkString(","))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -164,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
+ /** Get a parameter as a boolean, falling back to a default if not set */
+ def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+ getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+ }
+
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
@@ -171,6 +177,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
+ /** Get all akka conf variables set on this SparkConf */
+ def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e80e43af6d..0e47f4e442 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,6 +116,10 @@ class SparkContext(
throw new SparkException("An application must be set in your configuration")
}
+ if (conf.get("spark.logConf", "false").toBoolean) {
+ logInfo("Spark configuration:\n" + conf.toDebugString)
+ }
+
// Set Spark driver host and port system properties
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5be5317f40..e93b10fd7e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -431,4 +431,10 @@ object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a0db..e38459b883 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
private[spark] class ApplicationDescription(
val name: String,
- val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */
+ val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
}
}
+ private def isPossibleMaster(remoteUrl: Address) = {
+ masterUrls.map(s => Master.toAkkaUrl(s))
+ .map(u => AddressFromURIString(u).hostPort)
+ .contains(remoteUrl.hostPort)
+ }
+
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
+ case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+ logWarning(s"Could not connect to $address: $cause")
+
case StopClient =>
markDead()
sender ! true
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ef649fd80c..28ebbdc66b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,7 +48,7 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
"dummy-spark-home", "ignored")
val listener = new TestListener
val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c7de..3e26379166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
- val appUiUrl: String)
+ val appUiUrl: String,
+ defaultCores: Int)
extends Serializable {
@transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
}
}
- def coresLeft: Int = desc.maxCores - coresGranted
+ private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+
+ def coresLeft: Int = myMaxCores - coresGranted
private var _retryCount = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cfcca..6617b7100f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+ // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+ val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ if (defaultCores < 1) {
+ throw new SparkException("spark.deploy.defaultCores must be positive")
+ }
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +432,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(
+ now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
}
def registerApplication(app: ApplicationInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9858717d13..73fc37444e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
override def start() {
super.start()
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 160cca4d6c..9a5e3cb77e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
* guaranteed to only be called from one thread at a time.
+ *
+ * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
*/
trait Serializer {
def newInstance(): SerializerInstance
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 22465272f3..36a37af4f8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkConf
* creating a new one.
*/
private[spark] class SerializerManager {
+ // TODO: Consider moving this into SparkConf itself to remove the global singleton.
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
@@ -53,8 +54,18 @@ private[spark] class SerializerManager {
if (serializer == null) {
val clsLoader = Thread.currentThread.getContextClassLoader
val cls = Class.forName(clsName, true, clsLoader)
- val constructor = cls.getConstructor(classOf[SparkConf])
- serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
+
+ // First try with the constructor that takes SparkConf. If we can't find one,
+ // use a no-arg constructor instead.
+ try {
+ val constructor = cls.getConstructor(classOf[SparkConf])
+ serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
+ } catch {
+ case _: NoSuchMethodException =>
+ val constructor = cls.getConstructor()
+ serializer = constructor.newInstance().asInstanceOf[Serializer]
+ }
+
serializers.put(clsName, serializer)
}
serializer
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index b4c4e1dbbc..3f009a8998 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,10 +17,13 @@
package org.apache.spark.util
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
+
+import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@@ -47,15 +50,23 @@ private[spark] object AkkaUtils {
val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
- val lifecycleEvents =
- if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+ val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+ if (!akkaLogLifecycleEvents) {
+ // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+ // See: https://www.assembla.com/spaces/akka/tickets/3787#/
+ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+ }
+
+ val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
- val akkaConf = ConfigFactory.parseString(
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+ ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -73,8 +84,11 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
- """.stripMargin)
+ |akka.log-dead-letters = $lifecycleEvents
+ |akka.log-dead-letters-during-shutdown = $lifecycleEvents
+ """.stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 0b38e239f9..331fa3a642 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -70,10 +70,11 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc() : ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
- new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
- new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
+ new ApplicationInfo(
+ 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
}
def createWorkerInfo() : WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index f58b1ee05a..be93074b7b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,8 +26,8 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.env.get("spark.home")).get
- val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 032c2f2f69..f60ce270c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -40,8 +40,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
- var oldOops: String = null
- var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer.mb", "1")
@@ -61,7 +59,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- System.setProperty("os.arch", "amd64")
+ oldArch = System.setProperty("os.arch", "amd64")
conf.set("os.arch", "amd64")
conf.set("spark.test.useCompressedOops", "true")
conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -94,11 +92,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
System.clearProperty("os.arch")
}
- if (oldOops != null) {
- conf.set("spark.test.useCompressedOops", oldOops)
- } else {
- System.clearProperty("spark.test.useCompressedOops")
- }
+ System.clearProperty("spark.test.useCompressedOops")
}
test("StorageLevel object caching") {