Diffstat (limited to 'core/src')
23 files changed, 135 insertions, 70 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 46d61503bc..d2d778b756 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
 
 class FileClient {
 
-  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+  private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
+
   private final FileClientHandler handler;
   private Channel channel = null;
   private Bootstrap bootstrap = null;
@@ -39,7 +40,7 @@ class FileClient {
   private final int connectTimeout;
   private final int sendTimeout = 60; // 1 min
 
-  public FileClient(FileClientHandler handler, int connectTimeout) {
+  FileClient(FileClientHandler handler, int connectTimeout) {
     this.handler = handler;
     this.connectTimeout = connectTimeout;
   }
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index fb61be1c12..264cf97d02 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
 
   private final FileClientHandler fhandler;
 
-  public FileClientChannelInitializer(FileClientHandler handler) {
+  FileClientChannelInitializer(FileClientHandler handler) {
     fhandler = handler;
   }
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index aea7534459..c93425e278 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
  */
 class FileServer {
 
-  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+  private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
 
   private EventLoopGroup bossGroup = null;
   private EventLoopGroup workerGroup = null;
   private ChannelFuture channelFuture = null;
   private int port = 0;
-  private Thread blockingThread = null;
 
-  public FileServer(PathResolver pResolver, int port) {
+  FileServer(PathResolver pResolver, int port) {
     InetSocketAddress addr = new InetSocketAddress(port);
 
     // Configure the server.
@@ -70,7 +69,8 @@ class FileServer {
    * Start the file server asynchronously in a new thread.
    */
   public void start() {
-    blockingThread = new Thread() {
+    Thread blockingThread = new Thread() {
+      @Override
       public void run() {
         try {
           channelFuture.channel().closeFuture().sync();
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 3f15ff898f..46efec8f8d 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder;
 
 class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-  PathResolver pResolver;
+  private final PathResolver pResolver;
 
-  public FileServerChannelInitializer(PathResolver pResolver) {
+  FileServerChannelInitializer(PathResolver pResolver) {
     this.pResolver = pResolver;
   }
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index e2d9391b4c..3ac045f944 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
 
 class FileServerHandler extends SimpleChannelInboundHandler<String> {
 
-  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+  private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
 
   private final PathResolver pResolver;
 
-  public FileServerHandler(PathResolver pResolver){
+  FileServerHandler(PathResolver pResolver){
     this.pResolver = pResolver;
   }
 
@@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
       ctx.flush();
       return;
     }
-    int len = new Long(length).intValue();
+    int len = (int) length;
     ctx.write((new FileHeader(len, blockId)).buffer());
     try {
       ctx.write(new DefaultFileRegion(new FileInputStream(file)
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 9f7ced44cf..7ad8d03efb 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -1,26 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
- /** Get the file segment in which the given block resides. */
- public FileSegment getBlockLocation(BlockId blockId);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
+
+public interface PathResolver {
+  /** Get the file segment in which the given block resides. */
+  FileSegment getBlockLocation(BlockId blockId);
+}
diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index d72dbadc39..d72dbadc39 100644
--- a/core/src/main/resources/org/apache/spark/default-log4j.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index d519fc5a29..4a34989e50 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -104,13 +104,15 @@ trait Logging {
     // If Log4j doesn't seem initialized, load a default properties file
     val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
     if (!log4jInitialized) {
-      val defaultLogProps = "org/apache/spark/default-log4j.properties"
+      val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
       val classLoader = this.getClass.getClassLoader
       Option(classLoader.getResource(defaultLogProps)) match {
-        case Some(url) => PropertyConfigurator.configure(url)
-        case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+        case Some(url) =>
+          PropertyConfigurator.configure(url)
+          log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+        case None =>
+          System.err.println(s"Spark was unable to load $defaultLogProps")
       }
-      log.info(s"Using Spark's default log4j profile: $defaultLogProps")
     }
     Logging.initialized = true
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 98343e9532..2de32231e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -24,7 +24,7 @@ import com.typesafe.config.ConfigFactory
  *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
 
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
@@ -67,7 +67,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
 
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
-    set("spark.jars", jars.mkString(","))
+    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+    set("spark.jars", jars.filter(_ != null).mkString(","))
   }
 
   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -164,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
     getOption(key).map(_.toDouble).getOrElse(defaultValue)
   }
 
+  /** Get a parameter as a boolean, falling back to a default if not set */
+  def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+    getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+  }
+
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
@@ -171,6 +177,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
       .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
+  /** Get all akka conf variables set on this SparkConf */
+  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e80e43af6d..0e47f4e442 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,6 +116,10 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
+  if (conf.get("spark.logConf", "false").toBoolean) {
+    logInfo("Spark configuration:\n" + conf.toDebugString)
+  }
+
   // Set Spark driver host and port system properties
   conf.setIfMissing("spark.driver.host", Utils.localHostName())
   conf.setIfMissing("spark.driver.port", "0")
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5be5317f40..e93b10fd7e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -431,4 +431,10 @@ object JavaSparkContext {
   implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
 
   implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+
+  /**
+   * Find the JAR from which a given class was loaded, to make it easy for users to pass
+   * their JARs to SparkContext.
+   */
+  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a0db..e38459b883 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[spark] class ApplicationDescription(
     val name: String,
-    val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */
+    val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     val sparkHome: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
       }
     }
 
+    private def isPossibleMaster(remoteUrl: Address) = {
+      masterUrls.map(s => Master.toAkkaUrl(s))
+        .map(u => AddressFromURIString(u).hostPort)
+        .contains(remoteUrl.hostPort)
+    }
+
     override def receive = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
+      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+        logWarning(s"Could not connect to $address: $cause")
+
       case StopClient =>
         markDead()
         sender ! true
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ef649fd80c..28ebbdc66b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,7 +48,7 @@ private[spark] object TestClient {
     val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = new SparkConf)
     val desc = new ApplicationDescription(
-      "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
       "dummy-spark-home", "ignored")
     val listener = new TestListener
     val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c7de..3e26379166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
     val desc: ApplicationDescription,
     val submitDate: Date,
     val driver: ActorRef,
-    val appUiUrl: String)
+    val appUiUrl: String,
+    defaultCores: Int)
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  def coresLeft: Int = desc.maxCores - coresGranted
+  private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+
+  def coresLeft: Int = myMaxCores - coresGranted
 
   private var _retryCount = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cfcca..6617b7100f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+  val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+  if (defaultCores < 1) {
+    throw new SparkException("spark.deploy.defaultCores must be positive")
+  }
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +432,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(
+      now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
   }
 
   def registerApplication(app: ApplicationInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9858717d13..73fc37444e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var stopping = false
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
 
   override def start() {
     super.start()
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 160cca4d6c..9a5e3cb77e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
  * A serializer. Because some serialization libraries are not thread safe, this class is used to
  * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
  * guaranteed to only be called from one thread at a time.
+ *
+ * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
  */
 trait Serializer {
   def newInstance(): SerializerInstance
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 22465272f3..36a37af4f8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkConf
  * creating a new one.
  */
 private[spark] class SerializerManager {
+  // TODO: Consider moving this into SparkConf itself to remove the global singleton.
 
   private val serializers = new ConcurrentHashMap[String, Serializer]
   private var _default: Serializer = _
@@ -53,8 +54,18 @@ private[spark] class SerializerManager {
     if (serializer == null) {
       val clsLoader = Thread.currentThread.getContextClassLoader
       val cls = Class.forName(clsName, true, clsLoader)
-      val constructor = cls.getConstructor(classOf[SparkConf])
-      serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
+
+      // First try with the constructor that takes SparkConf. If we can't find one,
+      // use a no-arg constructor instead.
+      try {
+        val constructor = cls.getConstructor(classOf[SparkConf])
+        serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
+      } catch {
+        case _: NoSuchMethodException =>
+          val constructor = cls.getConstructor()
+          serializer = constructor.newInstance().asInstanceOf[Serializer]
+      }
+
       serializers.put(clsName, serializer)
     }
     serializer
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index b4c4e1dbbc..3f009a8998 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
+
+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf
 
 /**
@@ -47,15 +50,23 @@ private[spark] object AkkaUtils {
     val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
     val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val lifecycleEvents =
-      if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+    if (!akkaLogLifecycleEvents) {
+      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
+      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+    }
+
+    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
 
     val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
     val akkaFailureDetector = conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
     val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
 
-    val akkaConf = ConfigFactory.parseString(
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+      ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -73,8 +84,11 @@ private[spark] object AkkaUtils {
       |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+      |akka.log-config-on-start = $logAkkaConfig
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
-      """.stripMargin)
+      |akka.log-dead-letters = $lifecycleEvents
+      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
+      """.stripMargin))
 
     val actorSystem = if (indestructible) {
       IndestructibleActorSystem(name, akkaConf)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 0b38e239f9..331fa3a642 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -70,10 +70,11 @@ class JsonProtocolSuite extends FunSuite {
   def createAppDesc() : ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
-    new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
   }
   def createAppInfo() : ApplicationInfo = {
-    new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
+    new ApplicationInfo(
+      3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
   }
   def createWorkerInfo() : WorkerInfo = {
     new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index f58b1ee05a..be93074b7b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,8 +26,8 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.env.get("spark.home")).get
-    val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 032c2f2f69..f60ce270c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -40,8 +40,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
   var oldArch: String = null
-  var oldOops: String = null
-  var oldHeartBeat: String = null
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")
@@ -61,7 +59,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
-    System.setProperty("os.arch", "amd64")
+    oldArch = System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -94,11 +92,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       System.clearProperty("os.arch")
     }
 
-    if (oldOops != null) {
-      conf.set("spark.test.useCompressedOops", oldOops)
-    } else {
-      System.clearProperty("spark.test.useCompressedOops")
-    }
+    System.clearProperty("spark.test.useCompressedOops")
   }
 
   test("StorageLevel object caching") {