Diffstat (limited to 'core/src/main/scala/spark/SparkContext.scala')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 337
1 file changed, 262 insertions(+), 75 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1d5131ad13..0d37075ef3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -2,13 +2,15 @@ package spark
import java.io._
import java.util.concurrent.atomic.AtomicInteger
+import java.net.{URI, URLClassLoader}
+
+import scala.collection.Map
+import scala.collection.generic.Growable
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import akka.actor.Actor
import akka.actor.Actor._
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -25,34 +27,59 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TextInputFormat
-
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-
import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
-
+import spark.deploy.LocalSparkCluster
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
-
+import spark.rdd.HadoopRDD
+import spark.rdd.NewHadoopRDD
+import spark.rdd.UnionRDD
import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerMaster
+/**
+ * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
+ * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param jobName A name for your job, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
+ */
class SparkContext(
val master: String,
- val frameworkName: String,
+ val jobName: String,
val sparkHome: String,
- val jars: Seq[String])
+ val jars: Seq[String],
+ environment: Map[String, String])
extends Logging {
-
- def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
+
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param jobName A name for your job, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ def this(master: String, jobName: String, sparkHome: String, jars: Seq[String]) =
+ this(master, jobName, sparkHome, jars, Map())
+
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param jobName A name for your job, to display on the cluster web UI.
+ */
+ def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())
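+
+  // Illustrative constructor usage (a sketch only; the master URL, job name, install path, JAR
+  // name and environment variable below are hypothetical):
+  //   val sc = new SparkContext("local[4]", "ExampleJob")
+  //   val clusterSc = new SparkContext("spark://host:7077", "ExampleJob", "/opt/spark",
+  //     Seq("target/example.jar"), Map("EXAMPLE_ENV_VAR" -> "value"))
+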
// Ensure logging is initialized before we spawn any threads
initLogging()
@@ -68,46 +95,89 @@ class SparkContext(
private val isLocal = (master == "local" || master.startsWith("local["))
// Create the Spark execution environment (cache, map output tracker, etc)
- val env = SparkEnv.createFromSystemProperties(
+ private[spark] val env = SparkEnv.createFromSystemProperties(
System.getProperty("spark.master.host"),
System.getProperty("spark.master.port").toInt,
true,
isLocal)
SparkEnv.set(env)
+ // Used to store a URL for each static file/jar together with the file's local timestamp
+ private[spark] val addedFiles = HashMap[String, Long]()
+ private[spark] val addedJars = HashMap[String, Long]()
+
+ // Add each JAR given through the constructor
+ jars.foreach { addJar(_) }
+
+ // Environment variables to pass to our executors
+ private[spark] val executorEnvs = HashMap[String, String]()
+ for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
+ "SPARK_TESTING")) {
+ val value = System.getenv(key)
+ if (value != null) {
+ executorEnvs(key) = value
+ }
+ }
+ executorEnvs ++= environment
+
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
- val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+ // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+ val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
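+    // Illustrative master strings matched by the patterns above (example values only):
+    //   "local"                      single worker thread, no task retries
+    //   "local[8]"                   LOCAL_N_REGEX: 8 worker threads
+    //   "local[4, 2]"                LOCAL_N_FAILURES_REGEX: 4 threads, up to 2 task failures
+    //   "local-cluster[2, 1, 512]"   LOCAL_CLUSTER_REGEX: 2 local slaves, 1 core and 512 MB each
+    //   "spark://host:7077"          SPARK_REGEX: standalone deploy cluster at that URL
+    //   "mesos://host:5050"          any other value is treated as a Mesos master URL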
master match {
- case "local" =>
- new LocalScheduler(1, 0)
+ case "local" =>
+ new LocalScheduler(1, 0, this)
- case LOCAL_N_REGEX(threads) =>
- new LocalScheduler(threads.toInt, 0)
+ case LOCAL_N_REGEX(threads) =>
+ new LocalScheduler(threads.toInt, 0, this)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
- new LocalScheduler(threads.toInt, maxFailures.toInt)
+ new LocalScheduler(threads.toInt, maxFailures.toInt, this)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
scheduler.initialize(backend)
scheduler
+ case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+ // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+ val memoryPerSlaveInt = memoryPerSlave.toInt
+ val sparkMemEnv = System.getenv("SPARK_MEM")
+ val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
+ if (sparkMemEnvInt > memoryPerSlaveInt) {
+ throw new SparkException(
+ "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
+ memoryPerSlaveInt, sparkMemEnvInt))
+ }
+
+ val scheduler = new ClusterScheduler(this)
+ val localCluster = new LocalSparkCluster(
+ numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+ val sparkUrl = localCluster.start()
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
+ scheduler.initialize(backend)
+ backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+ localCluster.stop()
+ }
+ scheduler
+
case _ =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(this)
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+ val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos://
val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName)
+ new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
} else {
- new MesosSchedulerBackend(scheduler, this, master, frameworkName)
+ new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
}
scheduler.initialize(backend)
scheduler
@@ -119,14 +189,20 @@ class SparkContext(
// Methods for creating RDDs
- def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
+ /** Distribute a local Scala collection to form an RDD. */
+ def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollection[T](this, seq, numSlices)
}
-
- def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
+
+ /** Distribute a local Scala collection to form an RDD. */
+ def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
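+
+  // Illustrative usage (assumes an existing SparkContext `sc`):
+  //   val distData = sc.parallelize(1 to 1000, 10)   // RDD[Int] split into 10 slices
+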
+ /**
+ * Read a text file from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI, and return it as an RDD of Strings.
+ */
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
.map(pair => pair._2.toString)
@@ -163,19 +239,31 @@ class SparkContext(
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
- * values and the InputFormat so that users don't need to pass them directly.
+ * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * values and the InputFormat so that users don't need to pass them directly. Instead, callers
+ * can just write, for example,
+ * {{{
+ * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+ * }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
: RDD[(K, V)] = {
hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
+ fm.erasure.asInstanceOf[Class[F]],
km.erasure.asInstanceOf[Class[K]],
vm.erasure.asInstanceOf[Class[V]],
minSplits)
}
+ /**
+ * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * values and the InputFormat so that users don't need to pass them directly. Instead, callers
+ * can just write, for example,
+ * {{{
+ * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
+ * }}}
+ */
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
@@ -191,7 +279,7 @@ class SparkContext(
new Configuration)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -207,7 +295,7 @@ class SparkContext(
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -219,7 +307,7 @@ class SparkContext(
new NewHadoopRDD(this, fClass, kClass, vClass, conf)
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types */
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -229,18 +317,23 @@ class SparkContext(
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
}
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
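+
+  // Illustrative usage with explicit key/value classes (the path is hypothetical):
+  //   val pairs = sc.sequenceFile("hdfs://namenode/data/counts", classOf[Text], classOf[IntWritable])
+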
/**
- * Version of sequenceFile() for types implicitly convertible to Writables through a
- * WritableConverter.
+ * Version of sequenceFile() for types implicitly convertible to Writables through a
+ * WritableConverter. For example, to access a SequenceFile where the keys are Text and the
+ * values are IntWritable, you could simply write
+ * {{{
+ * sparkContext.sequenceFile[String, Int](path, ...)
+ * }}}
*
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support
- * both subclasses of Writable and types for which we define a converter (e.g. Int to
+ * both subclasses of Writable and types for which we define a converter (e.g. Int to
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
- * have a parameterized singleton object). We use functions instead to create a new converter
+ * have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
@@ -265,7 +358,7 @@ class SparkContext(
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T: ClassManifest](
- path: String,
+ path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
@@ -275,43 +368,128 @@ class SparkContext(
/** Build the union of a list of RDDs. */
def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
- /** Build the union of a list of RDDs. */
+ /** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
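+
+  // Illustrative usage (assumes RDDs `rdd1`, `rdd2` and `rdd3` with the same element type):
+  //   val combined = sc.union(rdd1, rdd2, rdd3)
+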
// Methods for creating shared variables
+ /**
+ * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
+ * to using the `+=` method. Only the master can access the accumulator's `value`.
+ */
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
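+
+  // Illustrative usage (assumes an RDD `rdd` of Ints):
+  //   val sum = sc.accumulator(0)
+  //   rdd.foreach(x => sum += x)
+  //   println(sum.value)   // `value` may only be read on the master
+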
/**
- * Create an accumulable shared variable, with a `+=` method
+   * Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
new Accumulable(initialValue, param)
+ /**
+ * Create an accumulator from a "mutable collection" type.
+ *
+ * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
+ * standard mutable collections. So you can use this with mutable Map, Set, etc.
+ */
+ def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+ val param = new GrowableAccumulableParam[R,T]
+ new Accumulable(initialValue, param)
+ }
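+
+  // Illustrative usage (assumes an RDD `rdd` of Strings):
+  //   val seen = sc.accumulableCollection(scala.collection.mutable.HashSet[String]())
+  //   rdd.foreach(s => seen += s)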
+
+ /**
+ * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
+   * reading it in distributed functions. The variable will be sent to each node only once.
+ */
+ def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal)
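+
+  // Illustrative usage (assumes an RDD `rdd` of Ints):
+  //   val table = sc.broadcast(Map(1 -> "one", 2 -> "two"))
+  //   rdd.map(i => table.value.getOrElse(i, "unknown"))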
+
+ /**
+ * Add a file to be downloaded into the working directory of this Spark job on every node.
+ * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+ * filesystems), or an HTTP, HTTPS or FTP URI.
+ */
+ def addFile(path: String) {
+ val uri = new URI(path)
+ val key = uri.getScheme match {
+ case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
+ case _ => path
+ }
+ addedFiles(key) = System.currentTimeMillis
- // Keep around a weak hash map of values to Cached versions?
- def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
+ // Fetch the file locally in case the task is executed locally
+ val filename = new File(path.split("/").last)
+ Utils.fetchFile(path, new File("."))
- // Stop the SparkContext
+ logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
+ }
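+
+  // Illustrative usage (the path is hypothetical); the file then appears under its base name in
+  // each task's working directory:
+  //   sc.addFile("hdfs://namenode/data/lookup.txt")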
+
+ /**
+   * Return a map from each slave (keyed by "ip:port") to a pair of the maximum memory available
+   * to it for caching and the memory currently remaining for caching.
+ */
+ def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+ env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+ (blockManagerId.ip + ":" + blockManagerId.port, mem)
+ }
+ }
+
+ /**
+   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
+ * any new nodes.
+ */
+ def clearFiles() {
+ addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
+ addedFiles.clear()
+ }
+
+ /**
+   * Add a JAR dependency for all tasks to be executed on this SparkContext in the future.
+ * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+ * filesystems), or an HTTP, HTTPS or FTP URI.
+ */
+ def addJar(path: String) {
+ val uri = new URI(path)
+ val key = uri.getScheme match {
+ case null | "file" => env.httpFileServer.addJar(new File(uri.getPath))
+ case _ => path
+ }
+ addedJars(key) = System.currentTimeMillis
+ logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+ }
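+
+  // Illustrative usage (paths are hypothetical):
+  //   sc.addJar("/local/path/helpers.jar")           // local files are served by the HTTP file server
+  //   sc.addJar("hdfs://namenode/jars/helpers.jar")  // other URIs are recorded as-is for executors to fetch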
+
+ /**
+ * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
+ * any new nodes.
+ */
+ def clearJars() {
+ addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
+ addedJars.clear()
+ }
+
+ /** Shut down the SparkContext. */
def stop() {
dagScheduler.stop()
dagScheduler = null
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
+ // Clean up locally linked files
+ clearFiles()
+ clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
logInfo("Successfully stopped SparkContext")
}
- // Get Spark's home location from either a value set through the constructor,
- // or the spark.home Java property, or the SPARK_HOME environment variable
- // (in that order of preference). If neither of these is set, return None.
- def getSparkHome(): Option[String] = {
+ /**
+ * Get Spark's home location from either a value set through the constructor,
+ * or the spark.home Java property, or the SPARK_HOME environment variable
+   * (in that order of preference). If none of these is set, return None.
+ */
+ private[spark] def getSparkHome(): Option[String] = {
if (sparkHome != null) {
Some(sparkHome)
} else if (System.getProperty("spark.home") != null) {
@@ -326,7 +504,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
- * whether the scheduler can run the computation on the master rather than shipping it out to the
+ * whether the scheduler can run the computation on the master rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
@@ -335,22 +513,27 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- logInfo("Starting job...")
+ val callSite = Utils.getSparkCallSite
+ logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, allowLocal)
- logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal)
+ logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
+ /**
+ * Run a job on a given set of partitions of an RDD, but take a function of type
+ * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ */
def runJob[T, U: ClassManifest](
rdd: RDD[T],
- func: Iterator[T] => U,
+ func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
-
+
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@@ -358,6 +541,9 @@ class SparkContext(
runJob(rdd, func, 0 until rdd.splits.size, false)
}
+ /**
+ * Run a job on all partitions in an RDD and return the results in an array.
+ */
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.splits.size, false)
}
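+
+  // Illustrative usage (assumes an RDD `rdd` of Ints): compute one sum per partition on the
+  // cluster and return them to the master as an Array[Int]:
+  //   val partitionSums = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)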
@@ -371,38 +557,37 @@ class SparkContext(
evaluator: ApproximateEvaluator[U, R],
timeout: Long
): PartialResult[R] = {
- logInfo("Starting job...")
+ val callSite = Utils.getSparkCallSite
+ logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, timeout)
- logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
+ logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
- // Clean a closure to make it ready to serialized and send to tasks
- // (removes unreferenced variables in $outer's, updates REPL variables)
+ /**
+   * Clean a closure to make it ready to be serialized and sent to tasks
+ * (removes unreferenced variables in $outer's, updates REPL variables)
+ */
private[spark] def clean[F <: AnyRef](f: F): F = {
ClosureCleaner.clean(f)
return f
}
- // Default level of parallelism to use when not given by user (e.g. for reduce tasks)
+ /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
def defaultParallelism: Int = taskScheduler.defaultParallelism
- // Default min number of splits for Hadoop RDDs when not given by user
+ /** Default min number of splits for Hadoop RDDs when not given by user */
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
private var nextShuffleId = new AtomicInteger(0)
- private[spark] def newShuffleId(): Int = {
- nextShuffleId.getAndIncrement()
- }
-
+ private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
+
private var nextRddId = new AtomicInteger(0)
- // Register a new RDD, returning its RDD ID
- private[spark] def newRddId(): Int = {
- nextRddId.getAndIncrement()
- }
+ /** Register a new RDD, returning its RDD ID */
+ private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
}
/**
@@ -429,7 +614,7 @@ object SparkContext {
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
-
+
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
@@ -450,7 +635,7 @@ object SparkContext {
implicit def longToLongWritable(l: Long) = new LongWritable(l)
implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
-
+
implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
@@ -461,7 +646,7 @@ object SparkContext {
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
-
+
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
@@ -489,8 +674,10 @@ object SparkContext {
implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
- // Find the JAR from which a given class was loaded, to make it easy for users to pass
- // their JARs to SparkContext
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext
+ */
def jarOfClass(cls: Class[_]): Seq[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
if (uri != null) {
@@ -505,8 +692,8 @@ object SparkContext {
Nil
}
}
-
- // Find the JAR that contains the class of a particular object
+
+ /** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
}
@@ -518,7 +705,7 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
-class WritableConverter[T](
+private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable