From 8c82f43db393a460fcefdc274d5e0f8c9d46543e Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 4 Oct 2012 22:59:36 -0700 Subject: Scaladoc documentation for some core Spark functionality --- core/src/main/scala/spark/Logging.scala | 24 ++--- core/src/main/scala/spark/RDD.scala | 80 +++++++++++----- core/src/main/scala/spark/SparkContext.scala | 131 ++++++++++++++++++++------- core/src/main/scala/spark/package.scala | 15 +++ 4 files changed, 180 insertions(+), 70 deletions(-) create mode 100644 core/src/main/scala/spark/package.scala (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala index 69935b86de..90bae26202 100644 --- a/core/src/main/scala/spark/Logging.scala +++ b/core/src/main/scala/spark/Logging.scala @@ -15,7 +15,7 @@ trait Logging { private var log_ : Logger = null // Method to get or create the logger for this object - def log: Logger = { + protected def log: Logger = { if (log_ == null) { var className = this.getClass.getName // Ignore trailing $'s in the class names for Scala objects @@ -28,48 +28,48 @@ trait Logging { } // Log methods that take only a String - def logInfo(msg: => String) { + protected def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) } - def logDebug(msg: => String) { + protected def logDebug(msg: => String) { if (log.isDebugEnabled) log.debug(msg) } - def logTrace(msg: => String) { + protected def logTrace(msg: => String) { if (log.isTraceEnabled) log.trace(msg) } - def logWarning(msg: => String) { + protected def logWarning(msg: => String) { if (log.isWarnEnabled) log.warn(msg) } - def logError(msg: => String) { + protected def logError(msg: => String) { if (log.isErrorEnabled) log.error(msg) } // Log methods that take Throwables (Exceptions/Errors) too - def logInfo(msg: => String, throwable: Throwable) { + protected def logInfo(msg: => String, throwable: Throwable) { if (log.isInfoEnabled) log.info(msg, throwable) } - def logDebug(msg: => String, throwable: Throwable) { + protected def logDebug(msg: => String, throwable: Throwable) { if (log.isDebugEnabled) log.debug(msg, throwable) } - def logTrace(msg: => String, throwable: Throwable) { + protected def logTrace(msg: => String, throwable: Throwable) { if (log.isTraceEnabled) log.trace(msg, throwable) } - def logWarning(msg: => String, throwable: Throwable) { + protected def logWarning(msg: => String, throwable: Throwable) { if (log.isWarnEnabled) log.warn(msg, throwable) } - def logError(msg: => String, throwable: Throwable) { + protected def logError(msg: => String, throwable: Throwable) { if (log.isErrorEnabled) log.error(msg, throwable) } // Method for ensuring that logging is initialized, to avoid having multiple // threads do it concurrently (as SLF4J initialization is not thread safe). - def initLogging() { log } + protected def initLogging() { log } } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 35d70b1393..3244753bfe 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -37,50 +37,69 @@ import SparkContext._ /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, - * partitioned collection of elements that can be operated on in parallel. + * partitioned collection of elements that can be operated on in parallel. This class contains the + * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, + * [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such + * as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations available only on + * RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations available on RDDs + * that can be saved as SequenceFiles. These operations are automatically available on any RDD of + * the right type (e.g. RDD[(Int, Int)] through implicit conversions when you + * `import spark.SparkContext._`. * - * Each RDD is characterized by five main properties: - * - A list of splits (partitions) - * - A function for computing each split - * - A list of dependencies on other RDDs - * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) - * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for - * HDFS) + * Internally, each RDD is characterized by five main properties: * - * All the scheduling and execution in Spark is done based on these methods, allowing each RDD to - * implement its own way of computing itself. + * - A list of splits (partitions) + * - A function for computing each split + * - A list of dependencies on other RDDs + * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) + * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for + * an HDFS file) * - * This class also contains transformation methods available on all RDDs (e.g. map and filter). In - * addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, and - * SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. + * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD + * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for + * reading data from a new storage system) by overriding these functions. Please refer to the + * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details + * on RDD internals. */ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable { - // Methods that must be implemented by subclasses + // Methods that must be implemented by subclasses: + + /** Set of partitions in this RDD. */ def splits: Array[Split] + + /** Function for computing a given partition. */ def compute(split: Split): Iterator[T] + + /** How this RDD depends on any parent RDDs. */ @transient val dependencies: List[Dependency[_]] + + // Methods available on all RDDs: - // Record user function generating this RDD - val origin = Utils.getSparkCallSite + /** Record user function generating this RDD. */ + private[spark] val origin = Utils.getSparkCallSite - // Optionally overridden by subclasses to specify how they are partitioned + /** Optionally overridden by subclasses to specify how they are partitioned. */ val partitioner: Option[Partitioner] = None - // Optionally overridden by subclasses to specify placement preferences + /** Optionally overridden by subclasses to specify placement preferences. */ def preferredLocations(split: Split): Seq[String] = Nil + /** The [[spark.SparkContext]] that this RDD was created on. */ def context = sc - def elementClassManifest: ClassManifest[T] = classManifest[T] + private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T] - // Get a unique ID for this RDD + /** A unique ID for this RDD (within its SparkContext). */ val id = sc.newRddId() // Variables relating to persistence private var storageLevel: StorageLevel = StorageLevel.NONE - // Change this RDD's storage level + /** + * Set this RDD's storage level to persist its values across operations after the first time + * it is computed. Can only be called once on each RDD. + */ def persist(newLevel: StorageLevel): RDD[T] = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { @@ -91,12 +110,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial this } - // Turn on the default caching level for this RDD + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY) - // Turn on the default caching level for this RDD + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def cache(): RDD[T] = persist() + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel private[spark] def checkpoint(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): RDD[T] = { @@ -118,7 +138,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } } - // Read this RDD; will read from cache if applicable, or otherwise compute + /** + * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. + * This should ''not'' be called by users directly, but is available for implementors of custom + * subclasses of RDD. + */ final def iterator(split: Split): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel) @@ -175,8 +199,16 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial Utils.randomizeInPlace(samples, rand).take(total) } + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other)) + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def ++(other: RDD[T]): RDD[T] = this.union(other) def glom(): RDD[Array[T]] = new GlommedRDD(this) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 83c1b49203..37ba308546 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -49,14 +49,20 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.BlockManagerMaster -class SparkContext( - master: String, - frameworkName: String, - val sparkHome: String, - val jars: Seq[String]) +/** + * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark + * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. + * + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param jobName A name for your job, to display on the cluster web UI + * @param sparkHome Location where Spark is instaled on cluster nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ +class SparkContext(master: String, jobName: String, val sparkHome: String, val jars: Seq[String]) extends Logging { - def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil) + def this(master: String, jobName: String) = this(master, jobName, null, Nil) // Ensure logging is initialized before we spawn any threads initLogging() @@ -72,7 +78,7 @@ class SparkContext( private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) - val env = SparkEnv.createFromSystemProperties( + private[spark] val env = SparkEnv.createFromSystemProperties( System.getProperty("spark.master.host"), System.getProperty("spark.master.port").toInt, true, @@ -80,8 +86,8 @@ class SparkContext( SparkEnv.set(env) // Used to store a URL for each static file/jar together with the file's local timestamp - val addedFiles = HashMap[String, Long]() - val addedJars = HashMap[String, Long]() + private[spark] val addedFiles = HashMap[String, Long]() + private[spark] val addedJars = HashMap[String, Long]() // Add each JAR given through the constructor jars.foreach { addJar(_) } @@ -109,7 +115,7 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) scheduler.initialize(backend) scheduler @@ -128,7 +134,7 @@ class SparkContext( val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val sparkUrl = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() @@ -140,9 +146,9 @@ class SparkContext( val scheduler = new ClusterScheduler(this) val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName) + new CoarseMesosSchedulerBackend(scheduler, this, master, jobName) } else { - new MesosSchedulerBackend(scheduler, this, master, frameworkName) + new MesosSchedulerBackend(scheduler, this, master, jobName) } scheduler.initialize(backend) scheduler @@ -154,14 +160,20 @@ class SparkContext( // Methods for creating RDDs - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + /** Distribute a local Scala collection to form an RDD. */ + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { new ParallelCollection[T](this, seq, numSlices) } - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + /** Distribute a local Scala collection to form an RDD. */ + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { parallelize(seq, numSlices) } + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) @@ -199,7 +211,11 @@ class SparkContext( /** * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, - * values and the InputFormat so that users don't need to pass them directly. + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) + * }}} */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) @@ -211,6 +227,14 @@ class SparkContext( minSplits) } + /** + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path) + * }}} + */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = hadoopFile[K, V, F](path, defaultMinSplits) @@ -254,7 +278,7 @@ class SparkContext( new NewHadoopRDD(this, fClass, kClass, vClass, conf) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types */ + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -264,12 +288,17 @@ class SparkContext( hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = sequenceFile(path, keyClass, valueClass, defaultMinSplits) /** * Version of sequenceFile() for types implicitly convertible to Writables through a - * WritableConverter. + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} * * WritableConverters are provided in a somewhat strange way (by an implicit function) to support * both subclasses of Writable and types for which we define a converter (e.g. Int to @@ -310,17 +339,21 @@ class SparkContext( /** Build the union of a list of RDDs. */ def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) - /** Build the union of a list of RDDs. */ + /** Build the union of a list of RDDs passed as variable-length arguments. */ def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] = new UnionRDD(this, Seq(first) ++ rest) // Methods for creating shared variables + /** + * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = new Accumulator(initialValue, param) /** - * Create an accumulable shared variable, with a `+=` method + * Create an [[spark.Accumulable]] shared variable, with a `+=` method * @tparam T accumulator type * @tparam R type that can be added to the accumulator */ @@ -338,10 +371,17 @@ class SparkContext( new Accumulable(initialValue, param) } - // Keep around a weak hash map of values to Cached versions? - def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal) + /** + * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for + * reading it in distributed functions. The variable will be sent to each cluster only once. + */ + def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal) - // Adds a file dependency to all Tasks executed in the future. + /** + * Add a file to be downloaded into the working directory of this Spark job on every node. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ def addFile(path: String) { val uri = new URI(path) val key = uri.getScheme match { @@ -357,12 +397,20 @@ class SparkContext( logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) } + /** + * Clear the job's list of files added by `addFile` so that they do not get donwloaded to + * any new nodes. + */ def clearFiles() { addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedFiles.clear() } - // Adds a jar dependency to all Tasks executed in the future. + /** + * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ def addJar(path: String) { val uri = new URI(path) val key = uri.getScheme match { @@ -373,12 +421,16 @@ class SparkContext( logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } + /** + * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to + * any new nodes. + */ def clearJars() { addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedJars.clear() } - // Stop the SparkContext + /** Shut down the SparkContext. */ def stop() { dagScheduler.stop() dagScheduler = null @@ -393,10 +445,12 @@ class SparkContext( logInfo("Successfully stopped SparkContext") } - // Get Spark's home location from either a value set through the constructor, - // or the spark.home Java property, or the SPARK_HOME environment variable - // (in that order of preference). If neither of these is set, return None. - def getSparkHome(): Option[String] = { + /** + * Get Spark's home location from either a value set through the constructor, + * or the spark.home Java property, or the SPARK_HOME environment variable + * (in that order of preference). If neither of these is set, return None. + */ + private[spark] def getSparkHome(): Option[String] = { if (sparkHome != null) { Some(sparkHome) } else if (System.getProperty("spark.home") != null) { @@ -428,6 +482,10 @@ class SparkContext( result } + /** + * Run a job on a given set of partitions of an RDD, but take a function of type + * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. + */ def runJob[T, U: ClassManifest]( rdd: RDD[T], func: Iterator[T] => U, @@ -444,6 +502,9 @@ class SparkContext( runJob(rdd, func, 0 until rdd.splits.size, false) } + /** + * Run a job on all partitions in an RDD and return the results in an array. + */ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.splits.size, false) } @@ -465,17 +526,19 @@ class SparkContext( result } - // Clean a closure to make it ready to serialized and send to tasks - // (removes unreferenced variables in $outer's, updates REPL variables) + /** + * Clean a closure to make it ready to serialized and send to tasks + * (removes unreferenced variables in $outer's, updates REPL variables) + */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) return f } - // Default level of parallelism to use when not given by user (e.g. for reduce tasks) + /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ def defaultParallelism: Int = taskScheduler.defaultParallelism - // Default min number of splits for Hadoop RDDs when not given by user + /** Default min number of splits for Hadoop RDDs when not given by user */ def defaultMinSplits: Int = math.min(defaultParallelism, 2) private var nextShuffleId = new AtomicInteger(0) @@ -486,7 +549,7 @@ class SparkContext( private var nextRddId = new AtomicInteger(0) - // Register a new RDD, returning its RDD ID + /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = { nextRddId.getAndIncrement() } diff --git a/core/src/main/scala/spark/package.scala b/core/src/main/scala/spark/package.scala new file mode 100644 index 0000000000..389ec4da3e --- /dev/null +++ b/core/src/main/scala/spark/package.scala @@ -0,0 +1,15 @@ +/** + * Core Spark functionality. [[spark.SparkContext]] serves as the main entry point to Spark, while + * [[spark.RDD]] is the data type representing a distributed collection, and provides most + * parallel operations. + * + * In addition, [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value + * pairs, such as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations + * available only on RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations + * available on RDDs that can be saved as SequenceFiles. These operations are automatically + * available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when + * you `import spark.SparkContext._`. + */ +package object spark { + // For package docs only +} -- cgit v1.2.3