From 969644df8eec13f4b89597a0682c8037949d855b Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 14 Jul 2011 12:40:56 -0400 Subject: Cleaned up a few issues to do with default parallelism levels. Also renamed HadoopFileWriter to HadoopWriter (since it's not only for files) and fixed a bug for lookup(). --- core/src/main/scala/spark/DAGScheduler.scala | 8 +- core/src/main/scala/spark/HadoopFileWriter.scala | 171 --------------------- core/src/main/scala/spark/HadoopRDD.scala | 5 +- core/src/main/scala/spark/LocalScheduler.scala | 2 +- core/src/main/scala/spark/MesosScheduler.scala | 4 +- core/src/main/scala/spark/PairRDDFunctions.scala | 43 ++++-- core/src/main/scala/spark/RDD.scala | 22 +-- core/src/main/scala/spark/Scheduler.scala | 17 +- .../scala/spark/SequenceFileRDDFunctions.scala | 18 --- core/src/main/scala/spark/SparkContext.scala | 99 ++++++------ 10 files changed, 114 insertions(+), 275 deletions(-) delete mode 100644 core/src/main/scala/spark/HadoopFileWriter.scala (limited to 'core') diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala index ecdce037e9..42bb3c2a75 100644 --- a/core/src/main/scala/spark/DAGScheduler.scala +++ b/core/src/main/scala/spark/DAGScheduler.scala @@ -145,7 +145,8 @@ private trait DAGScheduler extends Scheduler with Logging { missing.toList } - override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]) + override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], allowLocal: Boolean) (implicit m: ClassManifest[U]) : Array[U] = { val outputParts = partitions.toArray @@ -167,8 +168,9 @@ private trait DAGScheduler extends Scheduler with Logging { logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) - // Optimization for first() and take() if the RDD has no shuffle dependencies - if (finalStage.parents.size == 0 && numOutputParts == 1) { + // Optimization for short actions like first() and take() that can be computed locally + // without shipping tasks to the cluster. + if (allowLocal && finalStage.parents.size == 0 && numOutputParts == 1) { logInfo("Computing the requested partition locally") val split = finalRdd.splits(outputParts(0)) val taskContext = new TaskContext(finalStage.id, outputParts(0), 0) diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala deleted file mode 100644 index 596b309d34..0000000000 --- a/core/src/main/scala/spark/HadoopFileWriter.scala +++ /dev/null @@ -1,171 +0,0 @@ -package org.apache.hadoop.mapred - -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text - -import java.text.SimpleDateFormat -import java.text.NumberFormat -import java.io.IOException -import java.net.URI -import java.util.Date - -import spark.SerializableWritable -import spark.Logging - -/** - * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should - * also contain an output key class, an output value class, a filename to write to, etc - * exactly like in a Hadoop job. 
- */ -@serializable -class HadoopFileWriter (@transient jobConf: JobConf) extends Logging { - private val now = new Date() - private val conf = new SerializableWritable(jobConf) - - private var jobID = 0 - private var splitID = 0 - private var attemptID = 0 - private var jID: SerializableWritable[JobID] = null - private var taID: SerializableWritable[TaskAttemptID] = null - - @transient private var writer: RecordWriter[AnyRef,AnyRef] = null - @transient private var format: OutputFormat[AnyRef,AnyRef] = null - @transient private var committer: OutputCommitter = null - @transient private var jobContext: JobContext = null - @transient private var taskContext: TaskAttemptContext = null - - def preSetup() { - setIDs(0, 0, 0) - setConfParams() - - val jCtxt = getJobContext() - getOutputCommitter().setupJob(jCtxt) - } - - - def setup(jobid: Int, splitid: Int, attemptid: Int) { - setIDs(jobid, splitid, attemptid) - setConfParams() - } - - def open() { - val numfmt = NumberFormat.getInstance() - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val outputName = "part-" + numfmt.format(splitID) - val path = FileOutputFormat.getOutputPath(conf.value) - val fs: FileSystem = { - if (path != null) - path.getFileSystem(conf.value) - else - FileSystem.get(conf.value) - } - - getOutputCommitter().setupTask(getTaskContext()) - writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL) - } - - def write(key: AnyRef, value: AnyRef) { - if (writer!=null) { - //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")") - writer.write(key, value) - } else - throw new IOException("Writer is null, open() has not been called") - } - - def close() { - writer.close(Reporter.NULL) - } - - def commit(): Boolean = { - var result = false - val taCtxt = getTaskContext() - val cmtr = getOutputCommitter() - if (cmtr.needsTaskCommit(taCtxt)) { - try { - cmtr.commitTask(taCtxt) - logInfo (taID + ": Committed") - result = true - } catch { - case e:IOException => { - logError ("Error committing the output of task: " + taID.value) - e.printStackTrace() - cmtr.abortTask(taCtxt) - } - } - return result - } - logWarning ("No need to commit output of task: " + taID.value) - return true - } - - def cleanup() { - getOutputCommitter().cleanupJob(getJobContext()) - } - - // ********* Private Functions ********* - - private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = { - if (format == null) - format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]] - return format - } - - private def getOutputCommitter(): OutputCommitter = { - if (committer == null) - committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter] - return committer - } - - private def getJobContext(): JobContext = { - if (jobContext == null) - jobContext = new JobContext(conf.value, jID.value) - return jobContext - } - - private def getTaskContext(): TaskAttemptContext = { - if (taskContext == null) - taskContext = new TaskAttemptContext(conf.value, taID.value) - return taskContext - } - - private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { - jobID = jobid - splitID = splitid - attemptID = attemptid - - jID = new SerializableWritable[JobID](HadoopFileWriter.createJobID(now, jobid)) - taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) - } - - private def setConfParams() { - conf.value.set("mapred.job.id", jID.value.toString); - 
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString); - conf.value.set("mapred.task.id", taID.value.toString); - conf.value.setBoolean("mapred.task.is.map", true); - conf.value.setInt("mapred.task.partition", splitID); - } -} - -object HadoopFileWriter { - def createJobID(time: Date, id: Int): JobID = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) - return new JobID(jobtrackerID, id) - } - - def createPathFromString(path: String, conf: JobConf): Path = { - if (path == null) - throw new IllegalArgumentException("Output path is null") - var outputPath = new Path(path) - val fs = outputPath.getFileSystem(conf) - if (outputPath == null || fs == null) - throw new IllegalArgumentException("Incorrectly formatted output path") - outputPath = outputPath.makeQualified(fs) - return outputPath - } -} diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index b71c3aa522..5d8a2d0e35 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -34,13 +34,14 @@ class HadoopRDD[K, V]( @transient conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], - valueClass: Class[V]) + valueClass: Class[V], + minSplits: Int) extends RDD[(K, V)](sc) { val serializableConf = new SerializableWritable(conf) @transient val splits_ : Array[Split] = { val inputFormat = createInputFormat(conf) - val inputSplits = inputFormat.getSplits(conf, sc.numCores) + val inputSplits = inputFormat.getSplits(conf, minSplits) val array = new Array[Split] (inputSplits.size) for (i <- 0 until inputSplits.size) array(i) = new HadoopSplit(id, i, inputSplits(i)) diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 1f5b6f62c5..e43516c84b 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -53,5 +53,5 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging { override def stop() {} - override def numCores() = threads + override def defaultParallelism() = threads } diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index d635e95dba..9776963c5f 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -241,8 +241,8 @@ extends MScheduler with DAGScheduler with Logging } // TODO: query Mesos for number of cores - override def numCores() = - System.getProperty("spark.default.parallelism", "2").toInt + override def defaultParallelism() = + System.getProperty("spark.default.parallelism", "8").toInt // Create a server for all the JARs added by the user to SparkContext. // We first copy the JARs to a temp directory for easier server setup. 
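The MesosScheduler hunk above renames numCores to defaultParallelism and raises the fallback for the spark.default.parallelism system property from 2 to 8. A minimal standalone sketch of that lookup, assuming nothing beyond the property name; the surrounding object and main method are illustrative, not part of the patch:

object ParallelismDefault {
  def main(args: Array[String]) {
    // Equivalent to launching the JVM with -Dspark.default.parallelism=16.
    System.setProperty("spark.default.parallelism", "16")
    // Same pattern as the new MesosScheduler.defaultParallelism: read the
    // property and fall back to 8 when the user has not set it.
    val parallelism = System.getProperty("spark.default.parallelism", "8").toInt
    println("defaultParallelism = " + parallelism)
  }
}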
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index f24b999e2a..260c31cb5e 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -18,7 +18,7 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileOutputCommitter import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.HadoopFileWriter +import org.apache.hadoop.mapred.HadoopWriter import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCommitter import org.apache.hadoop.mapred.OutputFormat @@ -127,30 +127,30 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, numCores) + combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism) } def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { - reduceByKey(func, numCores) + reduceByKey(func, defaultParallelism) } def groupByKey(): RDD[(K, Seq[V])] = { - groupByKey(numCores) + groupByKey(defaultParallelism) } def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { - join(other, numCores) + join(other, defaultParallelism) } def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, numCores) + leftOuterJoin(other, defaultParallelism) } def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, numCores) + rightOuterJoin(other, defaultParallelism) } - def numCores = self.context.numCores + def defaultParallelism = self.context.defaultParallelism def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) @@ -167,7 +167,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { val part = self.partitioner match { case Some(p) => p - case None => new HashPartitioner(numCores) + case None => new HashPartitioner(defaultParallelism) } new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map { case (k, Seq(vs, ws)) => @@ -179,7 +179,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { val part = self.partitioner match { case Some(p) => p - case None => new HashPartitioner(numCores) + case None => new HashPartitioner(defaultParallelism) } new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], @@ -191,6 +191,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex } } + def lookup(key: K): Seq[V] = { + self.partitioner match { + case Some(p) => + val index = p.getPartition(key) + def process(it: Iterator[(K, V)]): Seq[V] = { + val buf = new ArrayBuffer[V] + for ((k, v) <- it if k == key) + buf += v + buf + } + val res = self.context.runJob(self, process _, Array(index), false) + res(0) + case None => + throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") + } + } + def saveAsHadoopFile [F <: OutputFormat[K, V]] (path: String) (implicit fm: ClassManifest[F]) { saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } @@ -204,7 +221,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex conf.setOutputValueClass(valueClass) conf.setOutputFormat(outputFormatClass) 
conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath(conf, HadoopFileWriter.createPathFromString(path, conf)) + FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) saveAsHadoopDataset(conf) } @@ -221,10 +238,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") - val writer = new HadoopFileWriter(conf) + val writer = new HadoopWriter(conf) writer.preSetup() - def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = { + def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopWriter = { writer.setup(context.stageId, context.splitId, context.attemptId) writer.open() diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 2cddb5db9e..3f07b95191 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -12,17 +12,17 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Map import scala.collection.mutable.HashMap +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.FileOutputCommitter +import org.apache.hadoop.mapred.HadoopWriter import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.HadoopFileWriter +import org.apache.hadoop.mapred.OutputCommitter import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.OutputCommitter -import org.apache.hadoop.mapred.FileOutputCommitter -import org.apache.hadoop.io.Writable -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.TextOutputFormat import SparkContext._ @@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { } def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = - groupBy[K](f, sc.numCores) + groupBy[K](f, sc.defaultParallelism) def pipe(command: String): RDD[String] = new PipedRDD(this, command) @@ -175,7 +175,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { var p = 0 while (buf.size < num && p < splits.size) { val left = num - buf.size - val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p)) + val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true) buf ++= res(0) if (buf.size == num) return buf.toArray @@ -184,7 +184,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { return buf.toArray } - def first: T = take(1) match { + def first(): T = take(1) match { case Array(t) => t case _ => throw new UnsupportedOperationException("empty collection") } diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala index 832d8b5705..df86db64a6 100644 --- a/core/src/main/scala/spark/Scheduler.scala +++ b/core/src/main/scala/spark/Scheduler.scala @@ -7,17 +7,14 @@ private trait Scheduler { // Wait for registration with Mesos. def waitForRegister() - // Run a function on some partitions of an RDD, returning an array of results. 
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]) - (implicit m: ClassManifest[U]): Array[U] = - runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions) - - // Run a function on some partitions of an RDD, returning an array of results. - def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]) - (implicit m: ClassManifest[U]): Array[U] + // Run a function on some partitions of an RDD, returning an array of results. The allowLocal flag specifies + // whether the scheduler is allowed to run the job on the master machine rather than shipping it to the cluster, + // for actions that create short jobs such as first() and take(). + def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], allowLocal: Boolean): Array[U] def stop() - // Get the number of cores in the cluster, as a hint for sizing jobs. - def numCores(): Int + // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. + def defaultParallelism(): Int } diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 278f76fb19..6ca0556d9f 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -13,7 +13,6 @@ import scala.collection.mutable.Map import scala.collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.HadoopFileWriter import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapred.SequenceFileOutputFormat @@ -66,21 +65,4 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) } } - - def lookup(key: K): Seq[V] = { - self.partitioner match { - case Some(p) => - val index = p.getPartition(key) - def process(it: Iterator[(K, V)]): Seq[V] = { - val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) - buf += v - buf - } - val res = self.context.runJob(self, process _, Array(index)) - res(0) - case None => - throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") - } - } } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index f09ea3da3f..0a866ce198 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -63,20 +63,14 @@ extends Logging { // Methods for creating RDDs - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] = + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = new ParallelArray[T](this, seq, numSlices) - - def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] = - parallelize(seq, numCores) - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] = + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = parallelize(seq, numSlices) - def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] = - parallelize(seq, numCores) - - def textFile(path: String): RDD[String] = { - hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) + def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { + hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 
minSplits) .map(pair => pair._2.toString) } @@ -88,22 +82,24 @@ extends Logging { def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], - valueClass: Class[V]) + valueClass: Class[V], + minSplits: Int = defaultMinSplits) : RDD[(K, V)] = { - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass) + new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], - valueClass: Class[V]) + valueClass: Class[V], + minSplits: Int = defaultMinSplits) : RDD[(K, V)] = { val conf = new JobConf() FileInputFormat.setInputPaths(conf, path) val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass) + new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } /** @@ -111,23 +107,29 @@ extends Logging { * the classes of keys, values and the InputFormat so that users don't need * to pass them directly. */ - def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) + def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) : RDD[(K, V)] = { - hadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]]) + hadoopFile(path, fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]], + vm.erasure.asInstanceOf[Class[V]], minSplits) } + def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) + (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = + hadoopFile[K, V, F](path, defaultMinSplits) + /** Get an RDD for a Hadoop SequenceFile with given key and value types */ def sequenceFile[K, V](path: String, keyClass: Class[K], - valueClass: Class[V]): RDD[(K, V)] = { + valueClass: Class[V], + minSplits: Int): RDD[(K, V)] = { val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] - hadoopFile(path, inputFormatClass, keyClass, valueClass) + hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) } + def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, defaultMinSplits) + /** * Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter. * @@ -138,27 +140,32 @@ extends Logging { * functions instead to create a new converter for the appropriate type. In addition, we pass the converter * a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case. 
*/ - def sequenceFile[K, V](path: String) + def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits) (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) : RDD[(K, V)] = { val kc = kcf() val vc = vcf() val format = classOf[SequenceFileInputFormat[Writable, Writable]] val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], - vc.writableClass(vm).asInstanceOf[Class[Writable]]) + vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits) writables.map{case (k,v) => (kc.convert(k), vc.convert(v))} } - def objectFile[T: ClassManifest](path: String): RDD[T] = { - import SparkContext.writableWritableConverter // To get converters for NullWritable and BytesWritable - sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)) - .flatMap(_.toTraversable) + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys + * and BytesWritable values that contain a serialized partition. This is still an experimental + * storage format and may not be supported exactly as is in future Spark releases. It will also + * be pretty slow if you use the default serializer (Java serialization), though the nice thing + * about it is that there's very little effort required to save arbitrary objects. + */ + def objectFile[T: ClassManifest](path: String, minSplits: Int = defaultMinSplits): RDD[T] = { + sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) + .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes)) } - /** Build the union of a list of RDDs. */ - //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = - // new UnionRDD(this, rdds) + def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = + new UnionRDD(this, rdds) // Methods for creating shared variables @@ -202,37 +209,38 @@ extends Logging { /** * Run a function on a given set of partitions in an RDD and return the results. * This is the main entry point to the scheduler, by which all actions get launched. + * The allowLocal flag specifies whether the scheduler can run the computation on the + * master rather than shipping it out to the cluster, for short actions like first(). */ - def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]) - (implicit m: ClassManifest[U]) + def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], allowLocal: Boolean) : Array[U] = { logInfo("Starting job...") val start = System.nanoTime - val result = scheduler.runJob(rdd, func, partitions) + val result = scheduler.runJob(rdd, func, partitions, allowLocal) logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") result } - def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]) - (implicit m: ClassManifest[U]) + def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int], + allowLocal: Boolean) : Array[U] = { - runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions) + runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) } /** * Run a job on all partitions in an RDD and return the results in an array. 
*/ - def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U) - (implicit m: ClassManifest[U]) + def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U) : Array[U] = { - runJob(rdd, func, 0 until rdd.splits.size) + runJob(rdd, func, 0 until rdd.splits.size, false) } - def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U) - (implicit m: ClassManifest[U]) + def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U) : Array[U] = { - runJob(rdd, func, 0 until rdd.splits.size) + runJob(rdd, func, 0 until rdd.splits.size, false) } + // Clean a closure to make it ready to serialized and send to tasks // (removes unreferenced variables in $outer's, updates REPL variables) private[spark] def clean[F <: AnyRef](f: F): F = { @@ -240,8 +248,11 @@ extends Logging { return f } - // Get the number of cores available to run tasks (as reported by Scheduler) - def numCores = scheduler.numCores + // Default level of parallelism to use when not given by user (e.g. for reduce tasks) + def defaultParallelism: Int = scheduler.defaultParallelism + + // Default min number of splits for Hadoop RDDs when not given by user + def defaultMinSplits: Int = Math.min(defaultParallelism, 4) private var nextShuffleId = new AtomicInteger(0) -- cgit v1.2.3
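The allowLocal flag threaded through Scheduler.runJob, SparkContext.runJob and DAGScheduler.runJob above lets a job whose final stage has no shuffle dependencies and touches a single partition be computed directly on the master, which is what the updated take() and first() in RDD.scala rely on. A sketch of a driver exercising the four-argument runJob from the patch; the local master, application name and data are assumptions:

import spark.SparkContext

object AllowLocalExample {
  def main(args: Array[String]) {
    // Two-argument constructor (master URL, job name) as used in Spark at the time.
    val sc  = new SparkContext("local", "AllowLocalExample")
    val rdd = sc.parallelize(1 to 100, 4)
    // allowLocal = true: with no shuffle dependencies and only one requested
    // partition, the scheduler may run the function on the master directly.
    val headSum = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0), true)
    // allowLocal = false: always scheduled as ordinary tasks.
    val allSums = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, 0 until 4, false)
    println(headSum.mkString(",") + " | " + allSums.mkString(","))
  }
}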
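lookup() moves from SequenceFileRDDFunctions to PairRDDFunctions, so it is now available on any (K, V) RDD with a partitioner rather than only on Writable-convertible pairs, and it calls the new runJob with allowLocal = false. A usage sketch, assuming (as in this version of the code) that reduceByKey's output carries a HashPartitioner; the data and split count are made up:

import spark.SparkContext
import spark.SparkContext._   // implicit conversion to PairRDDFunctions

object LookupExample {
  def main(args: Array[String]) {
    val sc    = new SparkContext("local", "LookupExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // reduceByKey shuffles into 4 hash partitions, giving the result a partitioner.
    val sums  = pairs.reduceByKey(_ + _, 4)
    // lookup() runs a job on only the one partition that can hold the key.
    println(sums.lookup("a"))      // a one-element Seq containing 4
    // An RDD without a partitioner throws UnsupportedOperationException:
    // pairs.lookup("a")
  }
}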
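With the default arguments introduced above, parallelize and makeRDD fall back to defaultParallelism for numSlices, while textFile, hadoopRDD, hadoopFile, sequenceFile and objectFile fall back to defaultMinSplits, defined as min(defaultParallelism, 4). A sketch of how call sites look after the change; the paths and split counts are assumptions:

import spark.SparkContext
import spark.SparkContext._

object DefaultSplitsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "DefaultSplitsExample")
    // minSplits is now optional and defaults to defaultMinSplits.
    val lines = sc.textFile("/tmp/input.txt")
    // An explicit minSplits can still be passed for large inputs.
    val wide  = sc.textFile("/tmp/input.txt", 32)
    // numSlices defaults to defaultParallelism.
    val nums  = sc.parallelize(1 to 1000)
    // The no-argument reduceByKey/groupByKey/join now use defaultParallelism splits.
    val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    println(counts.collect().take(10).mkString(", "))
  }
}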
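HadoopWriter, the renamed HadoopFileWriter, still lives in the org.apache.hadoop.mapred package (as the imports in PairRDDFunctions and RDD show) and is driven by saveAsHadoopFile/saveAsHadoopDataset. A hedged round trip through those paths; the output directory, the IntWritable/Text key-value types and the read-back step are assumptions rather than anything the commit prescribes:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import spark.SparkContext
import spark.SparkContext._

object HadoopWriterExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopWriterExample")
    // Build Writable pairs; saveAsHadoopFile sets up the JobConf and hands
    // each partition to a HadoopWriter for setup, write, commit and cleanup.
    val pairs = sc.parallelize(1 to 100)
                  .map(i => (new IntWritable(i), new Text("value-" + i)))
    pairs.saveAsHadoopFile[SequenceFileOutputFormat[IntWritable, Text]]("/tmp/hw-out")
    // Read the files back; minSplits defaults to defaultMinSplits here.
    val back = sc.sequenceFile("/tmp/hw-out", classOf[IntWritable], classOf[Text])
    // Copy values out of the reused Writables before collecting them.
    println(back.map(kv => (kv._1.get, kv._2.toString)).collect().length)
  }
}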