diff options
Diffstat (limited to 'core/src')
5 files changed, 47 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 5be5317f40..e93b10fd7e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -431,4 +431,10 @@ object JavaSparkContext { implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc) implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc + + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to SparkContext. + */ + def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 9bbd635ab9..481026eaa2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -24,7 +24,8 @@ import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} +import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} + import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ @@ -110,6 +111,12 @@ private[spark] class Client( } } + private def isPossibleMaster(remoteUrl: Address) = { + masterUrls.map(s => Master.toAkkaUrl(s)) + .map(u => AddressFromURIString(u).hostPort) + .contains(remoteUrl.hostPort) + } + override def receive = { case RegisteredApplication(appId_, masterUrl) => appId = appId_ @@ -145,6 +152,9 @@ private[spark] class Client( logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() + case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => + logWarning(s"Could not connect to $address: $cause") + case StopClient => markDead() sender ! true diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 160cca4d6c..9a5e3cb77e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream} * A serializer. Because some serialization libraries are not thread safe, this class is used to * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are * guaranteed to only be called from one thread at a time. + * + * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a + * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence. */ trait Serializer { def newInstance(): SerializerInstance diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 22465272f3..36a37af4f8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkConf * creating a new one. */ private[spark] class SerializerManager { + // TODO: Consider moving this into SparkConf itself to remove the global singleton. private val serializers = new ConcurrentHashMap[String, Serializer] private var _default: Serializer = _ @@ -53,8 +54,18 @@ private[spark] class SerializerManager { if (serializer == null) { val clsLoader = Thread.currentThread.getContextClassLoader val cls = Class.forName(clsName, true, clsLoader) - val constructor = cls.getConstructor(classOf[SparkConf]) - serializer = constructor.newInstance(conf).asInstanceOf[Serializer] + + // First try with the constructor that takes SparkConf. If we can't find one, + // use a no-arg constructor instead. + try { + val constructor = cls.getConstructor(classOf[SparkConf]) + serializer = constructor.newInstance(conf).asInstanceOf[Serializer] + } catch { + case _: NoSuchMethodException => + val constructor = cls.getConstructor() + serializer = constructor.newInstance().asInstanceOf[Serializer] + } + serializers.put(clsName, serializer) } serializer diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 362cea5e3e..7df7e3d8e5 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} import com.typesafe.config.ConfigFactory +import org.apache.log4j.{Level, Logger} + import org.apache.spark.SparkConf /** @@ -47,8 +49,15 @@ private[spark] object AkkaUtils { val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt - val lifecycleEvents = - if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" + val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean + val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" + if (!akkaLogLifecycleEvents) { + // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. + // See: https://www.assembla.com/spaces/akka/tickets/3787#/ + Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) + } + + val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off" val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt val akkaFailureDetector = @@ -73,7 +82,10 @@ private[spark] object AkkaUtils { |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize + |akka.log-config-on-start = $logAkkaConfig |akka.remote.log-remote-lifecycle-events = $lifecycleEvents + |akka.log-dead-letters = $lifecycleEvents + |akka.log-dead-letters-during-shutdown = $lifecycleEvents """.stripMargin) val actorSystem = if (indestructible) { |