aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala189
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala297
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala54
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala56
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala47
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/network/ReceiverTest.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/network/SenderTest.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala37
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala42
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala110
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/Serializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala58
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala35
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala50
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala25
85 files changed, 1287 insertions, 597 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 6e922a612a..5f73d234aa 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -41,7 +41,7 @@ class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
extends Serializable {
-
+
val id = Accumulators.newId
@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
@@ -113,7 +113,7 @@ class Accumulable[R, T] (
def setValue(newValue: R) {
this.value = newValue
}
-
+
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
@@ -177,7 +177,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
def zero(initialValue: R): R = {
// We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
// Instead we'll serialize it to a buffer and load it back.
- val ser = new JavaSerializer().newInstance()
+ val ser = new JavaSerializer(new SparkConf(false)).newInstance()
val copy = ser.deserialize[R](ser.serialize(initialValue))
copy.clear() // In case it contained stuff
copy
@@ -215,7 +215,7 @@ private object Accumulators {
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
-
+
def newId: Long = synchronized {
lastId += 1
return lastId
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cdfc9dd54e..69a738dc44 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
+ logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 6a973ea495..d519fc5a29 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -17,8 +17,8 @@
package org.apache.spark
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.apache.log4j.{LogManager, PropertyConfigurator}
+import org.slf4j.{Logger, LoggerFactory}
/**
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -33,6 +33,7 @@ trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
+ initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
@@ -89,7 +90,37 @@ trait Logging {
log.isTraceEnabled
}
- // Method for ensuring that logging is initialized, to avoid having multiple
- // threads do it concurrently (as SLF4J initialization is not thread safe).
- protected def initLogging() { log }
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ // If Log4j doesn't seem initialized, load a default properties file
+ val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ val defaultLogProps = "org/apache/spark/default-log4j.properties"
+ val classLoader = this.getClass.getClassLoader
+ Option(classLoader.getResource(defaultLogProps)) match {
+ case Some(url) => PropertyConfigurator.configure(url)
+ case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+ }
+ log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+ }
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ccffcc356c..cdae167aef 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -50,9 +50,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
}
}
-private[spark] class MapOutputTracker extends Logging {
+private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
- private val timeout = AkkaUtils.askTimeout
+ private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
@@ -65,7 +65,7 @@ private[spark] class MapOutputTracker extends Logging {
protected val epochLock = new java.lang.Object
private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
+ new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
@@ -129,7 +129,7 @@ private[spark] class MapOutputTracker extends Logging {
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val hostPort = Utils.localHostPort()
+ val hostPort = Utils.localHostPort(conf)
// This try-finally prevents hangs due to timeouts:
try {
val fetchedBytes =
@@ -192,7 +192,8 @@ private[spark] class MapOutputTracker extends Logging {
}
}
-private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
+private[spark] class MapOutputTrackerMaster(conf: SparkConf)
+ extends MapOutputTracker(conf) {
// Cache a serialized version of the output statuses for each shuffle to send them out faster
private var cacheEpoch = epoch
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index bcec41c439..31b0773bfe 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (System.getProperty("spark.default.parallelism") != null) {
+ if (rdd.context.conf.contains("spark.default.parallelism")) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
@@ -90,7 +90,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
- private val ascending: Boolean = true)
+ private val ascending: Boolean = true)
extends Partitioner {
// An array of upper bounds for the first (partitions - 1) partitions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
new file mode 100644
index 0000000000..98343e9532
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -0,0 +1,189 @@
+package org.apache.spark
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import com.typesafe.config.ConfigFactory
+
+/**
+ * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
+ *
+ * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
+ * values from both the `spark.*` Java system properties and any `spark.conf` on your application's
+ * classpath (if it has one). In this case, system properties take priority over `spark.conf`, and
+ * any parameters you set directly on the `SparkConf` object take priority over both of those.
+ *
+ * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
+ * get the same configuration no matter what is on the classpath.
+ *
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
+ * @param loadDefaults whether to load values from the system properties and classpath
+ */
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
+
+ /** Create a SparkConf that loads defaults from system properties and the classpath */
+ def this() = this(true)
+
+ private val settings = new HashMap[String, String]()
+
+ if (loadDefaults) {
+ ConfigFactory.invalidateCaches()
+ val typesafeConfig = ConfigFactory.systemProperties()
+ .withFallback(ConfigFactory.parseResources("spark.conf"))
+ for (e <- typesafeConfig.entrySet().asScala if e.getKey.startsWith("spark.")) {
+ settings(e.getKey) = e.getValue.unwrapped.toString
+ }
+ }
+
+ /** Set a configuration variable. */
+ def set(key: String, value: String): SparkConf = {
+ if (key == null) {
+ throw new NullPointerException("null key")
+ }
+ if (value == null) {
+ throw new NullPointerException("null value")
+ }
+ settings(key) = value
+ this
+ }
+
+ /**
+ * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
+ * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
+ */
+ def setMaster(master: String): SparkConf = {
+ set("spark.master", master)
+ }
+
+ /** Set a name for your application. Shown in the Spark web UI. */
+ def setAppName(name: String): SparkConf = {
+ set("spark.app.name", name)
+ }
+
+ /** Set JAR files to distribute to the cluster. */
+ def setJars(jars: Seq[String]): SparkConf = {
+ set("spark.jars", jars.mkString(","))
+ }
+
+ /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
+ def setJars(jars: Array[String]): SparkConf = {
+ setJars(jars.toSeq)
+ }
+
+ /**
+ * Set an environment variable to be used when launching executors for this application.
+ * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
+ * (for example spark.executorEnv.PATH) but this method makes them easier to set.
+ */
+ def setExecutorEnv(variable: String, value: String): SparkConf = {
+ set("spark.executorEnv." + variable, value)
+ }
+
+ /**
+ * Set multiple environment variables to be used when launching executors.
+ * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
+ * (for example spark.executorEnv.PATH) but this method makes them easier to set.
+ */
+ def setExecutorEnv(variables: Seq[(String, String)]): SparkConf = {
+ for ((k, v) <- variables) {
+ setExecutorEnv(k, v)
+ }
+ this
+ }
+
+ /**
+ * Set multiple environment variables to be used when launching executors.
+ * (Java-friendly version.)
+ */
+ def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
+ setExecutorEnv(variables.toSeq)
+ }
+
+ /**
+ * Set the location where Spark is installed on worker nodes.
+ */
+ def setSparkHome(home: String): SparkConf = {
+ set("spark.home", home)
+ }
+
+ /** Set multiple parameters together */
+ def setAll(settings: Traversable[(String, String)]) = {
+ this.settings ++= settings
+ this
+ }
+
+ /** Set a parameter if it isn't already configured */
+ def setIfMissing(key: String, value: String): SparkConf = {
+ if (!settings.contains(key)) {
+ settings(key) = value
+ }
+ this
+ }
+
+ /** Remove a parameter from the configuration */
+ def remove(key: String): SparkConf = {
+ settings.remove(key)
+ this
+ }
+
+ /** Get a parameter; throws a NoSuchElementException if it's not set */
+ def get(key: String): String = {
+ settings.getOrElse(key, throw new NoSuchElementException(key))
+ }
+
+ /** Get a parameter, falling back to a default if not set */
+ def get(key: String, defaultValue: String): String = {
+ settings.getOrElse(key, defaultValue)
+ }
+
+ /** Get a parameter as an Option */
+ def getOption(key: String): Option[String] = {
+ settings.get(key)
+ }
+
+ /** Get all parameters as a list of pairs */
+ def getAll: Array[(String, String)] = settings.clone().toArray
+
+ /** Get a parameter as an integer, falling back to a default if not set */
+ def getInt(key: String, defaultValue: Int): Int = {
+ getOption(key).map(_.toInt).getOrElse(defaultValue)
+ }
+
+ /** Get a parameter as a long, falling back to a default if not set */
+ def getLong(key: String, defaultValue: Long): Long = {
+ getOption(key).map(_.toLong).getOrElse(defaultValue)
+ }
+
+ /** Get a parameter as a double, falling back to a default if not set */
+ def getDouble(key: String, defaultValue: Double): Double = {
+ getOption(key).map(_.toDouble).getOrElse(defaultValue)
+ }
+
+ /** Get all executor environment variables set on this SparkConf */
+ def getExecutorEnv: Seq[(String, String)] = {
+ val prefix = "spark.executorEnv."
+ getAll.filter{case (k, v) => k.startsWith(prefix)}
+ .map{case (k, v) => (k.substring(prefix.length), v)}
+ }
+
+ /** Does the configuration contain a given parameter? */
+ def contains(key: String): Boolean = settings.contains(key)
+
+ /** Copy this object */
+ override def clone: SparkConf = {
+ new SparkConf(false).setAll(settings)
+ }
+
+ /**
+ * Return a string listing all keys and values, one per line. This is useful to print the
+ * configuration out for debugging.
+ */
+ def toDebugString: String = {
+ settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d94c..e80e43af6d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,36 +19,23 @@ package org.apache.spark
import java.io._
import java.net.URI
-import java.util.Properties
+import java.util.{UUID, Properties}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.Map
+import scala.collection.{Map, Set}
import scala.collection.generic.Growable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+TextInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
@@ -61,53 +48,97 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
- TimeStampedHashMap, Utils}
+import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
+ClosureCleaner}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
- * @param environment Environment variables to set on worker nodes.
+ * @param config a Spark Config object describing the application configuration. Any settings in
+ * this config overrides the default configs as well as system properties.
+ * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
+ * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+ * from a list of input files or InputFormats for the application.
*/
class SparkContext(
- val master: String,
- val appName: String,
- val sparkHome: String = null,
- val jars: Seq[String] = Nil,
- val environment: Map[String, String] = Map(),
+ config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
- // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
- // of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map())
+ // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
+ // a map from hostname to a list of input format splits on the host.
+ val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {
- // Ensure logging is initialized before we spawn any threads
- initLogging()
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+ */
+ def this(master: String, appName: String, conf: SparkConf) =
+ this(SparkContext.updatedConf(conf, master, appName))
- // Set Spark driver host and port system properties
- if (System.getProperty("spark.driver.host") == null) {
- System.setProperty("spark.driver.host", Utils.localHostName())
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
+ */
+ def this(
+ master: String,
+ appName: String,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map(),
+ preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+ {
+ this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
+ preferredNodeLocationData)
}
- if (System.getProperty("spark.driver.port") == null) {
- System.setProperty("spark.driver.port", "0")
+
+ private[spark] val conf = config.clone()
+
+ /**
+ * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = conf.clone()
+
+ if (!conf.contains("spark.master")) {
+ throw new SparkException("A master URL must be set in your configuration")
+ }
+ if (!conf.contains("spark.app.name")) {
+ throw new SparkException("An application must be set in your configuration")
+ }
+
+ // Set Spark driver host and port system properties
+ conf.setIfMissing("spark.driver.host", Utils.localHostName())
+ conf.setIfMissing("spark.driver.port", "0")
+
+ val jars: Seq[String] = if (conf.contains("spark.jars")) {
+ conf.get("spark.jars").split(",").filter(_.size != 0)
+ } else {
+ null
}
+ val master = conf.get("spark.master")
+ val appName = conf.get("spark.app.name")
+
val isLocal = (master == "local" || master.startsWith("local["))
// Create the Spark execution environment (cache, map output tracker, etc)
- private[spark] val env = SparkEnv.createFromSystemProperties(
+ private[spark] val env = SparkEnv.create(
+ conf,
"<driver>",
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port").toInt,
- true,
- isLocal)
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port").toInt,
+ isDriver = true,
+ isLocal = isLocal)
SparkEnv.set(env)
// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -116,7 +147,8 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
- private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
+ private[spark] val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
// Initialize the Spark UI
private[spark] val ui = new SparkUI(this)
@@ -126,23 +158,30 @@ class SparkContext(
// Add each JAR given through the constructor
if (jars != null) {
- jars.foreach { addJar(_) }
+ jars.foreach(addJar)
}
+ private[spark] val executorMemory = conf.getOption("spark.executor.memory")
+ .orElse(Option(System.getenv("SPARK_MEM")))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
- for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
- val value = System.getenv(key)
- if (value != null) {
- executorEnvs(key) = value
- }
+ for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
+ value <- Option(System.getenv(key))) {
+ executorEnvs(key) = value
}
- // Since memory can be set with a system property too, use that
- executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
- if (environment != null) {
- executorEnvs ++= environment
+ // Convert java options to env vars as a work around
+ // since we can't set env vars directly in sbt.
+ for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
+ value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
+ executorEnvs(envKey) = value
}
+ // Since memory can be set with a system property too, use that
+ executorEnvs("SPARK_MEM") = executorMemory + "m"
+ executorEnvs ++= conf.getExecutorEnv
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
@@ -164,24 +203,24 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- Utils.getSystemProperties.foreach { case (key, value) =>
+ conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), value)
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536")
- conf.set("io.file.buffer.size", bufferSize)
- conf
+ val bufferSize = conf.get("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ hadoopConf
}
private[spark] var checkpointDir: Option[String] = None
@@ -191,7 +230,7 @@ class SparkContext(
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}
- private[spark] def getLocalProperties(): Properties = localProperties.get()
+ private[spark] def getLocalProperties: Properties = localProperties.get()
private[spark] def setLocalProperties(props: Properties) {
localProperties.set(props)
@@ -522,7 +561,7 @@ class SparkContext(
addedFiles(key) = System.currentTimeMillis
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
@@ -692,15 +731,27 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (sparkHome != null) {
- Some(sparkHome)
- } else if (System.getProperty("spark.home") != null) {
- Some(System.getProperty("spark.home"))
- } else if (System.getenv("SPARK_HOME") != null) {
- Some(System.getenv("SPARK_HOME"))
- } else {
- None
- }
+ conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
+ }
+
+ /**
+ * Support function for API backtraces.
+ */
+ def setCallSite(site: String) {
+ setLocalProperty("externalCallSite", site)
+ }
+
+ /**
+ * Support function for API backtraces.
+ */
+ def clearCallSite() {
+ setLocalProperty("externalCallSite", null)
+ }
+
+ private[spark] def getCallSite(): String = {
+ val callSite = getLocalProperty("externalCallSite")
+ if (callSite == null) return Utils.formatSparkCallSite
+ callSite
}
/**
@@ -715,7 +766,7 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
val start = System.nanoTime
@@ -799,7 +850,7 @@ class SparkContext(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
@@ -819,7 +870,7 @@ class SparkContext(
resultFunc: => R): SimpleFutureAction[R] =
{
val cleanF = clean(processPartition)
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
val waiter = dagScheduler.submitJob(
rdd,
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
@@ -855,22 +906,15 @@ class SparkContext(
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * exisiting directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
+ * be a HDFS path if running on a cluster.
*/
- def setCheckpointDir(dir: String, useExisting: Boolean = false) {
- val path = new Path(dir)
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- if (!useExisting) {
- if (fs.exists(path)) {
- throw new Exception("Checkpoint directory '" + path + "' already exists.")
- } else {
- fs.mkdirs(path)
- }
+ def setCheckpointDir(directory: String) {
+ checkpointDir = Option(directory).map { dir =>
+ val path = new Path(dir, UUID.randomUUID().toString)
+ val fs = path.getFileSystem(hadoopConfiguration)
+ fs.mkdirs(path)
+ fs.getFileStatus(path).getPath().toString
}
- checkpointDir = Some(dir)
}
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
@@ -994,7 +1038,7 @@ object SparkContext {
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
- * their JARs to SparkContext
+ * their JARs to SparkContext.
*/
def jarOfClass(cls: Class[_]): Seq[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
@@ -1011,21 +1055,44 @@ object SparkContext {
}
}
- /** Find the JAR that contains the class of a particular object */
+ /**
+ * Find the JAR that contains the class of a particular object, to make it easy for users
+ * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+ * your driver program.
+ */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
- /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
- private[spark] val executorMemoryRequested = {
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Option(System.getProperty("spark.executor.memory"))
- .orElse(Option(System.getenv("SPARK_MEM")))
- .map(Utils.memoryStringToMb)
- .getOrElse(512)
+ /**
+ * Creates a modified version of a SparkConf with the parameters that can be passed separately
+ * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
+ * parameters that are passed as the default value of null, instead of throwing an exception
+ * like SparkConf would.
+ */
+ private def updatedConf(
+ conf: SparkConf,
+ master: String,
+ appName: String,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()): SparkConf =
+ {
+ val res = conf.clone()
+ res.setMaster(master)
+ res.setAppName(appName)
+ if (sparkHome != null) {
+ res.setSparkHome(sparkHome)
+ }
+ if (!jars.isEmpty) {
+ res.setJars(jars)
+ }
+ res.setExecutorEnv(environment.toSeq)
+ res
}
- // Creates a task scheduler based on a given master URL. Extracted for testing.
- private
- def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+ /** Creates a task scheduler based on a given master URL. Extracted for testing. */
+ private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
+ : TaskScheduler =
+ {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1071,10 +1138,10 @@ object SparkContext {
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
- if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+ if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
- memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+ memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
@@ -1132,7 +1199,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new TaskSchedulerImpl(sc)
- val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+ val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 826f5c2d8c..634a94f0a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
-class SparkEnv (
+class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
@@ -54,7 +54,8 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
- val metricsSystem: MetricsSystem) {
+ val metricsSystem: MetricsSystem,
+ val conf: SparkConf) {
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -62,7 +63,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- def stop() {
+ private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
@@ -78,6 +79,7 @@ class SparkEnv (
//actorSystem.awaitTermination()
}
+ private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
@@ -106,33 +108,35 @@ object SparkEnv extends Logging {
/**
* Returns the ThreadLocal SparkEnv.
*/
- def getThreadLocal : SparkEnv = {
+ def getThreadLocal: SparkEnv = {
env.get()
}
- def createFromSystemProperties(
+ private[spark] def create(
+ conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,
+ conf = conf)
// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
if (isDriver && port == 0) {
- System.setProperty("spark.driver.port", boundPort.toString)
+ conf.set("spark.driver.port", boundPort.toString)
}
// set only if unset until now.
- if (System.getProperty("spark.hostPort", null) == null) {
+ if (!conf.contains("spark.hostPort")) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
}
Utils.checkHost(hostname)
- System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+ conf.set("spark.hostPort", hostname + ":" + boundPort)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -140,25 +144,26 @@ object SparkEnv extends Logging {
// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = System.getProperty(propertyName, defaultClassName)
+ val name = conf.get(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+ conf)
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = System.getProperty("spark.driver.host", "localhost")
- val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.get("spark.driver.host", "localhost")
+ val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
@@ -168,21 +173,21 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
- new BlockManagerMasterActor(isLocal)))
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+ new BlockManagerMasterActor(isLocal, conf)), conf)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isDriver)
+ val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster()
+ new MapOutputTrackerMaster(conf)
} else {
- new MapOutputTracker()
+ new MapOutputTracker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
@@ -193,12 +198,12 @@ object SparkEnv extends Logging {
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
- System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ conf.set("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = if (isDriver) {
- MetricsSystem.createMetricsSystem("driver")
+ MetricsSystem.createMetricsSystem("driver", conf)
} else {
- MetricsSystem.createMetricsSystem("executor")
+ MetricsSystem.createMetricsSystem("executor", conf)
}
metricsSystem.start()
@@ -212,7 +217,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (System.getProperty("spark.cache.class") != null) {
+ if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
@@ -231,6 +236,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- metricsSystem)
+ metricsSystem,
+ conf)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 363667fa86..55c87450ac 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* Return an RDD with the values of each tuple.
*/
def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, partitioner)
+ }
+
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD)
+ }
+
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+ }
}
object JavaPairRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f344804b4c..924d8af060 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
takeOrdered(num, comp)
}
+
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index acf328aa6a..e93b10fd7e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -29,17 +29,22 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import com.google.common.base.Optional
-import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, SparkContext}
+import org.apache.spark._
import org.apache.spark.SparkContext.IntAccumulatorParam
import org.apache.spark.SparkContext.DoubleAccumulatorParam
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import scala.Tuple2
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
* works with Java collections instead of Scala ones.
*/
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+ /**
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ */
+ def this(conf: SparkConf) = this(new SparkContext(conf))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
@@ -50,6 +55,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+ */
+ def this(master: String, appName: String, conf: SparkConf) =
+ this(conf.setMaster(master).setAppName(appName))
+
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jarFile JAR file to send to the cluster. This can be a path on the local file system
* or an HDFS, HTTP, HTTPS, or FTP URL.
@@ -381,20 +394,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * exisiting directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
- */
- def setCheckpointDir(dir: String, useExisting: Boolean) {
- sc.setCheckpointDir(dir, useExisting)
- }
-
- /**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists, an exception will be thrown to prevent accidental
- * overriding of checkpoint files.
+ * be a HDFS path if running on a cluster.
*/
def setCheckpointDir(dir: String) {
sc.setCheckpointDir(dir)
@@ -405,10 +405,36 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
+
+ /**
+ * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = sc.getConf
+
+ /**
+ * Pass-through to SparkContext.setCallSite. For API support only.
+ */
+ def setCallSite(site: String) {
+ sc.setCallSite(site)
+ }
+
+ /**
+ * Pass-through to SparkContext.setCallSite. For API support only.
+ */
+ def clearCallSite() {
+ sc.clearCallSite()
+ }
}
object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index ca42c76928..32cc70e8c9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.get("spark.buffer.size", "65536").toInt
override def getPartitions = parent.partitions
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
Utils.checkHost(serverHost, "Expected hostname")
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 43c18294c5..0fc478a419 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -31,8 +31,8 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
override def toString = "Broadcast(" + id + ")"
}
-private[spark]
-class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
+private[spark]
+class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -43,14 +43,14 @@ class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable
private def initialize() {
synchronized {
if (!initialized) {
- val broadcastFactoryClass = System.getProperty(
+ val broadcastFactoryClass = conf.get(
"spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isDriver)
+ broadcastFactory.initialize(isDriver, conf)
initialized = true
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 68bff75b90..fb161ce69d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -17,6 +17,8 @@
package org.apache.spark.broadcast
+import org.apache.spark.SparkConf
+
/**
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
@@ -24,7 +26,7 @@ package org.apache.spark.broadcast
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
- def initialize(isDriver: Boolean): Unit
+ def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47db720416..db596d5fcc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -24,14 +24,14 @@ import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import org.apache.spark.{HttpServer, Logging, SparkEnv}
+import org.apache.spark.{SparkConf, HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
-
+
def value = value_
def blockId = BroadcastBlockId(id)
@@ -40,7 +40,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
}
- if (!isLocal) {
+ if (!isLocal) {
HttpBroadcast.write(id, value_)
}
@@ -64,7 +64,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
+ def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
@@ -81,44 +81,51 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
+ // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String]
- private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
+ private var cleaner: MetadataCleaner = null
- private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+ private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
- private lazy val compressionCodec = CompressionCodec.createCodec()
+ private var compressionCodec: CompressionCodec = null
- def initialize(isDriver: Boolean) {
+ def initialize(isDriver: Boolean, conf: SparkConf) {
synchronized {
if (!initialized) {
- bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ bufferSize = conf.get("spark.buffer.size", "65536").toInt
+ compress = conf.get("spark.broadcast.compress", "true").toBoolean
if (isDriver) {
- createServer()
+ createServer(conf)
+ conf.set("spark.httpBroadcast.uri", serverUri)
}
- serverUri = System.getProperty("spark.httpBroadcast.uri")
+ serverUri = conf.get("spark.httpBroadcast.uri")
+ cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
+ compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}
-
+
def stop() {
synchronized {
if (server != null) {
server.stop()
server = null
}
+ if (cleaner != null) {
+ cleaner.cancel()
+ cleaner = null
+ }
+ compressionCodec = null
initialized = false
- cleaner.cancel()
}
}
- private def createServer() {
- broadcastDir = Utils.createTempDir(Utils.getLocalDir)
+ private def createServer(conf: SparkConf) {
+ broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir)
server.start()
serverUri = server.uri
- System.setProperty("spark.httpBroadcast.uri", serverUri)
logInfo("Broadcast server started at " + serverUri)
}
@@ -143,7 +150,7 @@ private object HttpBroadcast extends Logging {
val in = {
val httpConnection = new URL(url).openConnection()
httpConnection.setReadTimeout(httpReadTimeout)
- val inputStream = httpConnection.getInputStream()
+ val inputStream = httpConnection.getInputStream
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 073a0a5029..9530938278 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -83,13 +83,13 @@ extends Broadcast[T](id) with Logging with Serializable {
case None =>
val start = System.nanoTime
logInfo("Started reading broadcast variable " + id)
-
+
// Initialize @transient variables that will receive garbage values from the master.
resetWorkerVariables()
if (receiveBroadcast(id)) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-
+
// Store the merged copy in cache so that the next worker doesn't need to rebuild it.
// This creates a tradeoff between memory usage and latency.
// Storing copy doubles the memory footprint; not storing doubles deserialization cost.
@@ -122,14 +122,14 @@ extends Broadcast[T](id) with Logging with Serializable {
while (attemptId > 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(metaId) match {
- case Some(x) =>
+ case Some(x) =>
val tInfo = x.asInstanceOf[TorrentInfo]
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
hasBlocks = 0
-
- case None =>
+
+ case None =>
Thread.sleep(500)
}
}
@@ -145,13 +145,13 @@ extends Broadcast[T](id) with Logging with Serializable {
val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
- case Some(x) =>
+ case Some(x) =>
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, true)
-
- case None =>
+
+ case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
}
@@ -166,21 +166,22 @@ private object TorrentBroadcast
extends Logging {
private var initialized = false
-
- def initialize(_isDriver: Boolean) {
+ private var conf: SparkConf = null
+ def initialize(_isDriver: Boolean, conf: SparkConf) {
+ TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
}
}
}
-
+
def stop() {
initialized = false
}
- val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
-
+ lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
+
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
val bais = new ByteArrayInputStream(byteArray)
@@ -209,7 +210,7 @@ extends Logging {
}
def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
- totalBytes: Int,
+ totalBytes: Int,
totalBlocks: Int): T = {
var retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
@@ -222,23 +223,23 @@ extends Logging {
}
private[spark] case class TorrentBlock(
- blockID: Int,
- byteArray: Array[Byte])
+ blockID: Int,
+ byteArray: Array[Byte])
extends Serializable
private[spark] case class TorrentInfo(
@transient arrayOfBlocks : Array[TorrentBlock],
- totalBlocks: Int,
- totalBytes: Int)
+ totalBlocks: Int,
+ totalBytes: Int)
extends Serializable {
-
- @transient var hasBlocks = 0
+
+ @transient var hasBlocks = 0
}
private[spark] class TorrentBroadcastFactory
extends BroadcastFactory {
-
- def initialize(isDriver: Boolean) { TorrentBroadcast.initialize(isDriver) }
+
+ def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852649..4dfb19ed8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
if (sc != null) { sc.stop() }
- // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+ // Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
"docker ps -l -q".!(ProcessLogger(line => id = line))
new DockerId(id)
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 59d12a3e6f..ffc0cb0903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.Utils
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.collection.mutable.ArrayBuffer
@@ -43,7 +43,8 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
+ val conf = new SparkConf(false)
+ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masters = Array(masterUrl)
@@ -55,7 +56,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
workerActorSystems += workerSystem
}
- return masters
+ masters
}
def stop() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f796..27dc42bf7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,10 +34,10 @@ class SparkHadoopUtil {
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {
- // if we are already running as the user intended there is no reason to do the doAs. It
+ // if we are already running as the user intended there is no reason to do the doAs. It
// will actually break secure HDFS access as it doesn't fill in the credentials. Also if
- // the user is UNKNOWN then we shouldn't be creating a remote unknown user
- // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+ // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+ // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
// in SparkContext.
val currentUser = Option(System.getProperty("user.name")).
getOrElse(SparkContext.SPARK_UNKNOWN_USER)
@@ -67,11 +67,15 @@ class SparkHadoopUtil {
}
object SparkHadoopUtil {
+
private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
- Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+ .newInstance()
+ .asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 953755e40d..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,20 +19,19 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
-import scala.concurrent.duration._
import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
-
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -43,7 +42,8 @@ private[spark] class Client(
actorSystem: ActorSystem,
masterUrls: Array[String],
appDescription: ApplicationDescription,
- listener: ClientListener)
+ listener: ClientListener,
+ conf: SparkConf)
extends Logging {
val REGISTRATION_TIMEOUT = 20.seconds
@@ -111,6 +111,12 @@ private[spark] class Client(
}
}
+ private def isPossibleMaster(remoteUrl: Address) = {
+ masterUrls.map(s => Master.toAkkaUrl(s))
+ .map(u => AddressFromURIString(u).hostPort)
+ .contains(remoteUrl.hostPort)
+ }
+
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@@ -146,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
+ case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+ logWarning(s"Could not connect to $address: $cause")
+
case StopClient =>
markDead()
sender ! true
@@ -178,7 +187,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 5b62d3ba6c..ef649fd80c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
@@ -45,11 +45,13 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
+ "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, Array(url), desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..7b696cfcca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -38,14 +38,16 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
- import context.dispatcher
+ import context.dispatcher // to use Akka's scheduler.schedule()
+
+ val conf = new SparkConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -63,8 +65,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
@@ -86,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +105,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
persistenceEngine = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +115,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
@@ -495,7 +497,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
- workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
@@ -507,8 +509,9 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
- val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val conf = new SparkConf
+ val args = new MasterArguments(argStrings, conf)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
@@ -522,10 +525,12 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
+ : (ActorSystem, Int, Int) =
+ {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9d89b455fb..e7f3224091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -18,16 +18,17 @@
package org.apache.spark.deploy.master
import org.apache.spark.util.{Utils, IntParam}
+import org.apache.spark.SparkConf
/**
* Command-line parser for the master.
*/
-private[spark] class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
-
- // Check for settings in environment variables
+
+ // Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
host = System.getenv("SPARK_MASTER_HOST")
}
@@ -37,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String]) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (System.getProperty("master.ui.port") != null) {
- webUiPort = System.getProperty("master.ui.port").toInt
+ if (conf.contains("master.ui.port")) {
+ webUiPort = conf.get("master.ui.port").toInt
}
parse(args.toList)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 6cc7fd2ff4..999090ad74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -35,8 +35,9 @@ import org.apache.spark.Logging
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
- val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
+ conf: SparkConf) extends Logging {
+ val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7d535b08de..77c23fb9fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,16 +21,17 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.master.MasterMessages._
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
+ masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this)
+ private val zk = new SparkZooKeeperSession(this, conf)
private var status = LeadershipStatus.NOT_LEADER
private var myLeaderFile: String = _
private var leaderUrl: String = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b3bb..52000d4f9c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,19 +17,19 @@
package org.apache.spark.deploy.master
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.zookeeper._
import akka.serialization.Serialization
-class ZooKeeperPersistenceEngine(serialization: Serialization)
+class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
extends PersistenceEngine
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
- val zk = new SparkZooKeeperSession(this)
+ val zk = new SparkZooKeeperSession(this, conf)
zk.connect()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 9ab594b682..ead35662fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(master.conf)
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6719..fcaf4e92b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,23 +25,14 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -53,7 +44,8 @@ private[spark] class Worker(
cores: Int,
memory: Int,
masterUrls: Array[String],
- workDirPath: String = null)
+ workDirPath: String = null,
+ val conf: SparkConf)
extends Actor with Logging {
import context.dispatcher
@@ -63,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -92,7 +84,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
- val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed
@@ -275,6 +267,7 @@ private[spark] class Worker(
}
private[spark] object Worker {
+
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -283,13 +276,16 @@ private[spark] object Worker {
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
- masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
- : (ActorSystem, Int) = {
+ masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+ : (ActorSystem, Int) =
+ {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = conf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir), name = "Worker")
+ masterUrls, workDir, conf), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 19aa800a95..c382034c99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,7 +22,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
@@ -34,10 +34,10 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
- System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
var server: Option[Server] = None
var boundPort: Option[Int] = None
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4c44..53a2b94a52 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,10 +98,10 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true)
+ indestructible = true, conf = new SparkConf)
// set it
val sparkHostPort = hostname + ":" + boundPort
- System.setProperty("spark.hostPort", sparkHostPort)
+// conf.set("spark.hostPort", sparkHostPort)
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a96b..e51d274d33 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -48,8 +48,6 @@ private[spark] class Executor(
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
- initLogging()
-
// No ip or host:port - just hostname
Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
// must not have port specified.
@@ -58,16 +56,17 @@ private[spark] class Executor(
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
- // Set spark.* system properties from executor arg
- for ((key, value) <- properties) {
- System.setProperty(key, value)
- }
+ // Set spark.* properties from executor arg
+ val conf = new SparkConf(false)
+ conf.setAll(properties)
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
- if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
- System.setProperty("spark.local.dir", getYarnLocalDirs())
+ if (java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
+ {
+ conf.set("spark.local.dir", getYarnLocalDirs())
}
// Create our ClassLoader and set it on this thread
@@ -108,7 +107,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0,
+ val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
@@ -142,11 +141,6 @@ private[spark] class Executor(
val tr = runningTasks.get(taskId)
if (tr != null) {
tr.kill()
- // We remove the task also in the finally block in TaskRunner.run.
- // The reason we need to remove it here is because killTask might be called before the task
- // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
- // idempotent.
- runningTasks.remove(taskId)
}
}
@@ -168,6 +162,8 @@ private[spark] class Executor(
class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {
+ object TaskKilledException extends Exception
+
@volatile private var killed = false
@volatile private var task: Task[Any] = _
@@ -201,9 +197,11 @@ private[spark] class Executor(
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
- logInfo("Executor killed task " + taskId)
- execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
- return
+ // Throw an exception rather than returning, because returning within a try{} block
+ // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
+ // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
+ // for the task.
+ throw TaskKilledException
}
attemptedTask = Some(task)
@@ -217,9 +215,7 @@ private[spark] class Executor(
// If the task has been killed, let's fail it.
if (task.killed) {
- logInfo("Executor killed task " + taskId)
- execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
- return
+ throw TaskKilledException
}
val resultSer = SparkEnv.get.serializer.newInstance()
@@ -261,6 +257,11 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
+ case TaskKilledException => {
+ logInfo("Executor killed task " + taskId)
+ execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+ }
+
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
@@ -303,7 +304,7 @@ private[spark] class Executor(
* new classes defined by the REPL as the user types code
*/
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
- val classUri = System.getProperty("spark.repl.class.uri")
+ val classUri = conf.get("spark.repl.class.uri", null)
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
try {
@@ -331,12 +332,12 @@ private[spark] class Executor(
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 570a979b56..a1e98845f6 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,6 +22,7 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.spark.{SparkEnv, SparkConf}
/**
@@ -37,15 +38,15 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
-
- def createCodec(): CompressionCodec = {
- createCodec(System.getProperty(
+ def createCodec(conf: SparkConf): CompressionCodec = {
+ createCodec(conf, conf.get(
"spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
}
- def createCodec(codecName: String): CompressionCodec = {
- Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
- .newInstance().asInstanceOf[CompressionCodec]
+ def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
+ val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
+ .getConstructor(classOf[SparkConf])
+ ctor.newInstance(conf).asInstanceOf[CompressionCodec]
}
}
@@ -53,7 +54,7 @@ private[spark] object CompressionCodec {
/**
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
*/
-class LZFCompressionCodec extends CompressionCodec {
+class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
@@ -67,10 +68,10 @@ class LZFCompressionCodec extends CompressionCodec {
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*/
-class SnappyCompressionCodec extends CompressionCodec {
+class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index caab748d60..6f9f29969e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -26,7 +26,6 @@ import scala.util.matching.Regex
import org.apache.spark.Logging
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
- initLogging()
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83be8..9930537b34 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.Source
@@ -62,10 +62,10 @@ import org.apache.spark.metrics.source.Source
*
* [options] is the specific property of this source or sink.
*/
-private[spark] class MetricsSystem private (val instance: String) extends Logging {
- initLogging()
+private[spark] class MetricsSystem private (val instance: String,
+ conf: SparkConf) extends Logging {
- val confFile = System.getProperty("spark.metrics.conf")
+ val confFile = conf.get("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
@@ -159,5 +159,6 @@ private[spark] object MetricsSystem {
}
}
- def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+ def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem =
+ new MetricsSystem(instance, conf)
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 703bc6a9ca..46c40d0a2a 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
import org.apache.spark.util.Utils
-private[spark] class ConnectionManager(port: Int) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
class MessageStatus(
val message: Message,
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private val selector = SelectorProvider.provider.openSelector()
private val handleMessageExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.handler.threads.min","20").toInt,
- System.getProperty("spark.core.connection.handler.threads.max","60").toInt,
- System.getProperty("spark.core.connection.handler.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.handler.threads.min", "20").toInt,
+ conf.get("spark.core.connection.handler.threads.max", "60").toInt,
+ conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val handleReadWriteExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.io.threads.min","4").toInt,
- System.getProperty("spark.core.connection.io.threads.max","32").toInt,
- System.getProperty("spark.core.connection.io.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.io.threads.min", "4").toInt,
+ conf.get("spark.core.connection.io.threads.max", "32").toInt,
+ conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.connect.threads.min","1").toInt,
- System.getProperty("spark.core.connection.connect.threads.max","8").toInt,
- System.getProperty("spark.core.connection.connect.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.connect.threads.min", "1").toInt,
+ conf.get("spark.core.connection.connect.threads.max", "8").toInt,
+ conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
@@ -594,7 +594,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager {
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, new SparkConf)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 781715108b..1c9d6030d6 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -19,19 +19,19 @@ package org.apache.spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
+import org.apache.spark.SparkConf
private[spark] object ReceiverTest {
-
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, new SparkConf)
println("Started connection manager with id = " + manager.id)
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+
+ manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
- val buffer = ByteBuffer.wrap("response".getBytes())
+ val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
- Thread.currentThread.join()
+ Thread.currentThread.join()
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 777574980f..dcbd183c88 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -19,29 +19,29 @@ package org.apache.spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
+import org.apache.spark.SparkConf
private[spark] object SenderTest {
-
def main(args: Array[String]) {
-
+
if (args.length < 2) {
println("Usage: SenderTest <target host> <target port>")
System.exit(1)
}
-
+
val targetHost = args(0)
val targetPort = args(1).toInt
val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
- val manager = new ConnectionManager(0)
+ val manager = new ConnectionManager(0, new SparkConf)
println("Started connection manager with id = " + manager.id)
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+ manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
})
-
- val size = 100 * 1024 * 1024
+
+ val size = 100 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
val count = 100
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
- val startTime = System.currentTimeMillis
+ val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
case Some(response) =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b1e1576dad..b729eb11c5 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -23,20 +23,20 @@ import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.util.CharsetUtil
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.network.ConnectionManagerId
import scala.collection.JavaConverters._
import org.apache.spark.storage.BlockId
-private[spark] class ShuffleCopier extends Logging {
+private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
def getBlock(host: String, port: Int, blockId: BlockId,
resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
val fc = new FileClient(handler, connectTimeout)
try {
@@ -104,10 +104,10 @@ private[spark] object ShuffleCopier extends Logging {
val threads = if (args.length > 3) args(3).toInt else 10
val copiers = Executors.newFixedThreadPool(80)
- val tasks = (for (i <- Range(0, threads)) yield {
+ val tasks = (for (i <- Range(0, threads)) yield {
Executors.callable(new Runnable() {
def run() {
- val copier = new ShuffleCopier()
+ val copier = new ShuffleCopier(new SparkConf)
copier.getBlock(host, port, blockId, echoResultCollectCallBack)
}
})
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a712ef1c27..6d4f46125f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,12 +18,12 @@
package org.apache.spark.rdd
import java.io.IOException
-
import scala.reflect.ClassTag
-
-import org.apache.hadoop.fs.Path
import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -34,6 +34,8 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
+ val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
override def getPartitions: Array[Partition] = {
@@ -65,7 +67,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
- CheckpointRDD.readFromFile(file, context)
+ CheckpointRDD.readFromFile(file, broadcastedConf, context)
}
override def checkpoint() {
@@ -74,15 +76,18 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
}
private[spark] object CheckpointRDD extends Logging {
-
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}
- def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ def writeToFile[T](
+ path: String,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ blockSize: Int = -1
+ )(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+ val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -92,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -119,10 +124,14 @@ private[spark] object CheckpointRDD extends Logging {
}
}
- def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+ def readFromFile[T](
+ path: Path,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ context: TaskContext
+ ): Iterator[T] = {
val env = SparkEnv.get
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val fs = path.getFileSystem(broadcastedConf.value.value)
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
@@ -144,8 +153,10 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+ val conf = SparkHadoopUtil.get.newConfiguration()
+ val fs = path.getFileSystem(conf)
+ val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+ sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002884..4ba4696fef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.changeValue(k, update)
}
- val ser = SparkEnv.get.serializerManager.get(serializerClass)
+ val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e152e..04a8d05988 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Aggregator
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -208,6 +211,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+ val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
+ val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ }
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ }
+
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
+ }
+
+ /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
new file mode 100644
index 0000000000..4c625d062e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+import java.io.{ObjectOutputStream, IOException}
+import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
+
+
+/**
+ * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions
+ * of parent RDDs.
+ */
+private[spark]
+class PartitionerAwareUnionRDDPartition(
+ @transient val rdds: Seq[RDD[_]],
+ val idx: Int
+ ) extends Partition {
+ var parents = rdds.map(_.partitions(idx)).toArray
+
+ override val index = idx
+ override def hashCode(): Int = idx
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent partition at the time of task serialization
+ parents = rdds.map(_.partitions(index)).toArray
+ oos.defaultWriteObject()
+ }
+}
+
+/**
+ * Class representing an RDD that can take multiple RDDs partitioned by the same partitioner and
+ * unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each
+ * will be unified to a single RDD with p partitions and the same partitioner. The preferred
+ * location for each partition of the unified RDD will be the most common preferred location
+ * of the corresponding partitions of the parent RDDs. For example, location of partition 0
+ * of the unified RDD will be where most of partition 0 of the parent RDDs are located.
+ */
+private[spark]
+class PartitionerAwareUnionRDD[T: ClassTag](
+ sc: SparkContext,
+ var rdds: Seq[RDD[T]]
+ ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
+ require(rdds.length > 0)
+ require(rdds.flatMap(_.partitioner).toSet.size == 1,
+ "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
+
+ override val partitioner = rdds.head.partitioner
+
+ override def getPartitions: Array[Partition] = {
+ val numPartitions = partitioner.get.numPartitions
+ (0 until numPartitions).map(index => {
+ new PartitionerAwareUnionRDDPartition(rdds, index)
+ }).toArray
+ }
+
+ // Get the location where most of the partitions of parent RDDs are located
+ override def getPreferredLocations(s: Partition): Seq[String] = {
+ logDebug("Finding preferred location for " + this + ", partition " + s.index)
+ val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
+ val locations = rdds.zip(parentPartitions).flatMap {
+ case (rdd, part) => {
+ val parentLocations = currPrefLocs(rdd, part)
+ logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
+ parentLocations
+ }
+ }
+ val location = if (locations.isEmpty) {
+ None
+ } else {
+ // Find the location that maximum number of parent partitions prefer
+ Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
+ }
+ logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
+ location.toSeq
+ }
+
+ override def compute(s: Partition, context: TaskContext): Iterator[T] = {
+ val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
+ rdds.zip(parentPartitions).iterator.flatMap {
+ case (rdd, p) => rdd.iterator(p, context)
+ }
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdds = null
+ }
+
+ // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
+ rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566ad1..3f41b66279 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -81,6 +82,7 @@ abstract class RDD[T: ClassTag](
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
+ private[spark] def conf = sc.conf
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
@@ -789,6 +791,19 @@ abstract class RDD[T: ClassTag](
}
/**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+ val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+ }
+
+ /**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
@@ -938,7 +953,7 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Record user function generating this RDD. */
- @transient private[spark] val origin = Utils.formatSparkCallSite
+ @transient private[spark] val origin = sc.getCallSite
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 3b56e45aa9..bc688110f4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
/**
@@ -40,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration {
* manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
* of the checkpointed RDD.
*/
-private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
+private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
extends Logging with Serializable {
import CheckpointState._
@@ -85,14 +85,21 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
// Create the output path for the checkpoint
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
// Save to file, and reload it as an RDD
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+ val broadcastedConf = rdd.context.broadcast(
+ new SerializableWritable(rdd.context.hadoopConfiguration))
+ rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+ if (newRDD.partitions.size != rdd.partitions.size) {
+ throw new SparkException(
+ "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+ "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
+ }
// Change the dependencies and partitions of the RDD
RDDCheckpointData.synchronized {
@@ -101,8 +108,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
RDDCheckpointData.clearTaskCaches()
- logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
}
+ logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
// Get preferred location of a split after checkpointing
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 3682c84598..0ccb309d0d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -59,7 +59,7 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context,
- SparkEnv.get.serializerManager.get(serializerClass))
+ SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf))
}
override def clearDependencies() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index aab30b1bb4..4f90c7d3d6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -93,7 +93,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
- val serializer = SparkEnv.get.serializerManager.get(serializerClass)
+ val serializer = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
val map = new JHashMap[K, ArrayBuffer[V]]
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7603eb292f..043e01dbfb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -159,7 +159,8 @@ class DAGScheduler(
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
/**
* Starts the event processing actor. The actor has two responsibilities:
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 1791ee660d..90eb8a747f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
/**
* Parses and holds information about inputFormat (and files) specified as a parameter.
*/
-class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
+class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
val path: String) extends Logging {
var mapreduceInputFormat: Boolean = false
@@ -40,7 +40,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
validate()
- override def toString(): String = {
+ override def toString: String = {
"InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
}
@@ -125,7 +125,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
private def findPreferredLocations(): Set[SplitInfo] = {
- logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
+ logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
return prefLocsFromMapreduceInputFormat()
@@ -143,14 +143,14 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
object InputFormatInfo {
/**
Computes the preferred locations based on input(s) and returned a location to block map.
- Typical use of this method for allocation would follow some algo like this
- (which is what we currently do in YARN branch) :
+ Typical use of this method for allocation would follow some algo like this:
+
a) For each host, count number of splits hosted on that host.
b) Decrement the currently allocated containers on that host.
c) Compute rack info for each host and update rack -> count map based on (b).
d) Allocate nodes based on (c)
- e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
- (even if data locality on that is very high) : this is to prevent fragility of job if a single
+ e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
+ (even if data locality on that is very high) : this is to prevent fragility of job if a single
(or small set of) hosts go down.
go to (a) until required nodes are allocated.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 310ec62ca8..28f3ba53b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -32,7 +32,9 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
+ // TODO: This object shouldn't have global variables
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 356fe56bf3..3cf995ea74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.xml.XML
@@ -49,10 +49,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 89aa098664..02bdbba825 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,7 +31,4 @@ private[spark] trait SchedulerBackend {
def defaultParallelism(): Int
def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
-
- // Memory used by each executor (in megabytes)
- protected val executorMemory: Int = SparkContext.executorMemoryRequested
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 0f2deb4bcb..a37ead5632 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -37,7 +37,9 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
+ // TODO: This object shouldn't have global variables
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues, new SparkConf)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
@@ -152,7 +154,7 @@ private[spark] class ShuffleMapTask(
try {
// Obtain all the block writers for shuffle blocks.
- val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
+ val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
// Write the map output to its associated buckets.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 89102720fa..e22b1e53e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.Utils
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends Logging {
- private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+
+ private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b0f82fa24..0c8ed62759 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
* It can also work with a local setup by using a LocalBackend and setting isLocal to true.
* It handles common logic, like determining a scheduling order across jobs, waking up to launch
* speculative tasks, etc.
- *
+ *
* Clients should first call initialize() and start(), then submit task sets through the
* runTasks method.
*
@@ -47,15 +47,19 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
- val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+ val maxTaskFailures: Int,
isLocal: Boolean = false)
- extends TaskScheduler with Logging {
+ extends TaskScheduler with Logging
+{
+ def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
+
+ val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+ val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@@ -92,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.scheduler.mode", "FIFO"))
+ conf.get("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -110,7 +114,7 @@ private[spark] class TaskSchedulerImpl(
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
+ new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
@@ -121,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
override def start() {
backend.start()
- if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
+ if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
@@ -281,7 +285,8 @@ private[spark] class TaskSchedulerImpl(
}
}
case None =>
- logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+ logInfo("Ignoring update with state %s from TID %s because its task set is gone"
+ .format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
@@ -324,7 +329,7 @@ private[spark] class TaskSchedulerImpl(
// Have each task set throw a SparkException with the error
for ((taskSetId, manager) <- activeTaskSets) {
try {
- manager.error(message)
+ manager.abort(message)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 7929051791..6dd1469d8f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
clock: Clock = SystemClock)
extends Schedulable with Logging
{
+ val conf = sched.sc.conf
+
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -114,7 +116,7 @@ private[spark] class TaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -546,11 +548,6 @@ private[spark] class TaskSetManager(
}
}
- def error(message: String) {
- // Save the error message
- abort("Error: " + message)
- }
-
def abort(message: String) {
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message)
@@ -676,14 +673,14 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5c534a6f43..2f5bcafe40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -48,8 +48,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
-
- private val timeout = AkkaUtils.askTimeout
+ val conf = scheduler.sc.conf
+ private val timeout = AkkaUtils.askTimeout(conf)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -63,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -119,7 +119,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true
- case DisassociatedEvent(_, address, _) =>
+ case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
}
@@ -164,14 +164,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
override def start() {
val properties = new ArrayBuffer[(String, String)]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
properties += ((key, value))
}
}
+ //TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
@@ -210,8 +208,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
driverActor ! KillTask(taskId, executorId)
}
- override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
- .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+ override def defaultParallelism(): Int = {
+ conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
+ math.max(totalCoreCount.get(), 2))
+ }
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ec3e68e970..b44d1e43c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,13 +33,13 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
+ val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
override def start() {
super.start()
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val conf = new Configuration()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 404ce7a452..9858717d13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,23 +38,23 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
override def start() {
super.start()
// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
- val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
+ val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
- client = new Client(sc.env.actorSystem, masters, appDesc, this)
+ client = new Client(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 39573fc8c9..d46fceba89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseMesosSchedulerBackend(
var driver: SchedulerDriver = null
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+ val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
var nextMesosTaskId = 0
@@ -122,12 +122,12 @@ private[spark] class CoarseMesosSchedulerBackend(
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val uri = System.getProperty("spark.executor.uri")
+ val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
- val runScript = new File(sparkHome, "spark-class").getCanonicalPath
+ val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
@@ -136,7 +136,7 @@ private[spark] class CoarseMesosSchedulerBackend(
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+ "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
@@ -177,7 +177,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
@@ -193,7 +193,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", executorMemory))
+ .addResources(createResource("mem", sc.executorMemory))
.build()
d.launchTasks(offer.getId, Collections.singletonList(task), filters)
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 6aa788c460..ae8d527352 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -100,20 +100,20 @@ private[spark] class MesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val uri = System.getProperty("spark.executor.uri")
+ val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
- command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+ command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./spark-executor".format(basename))
+ command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+ .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -198,7 +198,7 @@ private[spark] class MesosSchedulerBackend(
def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
}
// TODO: query Mesos for number of cores
- override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+ override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 4edc6a0d3f..897d47a9ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -47,7 +47,8 @@ private[spark] class LocalActor(
private val localExecutorId = "localhost"
private val localExecutorHostname = "localhost"
- val executor = new Executor(localExecutorId, localExecutorHostname, Seq.empty, isLocal = true)
+ val executor = new Executor(
+ localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
def receive = {
case ReviveOffers =>
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4de81617b1..5d3d43623d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.SparkConf
private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
@@ -77,6 +78,6 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
/**
* A Spark serializer that uses Java's built-in serialization.
*/
-class JavaSerializer extends Serializer {
+class JavaSerializer(conf: SparkConf) extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index e748c2275d..a24a3b04b8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,18 +25,18 @@ import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark._
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
+import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
-class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
-
+class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = {
- System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize)
@@ -48,7 +48,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
- kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+ kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
for (cls <- KryoSerializer.toRegister) kryo.register(cls)
@@ -58,13 +58,13 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Allow the user to register their own classes by setting spark.kryo.registrator
try {
- Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ for (regCls <- conf.getOption("spark.kryo.registrator")) {
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
- case _: Exception => println("Failed to register spark.kryo.registrator")
+ case e: Exception => logError("Failed to run spark.kryo.registrator", e)
}
// Register Chill's classes; we do this after our ranges and the user's own classes to let
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 160cca4d6c..9a5e3cb77e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
* guaranteed to only be called from one thread at a time.
+ *
+ * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
*/
trait Serializer {
def newInstance(): SerializerInstance
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986fec..36a37af4f8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.serializer
import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkConf
/**
@@ -26,18 +27,19 @@ import java.util.concurrent.ConcurrentHashMap
* creating a new one.
*/
private[spark] class SerializerManager {
+ // TODO: Consider moving this into SparkConf itself to remove the global singleton.
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
def default = _default
- def setDefault(clsName: String): Serializer = {
- _default = get(clsName)
+ def setDefault(clsName: String, conf: SparkConf): Serializer = {
+ _default = get(clsName, conf)
_default
}
- def get(clsName: String): Serializer = {
+ def get(clsName: String, conf: SparkConf): Serializer = {
if (clsName == null) {
default
} else {
@@ -51,8 +53,19 @@ private[spark] class SerializerManager {
serializer = serializers.get(clsName)
if (serializer == null) {
val clsLoader = Thread.currentThread.getContextClassLoader
- serializer =
- Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
+ val cls = Class.forName(clsName, true, clsLoader)
+
+ // First try with the constructor that takes SparkConf. If we can't find one,
+ // use a no-arg constructor instead.
+ try {
+ val constructor = cls.getConstructor(classOf[SparkConf])
+ serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
+ } catch {
+ case _: NoSuchMethodException =>
+ val constructor = cls.getConstructor()
+ serializer = constructor.newInstance().asInstanceOf[Serializer]
+ }
+
serializers.put(clsName, serializer)
}
serializer
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index e51c5b30a3..47478631a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -312,7 +312,7 @@ object BlockFetcherIterator {
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.bytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
- val cpier = new ShuffleCopier
+ val cpier = new ShuffleCopier(blockManager.conf)
cpier.getBlocks(cmId, req.blocks, putResult)
logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
}
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
fetchRequestsSync.put(request)
}
- copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt)
+ copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
Utils.getUsedTimeMs(startTime))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 19a025a329..6d2cda97b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -30,7 +30,7 @@ import scala.concurrent.duration._
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
-import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -43,12 +43,13 @@ private[spark] class BlockManager(
actorSystem: ActorSystem,
val master: BlockManagerMaster,
val defaultSerializer: Serializer,
- maxMemory: Long)
+ maxMemory: Long,
+ val conf: SparkConf)
extends Logging {
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -57,12 +58,12 @@ private[spark] class BlockManager(
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
- val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+ val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
- val connectionManager = new ConnectionManager(0)
+ val connectionManager = new ConnectionManager(0, conf)
implicit val futureExecContext = connectionManager.futureExecContext
val blockManagerId = BlockManagerId(
@@ -71,18 +72,18 @@ private[spark] class BlockManager(
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
val maxBytesInFlight =
- System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that are stored
- val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
+ val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
- val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
- val hostPort = Utils.localHostPort()
+ val hostPort = Utils.localHostPort(conf)
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
@@ -100,8 +101,11 @@ private[spark] class BlockManager(
var heartBeatTask: Cancellable = null
- private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
- private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
+ private val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
+ private val broadcastCleaner = new MetadataCleaner(
+ MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+
initialize()
// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -109,14 +113,14 @@ private[spark] class BlockManager(
// program could be using a user-defined codec in a third party jar, which is loaded in
// Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
// loaded yet.
- private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
- serializer: Serializer) = {
- this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ serializer: Serializer, conf: SparkConf) = {
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf)
}
/**
@@ -126,7 +130,7 @@ private[spark] class BlockManager(
private def initialize() {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
- if (!BlockManager.getDisableHeartBeatsForTesting) {
+ if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
heartBeat()
}
@@ -439,7 +443,7 @@ private[spark] class BlockManager(
: BlockFetcherIterator = {
val iter =
- if (System.getProperty("spark.shuffle.use.netty", "false").toBoolean) {
+ if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -465,7 +469,8 @@ private[spark] class BlockManager(
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
+ val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
+ new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
/**
@@ -856,19 +861,18 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
-
val ID_GENERATOR = new IdGenerator
- def getMaxMemoryFromSystemProperties: Long = {
- val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
+ def getMaxMemory(conf: SparkConf): Long = {
+ val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
- def getHeartBeatFrequencyFromSystemProperties: Long =
- System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+ def getHeartBeatFrequency(conf: SparkConf): Long =
+ conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
- def getDisableHeartBeatsForTesting: Boolean =
- System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+ def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
+ conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e1d68ef592..b5afe8cd23 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -23,19 +23,20 @@ import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
import akka.pattern.ask
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
+ conf: SparkConf) extends Logging {
- val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 21022e1cfb..58452d9657 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -36,7 +36,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* all slaves' block managers.
*/
private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo =
@@ -48,20 +48,18 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- private val akkaTimeout = AkkaUtils.askTimeout
+ private val akkaTimeout = AkkaUtils.askTimeout(conf)
- initLogging()
+ val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
- val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
- "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
- val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ val checkTimeoutInterval = conf.get("spark.storage.blockManagerTimeoutIntervalMs",
"60000").toLong
var timeoutCheckingTask: Cancellable = null
override def preStart() {
- if (!BlockManager.getDisableHeartBeatsForTesting) {
+ if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 0c66addf9d..21f003609b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
* TODO: Use event model.
*/
private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
- initLogging()
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
@@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
- initLogging()
-
def startBlockManagerWorker(manager: BlockManager) {
blockManagerWorker = new BlockManagerWorker(manager)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 6ce9127c74..a06f50a0ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
def length = blockMessages.length
- initLogging()
-
def set(bufferMessage: BufferMessage) {
val startTime = System.currentTimeMillis
val newBlockMessages = new ArrayBuffer[BlockMessage]()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc7b8..61e63c60d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -74,7 +74,8 @@ class DiskBlockObjectWriter(
file: File,
serializer: Serializer,
bufferSize: Int,
- compressStream: OutputStream => OutputStream)
+ compressStream: OutputStream => OutputStream,
+ syncWrites: Boolean)
extends BlockObjectWriter(blockId)
with Logging
{
@@ -97,8 +98,6 @@ class DiskBlockObjectWriter(
override def flush() = out.flush()
}
- private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
-
/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
private var bs: OutputStream = null
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index fcd2e97982..55dcb3742c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
+ private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 212ef6506f..39dc7bb19a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -59,12 +59,14 @@ private[spark] trait ShuffleWriterGroup {
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
+ def conf = blockManager.conf
+
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
// TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
- System.getProperty("spark.shuffle.consolidateFiles", "false").toBoolean
+ conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
- private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
@@ -85,8 +87,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
type ShuffleId = Int
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
- private
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
+ private val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup {
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index d52b3d8284..40734aab49 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -56,7 +56,7 @@ object StoragePerfTester {
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer())
+ new KryoSerializer(sc.conf))
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a8db37ded1..dca98c6c05 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,6 +22,7 @@ import akka.actor._
import java.util.concurrent.ArrayBlockingQueue
import util.Random
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.{SparkConf, SparkContext}
/**
* This class tests the BlockManager and MemoryStore for thread safety and
@@ -91,11 +92,12 @@ private[spark] object ThreadingTest {
def main(args: Array[String]) {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
- val serializer = new KryoSerializer
+ val conf = new SparkConf()
+ val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
val blockManager = new BlockManager(
- "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
+ "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f1d86c0221..50dfdbdf5a 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
- val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
+ val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
var server: Option[Server] = None
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index fcd1b518d0..6ba15187d9 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
import scala.util.Random
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.SchedulingMode
@@ -27,25 +27,26 @@ import org.apache.spark.scheduler.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
- * Usage: ./run spark.ui.UIWorkloadGenerator [master]
+ * Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]
*/
private[spark] object UIWorkloadGenerator {
+
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
if (args.length < 2) {
- println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
- val master = args(0)
- val schedulingMode = SchedulingMode.withName(args(1))
- val appName = "Spark UI Tester"
+ val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")
+
+ val schedulingMode = SchedulingMode.withName(args(1))
if (schedulingMode == SchedulingMode.FAIR) {
- System.setProperty("spark.scheduler.mode", "FAIR")
+ conf.set("spark.scheduler.mode", "FAIR")
}
- val sc = new SparkContext(master, appName)
+ val sc = new SparkContext(conf)
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
@@ -55,11 +56,11 @@ private[spark] object UIWorkloadGenerator {
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
- def nextFloat() = (new Random()).nextFloat()
+ def nextFloat() = new Random().nextFloat()
val jobs = Seq[(String, () => Long)](
("Count", baseData.count),
- ("Cache and Count", baseData.map(x => x).cache.count),
+ ("Cache and Count", baseData.map(x => x).cache().count),
("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
("Entirely failed phase", baseData.map(x => throw new Exception).count),
("Partially failed phase", {
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index c5bf2acc9e..88f41be8d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -48,12 +48,15 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
def jvmTable =
UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation, fixedWidth = true)
- val properties = System.getProperties.iterator.toSeq
- val classPathProperty = properties.find { case (k, v) =>
- k.contains("java.class.path")
+ val sparkProperties = sc.conf.getAll.sorted
+
+ val systemProperties = System.getProperties.iterator.toSeq
+ val classPathProperty = systemProperties.find { case (k, v) =>
+ k == "java.class.path"
}.getOrElse(("", ""))
- val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
- val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
+ val otherProperties = systemProperties.filter { case (k, v) =>
+ k != "java.class.path" && !k.startsWith("spark.")
+ }.sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
@@ -63,7 +66,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
- .split(System.getProperty("path.separator", ":"))
+ .split(sc.conf.get("path.separator", ":"))
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 058bc2a2e5..b7b87250b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
*/
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
- val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+ val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageIdToPool = new HashMap[Int, String]()
@@ -106,7 +106,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
stages += stage
}
-
+
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val sid = taskStart.task.stageId
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 1c8b51b8bc..7df7e3d8e5 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,9 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.SparkConf
/**
* Various utility classes for working with Akka.
@@ -37,22 +40,29 @@ private[spark] object AkkaUtils {
* If indestructible is set to true, the Actor System will continue running in the event
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
- : (ActorSystem, Int) = {
+ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
+ conf: SparkConf): (ActorSystem, Int) = {
+
+ val akkaThreads = conf.get("spark.akka.threads", "4").toInt
+ val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
- val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
+ val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt
+ val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
+ val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+ val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+ if (!akkaLogLifecycleEvents) {
+ // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+ // See: https://www.assembla.com/spaces/akka/tickets/3787#/
+ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+ }
- val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val lifecycleEvents =
- if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
- System.getProperty("spark.akka.failure-detector.threshold", "300.0").toDouble
- val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "1000").toInt
+ conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@@ -72,7 +82,10 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ |akka.log-dead-letters = $lifecycleEvents
+ |akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin)
val actorSystem = if (indestructible) {
@@ -87,7 +100,7 @@ private[spark] object AkkaUtils {
}
/** Returns the default Spark timeout to use for Akka ask operations. */
- def askTimeout: FiniteDuration = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ def askTimeout(conf: SparkConf): FiniteDuration = {
+ Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index fe56960cbf..aa7f52cafb 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,16 +18,21 @@
package org.apache.spark.util
import java.util.{TimerTask, Timer}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, SparkContext, Logging}
/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
-class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(
+ cleanerType: MetadataCleanerType.MetadataCleanerType,
+ cleanupFunc: (Long) => Unit,
+ conf: SparkConf)
+ extends Logging
+{
val name = cleanerType.toString
- private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
+ private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
@@ -65,22 +70,28 @@ object MetadataCleanerType extends Enumeration {
def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
}
+// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
+// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
object MetadataCleaner {
+ def getDelaySeconds(conf: SparkConf) = {
+ conf.get("spark.cleaner.ttl", "3500").toInt
+ }
- // using only sys props for now : so that workers can also get to it while preserving earlier behavior.
- def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
-
- def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
- System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+ def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
+ {
+ conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
+ .toInt
}
- def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
- System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+ def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
+ delay: Int)
+ {
+ conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}
- def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+ def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
// override for all ?
- System.setProperty("spark.cleaner.ttl", delay.toString)
+ conf.set("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
new file mode 100644
index 0000000000..8b4e7c104c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+
+/**
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ */
+private[spark]
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+
+ def this() = this(null) // For deserialization
+
+ def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+
+ def add[T](elem: T) = {
+ this.value.offer(elem)
+ this
+ }
+
+ def readExternal(in: ObjectInput) {
+ val byteLength = in.readInt()
+ val bytes = new Array[Byte](byteLength)
+ in.readFully(bytes)
+ value = HyperLogLog.Builder.build(bytes)
+ }
+
+ def writeExternal(out: ObjectOutput) {
+ val bytes = value.getBytes()
+ out.writeInt(bytes.length)
+ out.write(bytes)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index a25b37a2a9..bddb3bb735 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -30,10 +30,10 @@ import java.lang.management.ManagementFactory
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-import org.apache.spark.Logging
+import org.apache.spark.{SparkEnv, SparkConf, SparkContext, Logging}
/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
+ * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
* memory-aware caches.
*
* Based on the following JavaWorld article:
@@ -89,9 +89,11 @@ private[spark] object SizeEstimator extends Logging {
classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
}
- private def getIsCompressedOops : Boolean = {
+ private def getIsCompressedOops: Boolean = {
+ // This is only used by tests to override the detection of compressed oops. The test
+ // actually uses a system property instead of a SparkConf, so we'll stick with that.
if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
+ return System.getProperty("spark.test.useCompressedOops").toBoolean
}
try {
@@ -103,7 +105,7 @@ private[spark] object SizeEstimator extends Logging {
val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
Class.forName("java.lang.String"))
- val bean = ManagementFactory.newPlatformMXBeanProxy(server,
+ val bean = ManagementFactory.newPlatformMXBeanProxy(server,
hotSpotMBeanName, hotSpotMBeanClass)
// TODO: We could use reflection on the VMOption returned ?
return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
@@ -251,7 +253,7 @@ private[spark] object SizeEstimator extends Logging {
if (info != null) {
return info
}
-
+
val parent = getClassInfo(cls.getSuperclass)
var shellSize = parent.shellSize
var pointerFields = parent.pointerFields
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3f7858d2de..5f1253100b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -36,14 +36,13 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
-
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -239,9 +238,9 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File) {
+ def fetchFile(url: String, targetDir: File, conf: SparkConf) {
val filename = url.split("/").last
- val tempDir = getLocalDir
+ val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
@@ -311,8 +310,8 @@ private[spark] object Utils extends Logging {
* return a single directory, even though the spark.local.dir property might be a list of
* multiple paths.
*/
- def getLocalDir: String = {
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ def getLocalDir(conf: SparkConf): String = {
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
}
/**
@@ -397,13 +396,12 @@ private[spark] object Utils extends Logging {
InetAddress.getByName(address).getHostName
}
- def localHostPort(): String = {
- val retval = System.getProperty("spark.hostPort", null)
+ def localHostPort(conf: SparkConf): String = {
+ val retval = conf.get("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
}
-
retval
}
@@ -415,9 +413,12 @@ private[spark] object Utils extends Logging {
assert(hostPort.indexOf(':') != -1, message)
}
- // Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
- try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+ try {
+ throw new Exception
+ } catch {
+ case ex: Exception => logError(msg, ex)
+ }
}
// Typically, this will be of order of number of nodes in cluster
@@ -837,7 +838,7 @@ private[spark] object Utils extends Logging {
}
}
- /**
+ /**
* Timing method based on iterations that permit JVM JIT optimization.
* @param numIters number of iterations
* @param f function to be executed