author     Matei Zaharia <matei@eecs.berkeley.edu>  2011-07-14 12:40:56 -0400
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2011-07-14 12:40:56 -0400
commit     969644df8eec13f4b89597a0682c8037949d855b (patch)
tree       2b2fa4d01130cd76759d68606d2b5ab669ca52be
parent     2604939f643bca125f5e2fb53e3221202996d41b (diff)
Cleaned up a few issues to do with default parallelism levels. Also
renamed HadoopFileWriter to HadoopWriter (since it's not only for files) and fixed a bug in lookup().
-rw-r--r--  bagel/src/main/scala/spark/bagel/Bagel.scala                2
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala                8
-rw-r--r--  core/src/main/scala/spark/HadoopFileWriter.scala          171
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala                   5
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala              2
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala              4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala           43
-rw-r--r--  core/src/main/scala/spark/RDD.scala                        22
-rw-r--r--  core/src/main/scala/spark/Scheduler.scala                  17
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala   18
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala               99
11 files changed, 115 insertions, 276 deletions
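
The patch below replaces the old numCores hint with defaultParallelism throughout, adds numSlices/minSplits defaults, and moves lookup() onto PairRDDFunctions. A minimal sketch of how the user-facing API reads after this commit; the master string, app name, and sample data are placeholders, not taken from the patch:

import spark.SparkContext
import spark.SparkContext._

object DefaultParallelismSketch {
  def main(args: Array[String]) {
    // Placeholder master string and app name for this sketch.
    val sc = new SparkContext("local[4]", "defaultParallelismSketch")

    // parallelize() now defaults numSlices to sc.defaultParallelism
    // (previously sc.numCores), so the explicit slice count is optional.
    val words = sc.parallelize(Seq("spark", "mesos", "spark", "hadoop"))

    // Shuffle operations such as reduceByKey also default to
    // defaultParallelism reduce tasks when no count is given.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    println(counts.collect().mkString(", "))   // e.g. (spark,2), (mesos,1), (hadoop,1)
  }
}
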
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 08ff1d8a01..92d4132e68 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -14,7 +14,7 @@ object Bagel extends Logging {
combiner: Combiner[M, C] = new DefaultCombiner[M],
aggregator: Aggregator[V, A] = new NullAggregator[V],
superstep: Int = 0,
- numSplits: Int = sc.numCores
+ numSplits: Int = sc.defaultParallelism
)(
compute: (V, Option[C], A, Int) => (V, Iterable[M])
): RDD[V] = {
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index ecdce037e9..42bb3c2a75 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -145,7 +145,8 @@ private trait DAGScheduler extends Scheduler with Logging {
missing.toList
}
- override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
+ override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int], allowLocal: Boolean)
(implicit m: ClassManifest[U])
: Array[U] = {
val outputParts = partitions.toArray
@@ -167,8 +168,9 @@ private trait DAGScheduler extends Scheduler with Logging {
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- // Optimization for first() and take() if the RDD has no shuffle dependencies
- if (finalStage.parents.size == 0 && numOutputParts == 1) {
+ // Optimization for short actions like first() and take() that can be computed locally
+ // without shipping tasks to the cluster.
+ if (allowLocal && finalStage.parents.size == 0 && numOutputParts == 1) {
logInfo("Computing the requested partition locally")
val split = finalRdd.splits(outputParts(0))
val taskContext = new TaskContext(finalStage.id, outputParts(0), 0)
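
A rough, self-contained sketch of the caller side of the new allowLocal flag, assuming a local SparkContext can be built as shown; take() and first() pass true (see the RDD.scala hunk further down), while lookup() passes false:

import spark.SparkContext

object AllowLocalSketch {
  def main(args: Array[String]) {
    // Placeholder master string and data for this sketch.
    val sc = new SparkContext("local[2]", "allowLocalSketch")
    val data = sc.parallelize(1 to 100, 4)

    // allowLocal = true: a job on one partition of a shuffle-free RDD may be
    // computed directly on the master, as take() and first() now request.
    val head: Array[Array[Int]] =
      sc.runJob(data, (it: Iterator[Int]) => it.take(5).toArray, Seq(0), true)

    // allowLocal = false: always ship tasks to the cluster, as lookup() does.
    val sizes: Array[Int] =
      sc.runJob(data, (it: Iterator[Int]) => it.size, 0 until 4, false)

    println(head(0).mkString(",") + " / " + sizes.mkString(","))
  }
}
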
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
deleted file mode 100644
index 596b309d34..0000000000
--- a/core/src/main/scala/spark/HadoopFileWriter.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-package org.apache.hadoop.mapred
-
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-
-import java.text.SimpleDateFormat
-import java.text.NumberFormat
-import java.io.IOException
-import java.net.URI
-import java.util.Date
-
-import spark.SerializableWritable
-import spark.Logging
-
-/**
- * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
- * also contain an output key class, an output value class, a filename to write to, etc
- * exactly like in a Hadoop job.
- */
-@serializable
-class HadoopFileWriter (@transient jobConf: JobConf) extends Logging {
- private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
-
- private var jobID = 0
- private var splitID = 0
- private var attemptID = 0
- private var jID: SerializableWritable[JobID] = null
- private var taID: SerializableWritable[TaskAttemptID] = null
-
- @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
- @transient private var format: OutputFormat[AnyRef,AnyRef] = null
- @transient private var committer: OutputCommitter = null
- @transient private var jobContext: JobContext = null
- @transient private var taskContext: TaskAttemptContext = null
-
- def preSetup() {
- setIDs(0, 0, 0)
- setConfParams()
-
- val jCtxt = getJobContext()
- getOutputCommitter().setupJob(jCtxt)
- }
-
-
- def setup(jobid: Int, splitid: Int, attemptid: Int) {
- setIDs(jobid, splitid, attemptid)
- setConfParams()
- }
-
- def open() {
- val numfmt = NumberFormat.getInstance()
- numfmt.setMinimumIntegerDigits(5)
- numfmt.setGroupingUsed(false)
-
- val outputName = "part-" + numfmt.format(splitID)
- val path = FileOutputFormat.getOutputPath(conf.value)
- val fs: FileSystem = {
- if (path != null)
- path.getFileSystem(conf.value)
- else
- FileSystem.get(conf.value)
- }
-
- getOutputCommitter().setupTask(getTaskContext())
- writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
- }
-
- def write(key: AnyRef, value: AnyRef) {
- if (writer!=null) {
- //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
- writer.write(key, value)
- } else
- throw new IOException("Writer is null, open() has not been called")
- }
-
- def close() {
- writer.close(Reporter.NULL)
- }
-
- def commit(): Boolean = {
- var result = false
- val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
- if (cmtr.needsTaskCommit(taCtxt)) {
- try {
- cmtr.commitTask(taCtxt)
- logInfo (taID + ": Committed")
- result = true
- } catch {
- case e:IOException => {
- logError ("Error committing the output of task: " + taID.value)
- e.printStackTrace()
- cmtr.abortTask(taCtxt)
- }
- }
- return result
- }
- logWarning ("No need to commit output of task: " + taID.value)
- return true
- }
-
- def cleanup() {
- getOutputCommitter().cleanupJob(getJobContext())
- }
-
- // ********* Private Functions *********
-
- private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
- if (format == null)
- format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
- return format
- }
-
- private def getOutputCommitter(): OutputCommitter = {
- if (committer == null)
- committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
- return committer
- }
-
- private def getJobContext(): JobContext = {
- if (jobContext == null)
- jobContext = new JobContext(conf.value, jID.value)
- return jobContext
- }
-
- private def getTaskContext(): TaskAttemptContext = {
- if (taskContext == null)
- taskContext = new TaskAttemptContext(conf.value, taID.value)
- return taskContext
- }
-
- private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
- jobID = jobid
- splitID = splitid
- attemptID = attemptid
-
- jID = new SerializableWritable[JobID](HadoopFileWriter.createJobID(now, jobid))
- taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
- }
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString);
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
- conf.value.set("mapred.task.id", taID.value.toString);
- conf.value.setBoolean("mapred.task.is.map", true);
- conf.value.setInt("mapred.task.partition", splitID);
- }
-}
-
-object HadoopFileWriter {
- def createJobID(time: Date, id: Int): JobID = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
- return new JobID(jobtrackerID, id)
- }
-
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null)
- throw new IllegalArgumentException("Output path is null")
- var outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null)
- throw new IllegalArgumentException("Incorrectly formatted output path")
- outputPath = outputPath.makeQualified(fs)
- return outputPath
- }
-}
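
The file above is removed only because it is renamed to HadoopWriter (same org.apache.hadoop.mapred package, same API). A hand-driven sketch of its lifecycle, which saveAsHadoopDataset normally performs automatically (preSetup on the driver, setup/open/write/close/commit per task, cleanup on the driver); the output path and key/value classes are placeholder choices:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, HadoopWriter, JobConf, TextOutputFormat}

object HadoopWriterLifecycleSketch {
  def main(args: Array[String]) {
    // Output path and key/value classes are placeholders for this sketch.
    val conf = new JobConf()
    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString("/tmp/out", conf))

    val writer = new HadoopWriter(conf)
    writer.preSetup()          // driver side: OutputCommitter.setupJob

    writer.setup(0, 0, 0)      // per task: (jobId, splitId, attemptId)
    writer.open()
    writer.write(new Text("spark"), new IntWritable(1))
    writer.close()
    writer.commit()            // commits this task attempt's output

    writer.cleanup()           // driver side: OutputCommitter.cleanupJob
  }
}
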
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index b71c3aa522..5d8a2d0e35 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -34,13 +34,14 @@ class HadoopRDD[K, V](
@transient conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
- valueClass: Class[V])
+ valueClass: Class[V],
+ minSplits: Int)
extends RDD[(K, V)](sc) {
val serializableConf = new SerializableWritable(conf)
@transient val splits_ : Array[Split] = {
val inputFormat = createInputFormat(conf)
- val inputSplits = inputFormat.getSplits(conf, sc.numCores)
+ val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Split] (inputSplits.size)
for (i <- 0 until inputSplits.size)
array(i) = new HadoopSplit(id, i, inputSplits(i))
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 1f5b6f62c5..e43516c84b 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -53,5 +53,5 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
override def stop() {}
- override def numCores() = threads
+ override def defaultParallelism() = threads
}
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index d635e95dba..9776963c5f 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -241,8 +241,8 @@ extends MScheduler with DAGScheduler with Logging
}
// TODO: query Mesos for number of cores
- override def numCores() =
- System.getProperty("spark.default.parallelism", "2").toInt
+ override def defaultParallelism() =
+ System.getProperty("spark.default.parallelism", "8").toInt
// Create a server for all the JARs added by the user to SparkContext.
// We first copy the JARs to a temp directory for easier server setup.
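
Only the fallback value changes here; the spark.default.parallelism system property still wins when set. A small sketch of overriding it before creating a context; the value 16 and the master string are illustrative, not from the patch:

import spark.SparkContext

object ParallelismConfigSketch {
  def main(args: Array[String]) {
    // Must be set before the SparkContext is created. On a Mesos scheduler the
    // unset fallback is now 8; local mode simply reports its thread count.
    System.setProperty("spark.default.parallelism", "16")
    val sc = new SparkContext("local[4]", "parallelismConfigSketch")
    println(sc.defaultParallelism)   // 4 here; 16 when running on the Mesos scheduler
  }
}
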
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index f24b999e2a..260c31cb5e 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -18,7 +18,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
@@ -127,30 +127,30 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)
: RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
+ combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism)
}
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
- reduceByKey(func, numCores)
+ reduceByKey(func, defaultParallelism)
}
def groupByKey(): RDD[(K, Seq[V])] = {
- groupByKey(numCores)
+ groupByKey(defaultParallelism)
}
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
- join(other, numCores)
+ join(other, defaultParallelism)
}
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, numCores)
+ leftOuterJoin(other, defaultParallelism)
}
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, numCores)
+ rightOuterJoin(other, defaultParallelism)
}
- def numCores = self.context.numCores
+ def defaultParallelism = self.context.defaultParallelism
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
@@ -167,7 +167,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
val part = self.partitioner match {
case Some(p) => p
- case None => new HashPartitioner(numCores)
+ case None => new HashPartitioner(defaultParallelism)
}
new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
case (k, Seq(vs, ws)) =>
@@ -179,7 +179,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
val part = self.partitioner match {
case Some(p) => p
- case None => new HashPartitioner(numCores)
+ case None => new HashPartitioner(defaultParallelism)
}
new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
@@ -191,6 +191,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
}
}
+ def lookup(key: K): Seq[V] = {
+ self.partitioner match {
+ case Some(p) =>
+ val index = p.getPartition(key)
+ def process(it: Iterator[(K, V)]): Seq[V] = {
+ val buf = new ArrayBuffer[V]
+ for ((k, v) <- it if k == key)
+ buf += v
+ buf
+ }
+ val res = self.context.runJob(self, process _, Array(index), false)
+ res(0)
+ case None =>
+ throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
+ }
+ }
+
def saveAsHadoopFile [F <: OutputFormat[K, V]] (path: String) (implicit fm: ClassManifest[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@@ -204,7 +221,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
conf.setOutputValueClass(valueClass)
conf.setOutputFormat(outputFormatClass)
conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(conf, HadoopFileWriter.createPathFromString(path, conf))
+ FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
@@ -221,10 +238,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
- val writer = new HadoopFileWriter(conf)
+ val writer = new HadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+ def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopWriter = {
writer.setup(context.stageId, context.splitId, context.attemptId)
writer.open()
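
lookup() now lives on PairRDDFunctions (it previously existed only on SequenceFileRDDFunctions) and runs a single-partition job with allowLocal = false. A hedged usage sketch, assuming the shuffled RDD produced by reduceByKey exposes its HashPartitioner, which is what lookup() requires; master string and data are placeholders:

import spark.SparkContext
import spark.SparkContext._

object LookupSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "lookupSketch")   // placeholder master

    // reduceByKey shuffles through a HashPartitioner, so the result has a
    // partitioner and lookup() only needs to run a job on one partition.
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
                   .reduceByKey(_ + _, 4)

    println(counts.lookup("a"))   // a Seq containing 4

    // Calling lookup() on an RDD without a partitioner throws
    // UnsupportedOperationException, as in the code above.
  }
}
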
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 2cddb5db9e..3f07b95191 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,17 +12,17 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.TextOutputFormat
import SparkContext._
@@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
}
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
- groupBy[K](f, sc.numCores)
+ groupBy[K](f, sc.defaultParallelism)
def pipe(command: String): RDD[String] =
new PipedRDD(this, command)
@@ -175,7 +175,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
var p = 0
while (buf.size < num && p < splits.size) {
val left = num - buf.size
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p))
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
buf ++= res(0)
if (buf.size == num)
return buf.toArray
@@ -184,7 +184,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
return buf.toArray
}
- def first: T = take(1) match {
+ def first(): T = take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
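
first() gains parentheses since it launches a job, and both first() and take() now pass allowLocal = true to runJob. A short usage sketch with placeholder data:

import spark.SparkContext

object TakeFirstSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "takeFirstSketch")   // placeholder master
    val words = sc.parallelize(Seq("spark", "mesos", "hadoop"), 2)

    // Both calls fetch one partition at a time with allowLocal = true, so the
    // common case needs no cluster tasks.
    println(words.first())                  // spark
    println(words.take(2).mkString(","))    // spark,mesos
  }
}
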
diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala
index 832d8b5705..df86db64a6 100644
--- a/core/src/main/scala/spark/Scheduler.scala
+++ b/core/src/main/scala/spark/Scheduler.scala
@@ -7,17 +7,14 @@ private trait Scheduler {
// Wait for registration with Mesos.
def waitForRegister()
- // Run a function on some partitions of an RDD, returning an array of results.
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
- (implicit m: ClassManifest[U]): Array[U] =
- runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
-
- // Run a function on some partitions of an RDD, returning an array of results.
- def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
- (implicit m: ClassManifest[U]): Array[U]
+ // Run a function on some partitions of an RDD, returning an array of results. The allowLocal flag specifies
+ // whether the scheduler is allowed to run the job on the master machine rather than shipping it to the cluster,
+ // for actions that create short jobs such as first() and take().
+ def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int], allowLocal: Boolean): Array[U]
def stop()
- // Get the number of cores in the cluster, as a hint for sizing jobs.
- def numCores(): Int
+ // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
+ def defaultParallelism(): Int
}
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 278f76fb19..6ca0556d9f 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -13,7 +13,6 @@ import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.HadoopFileWriter
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
@@ -66,21 +65,4 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
}
}
-
- def lookup(key: K): Seq[V] = {
- self.partitioner match {
- case Some(p) =>
- val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
- val buf = new ArrayBuffer[V]
- for ((k, v) <- it if k == key)
- buf += v
- buf
- }
- val res = self.context.runJob(self, process _, Array(index))
- res(0)
- case None =>
- throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
- }
- }
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index f09ea3da3f..0a866ce198 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -63,20 +63,14 @@ extends Logging {
// Methods for creating RDDs
- def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
+ def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
new ParallelArray[T](this, seq, numSlices)
-
- def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
- parallelize(seq, numCores)
- def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
+ def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
parallelize(seq, numSlices)
- def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] =
- parallelize(seq, numCores)
-
- def textFile(path: String): RDD[String] = {
- hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+ def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+ hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
.map(pair => pair._2.toString)
}
@@ -88,22 +82,24 @@ extends Logging {
def hadoopRDD[K, V](conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
- valueClass: Class[V])
+ valueClass: Class[V],
+ minSplits: Int = defaultMinSplits)
: RDD[(K, V)] = {
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
- valueClass: Class[V])
+ valueClass: Class[V],
+ minSplits: Int = defaultMinSplits)
: RDD[(K, V)] = {
val conf = new JobConf()
FileInputFormat.setInputPaths(conf, path)
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
@@ -111,23 +107,29 @@ extends Logging {
* the classes of keys, values and the InputFormat so that users don't need
* to pass them directly.
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
: RDD[(K, V)] = {
- hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]])
+ hadoopFile(path, fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]],
+ vm.erasure.asInstanceOf[Class[V]], minSplits)
}
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+ (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
+ hadoopFile[K, V, F](path, defaultMinSplits)
+
/** Get an RDD for a Hadoop SequenceFile with given key and value types */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
- valueClass: Class[V]): RDD[(K, V)] = {
+ valueClass: Class[V],
+ minSplits: Int): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
}
+ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter.
*
@@ -138,27 +140,32 @@ extends Logging {
* functions instead to create a new converter for the appropriate type. In addition, we pass the converter
* a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case.
*/
- def sequenceFile[K, V](path: String)
+ def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
(implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
val vc = vcf()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]])
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
}
- def objectFile[T: ClassManifest](path: String): RDD[T] = {
- import SparkContext.writableWritableConverter // To get converters for NullWritable and BytesWritable
- sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes))
- .flatMap(_.toTraversable)
+ /**
+ * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys
+ * and BytesWritable values that contain a serialized partition. This is still an experimental
+ * storage format and may not be supported exactly as is in future Spark releases. It will also
+ * be pretty slow if you use the default serializer (Java serialization), though the nice thing
+ * about it is that there's very little effort required to save arbitrary objects.
+ */
+ def objectFile[T: ClassManifest](path: String, minSplits: Int = defaultMinSplits): RDD[T] = {
+ sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+ .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
-
/** Build the union of a list of RDDs. */
- //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
- // new UnionRDD(this, rdds)
+ def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
+ new UnionRDD(this, rdds)
// Methods for creating shared variables
@@ -202,37 +209,38 @@ extends Logging {
/**
* Run a function on a given set of partitions in an RDD and return the results.
* This is the main entry point to the scheduler, by which all actions get launched.
+ * The allowLocal flag specifies whether the scheduler can run the computation on the
+ * master rather than shipping it out to the cluster, for short actions like first().
*/
- def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
- (implicit m: ClassManifest[U])
+ def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int], allowLocal: Boolean)
: Array[U] = {
logInfo("Starting job...")
val start = System.nanoTime
- val result = scheduler.runJob(rdd, func, partitions)
+ val result = scheduler.runJob(rdd, func, partitions, allowLocal)
logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
result
}
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
- (implicit m: ClassManifest[U])
+ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int],
+ allowLocal: Boolean)
: Array[U] = {
- runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
+ runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U)
- (implicit m: ClassManifest[U])
+ def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U)
: Array[U] = {
- runJob(rdd, func, 0 until rdd.splits.size)
+ runJob(rdd, func, 0 until rdd.splits.size, false)
}
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
- (implicit m: ClassManifest[U])
+ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U)
: Array[U] = {
- runJob(rdd, func, 0 until rdd.splits.size)
+ runJob(rdd, func, 0 until rdd.splits.size, false)
}
+
// Clean a closure to make it ready to serialized and send to tasks
// (removes unreferenced variables in $outer's, updates REPL variables)
private[spark] def clean[F <: AnyRef](f: F): F = {
@@ -240,8 +248,11 @@ extends Logging {
return f
}
- // Get the number of cores available to run tasks (as reported by Scheduler)
- def numCores = scheduler.numCores
+ // Default level of parallelism to use when not given by user (e.g. for reduce tasks)
+ def defaultParallelism: Int = scheduler.defaultParallelism
+
+ // Default min number of splits for Hadoop RDDs when not given by user
+ def defaultMinSplits: Int = Math.min(defaultParallelism, 4)
private var nextShuffleId = new AtomicInteger(0)
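
A sketch of the new SparkContext defaults and conveniences introduced above (defaultMinSplits, optional minSplits arguments, the re-enabled union()); the thread count and file paths are placeholders:

import spark.SparkContext
import spark.SparkContext._

object ContextDefaultsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[8]", "contextDefaultsSketch")   // placeholder master

    // With 8 local threads: defaultParallelism = 8, defaultMinSplits = min(8, 4) = 4,
    // so small input files are not forced into one split per core.
    println(sc.defaultParallelism + " " + sc.defaultMinSplits)

    // minSplits is now an optional trailing argument on the Hadoop input methods.
    val lines = sc.textFile("/tmp/input.txt")        // placeholder path; uses defaultMinSplits
    val wide  = sc.textFile("/tmp/input.txt", 16)    // ask for at least 16 splits

    // union() is re-enabled as a SparkContext method (varargs of RDDs).
    val both = sc.union(lines, wide)
    println(both.count())

    // objectFile() reads the NullWritable/BytesWritable format described in the
    // new doc comment; the path and element type here are hypothetical.
    // val objs = sc.objectFile[String]("/tmp/objects")
  }
}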