 core/src/main/scala/spark/DAGScheduler.scala     |   4
 core/src/main/scala/spark/Executor.scala         |   2
 core/src/main/scala/spark/HadoopFileWriter.scala | 205
 core/src/main/scala/spark/LocalScheduler.scala   |   7
 core/src/main/scala/spark/RDD.scala              | 111
 core/src/main/scala/spark/ResultTask.scala       |   7
 core/src/main/scala/spark/Scheduler.scala        |   5
 core/src/main/scala/spark/ShuffleMapTask.scala   |   2
 core/src/main/scala/spark/SparkContext.scala     |  65
 core/src/main/scala/spark/Task.scala             |   6
 10 files changed, 398 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index a970fb6526..99a69203af 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -145,7 +145,7 @@ private trait DAGScheduler extends Scheduler with Logging {
missing.toList
}
- override def runJob[T, U](finalRdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
(implicit m: ClassManifest[U])
: Array[U] = {
val outputParts = partitions.toArray
@@ -171,7 +171,7 @@ private trait DAGScheduler extends Scheduler with Logging {
if (finalStage.parents.size == 0 && numOutputParts == 1) {
logInfo("Computing the requested partition locally")
val split = finalRdd.splits(outputParts(0))
- return Array(func(finalRdd.iterator(split)))
+ return Array(func(null, finalRdd.iterator(split)))
}
def submitStage(stage: Stage) {
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index b5e34629a2..0942fecff3 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -59,7 +59,7 @@ class Executor extends mesos.Executor with Logging {
val task = Utils.deserialize[Task[Any]](arg, classLoader)
for (gen <- task.generation) // Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
- val value = task.run
+ val value = task.run(taskId)
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
d.sendStatusUpdate(new TaskStatus(
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
new file mode 100644
index 0000000000..3a4cd8464c
--- /dev/null
+++ b/core/src/main/scala/spark/HadoopFileWriter.scala
@@ -0,0 +1,205 @@
+package org.apache.hadoop.mapred
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+
+import java.text.SimpleDateFormat
+import java.text.NumberFormat
+import java.io.IOException
+import java.net.URI
+import java.util.Date
+
+import spark.SerializableWritable
+import spark.Logging
+
+@serializable
+class HadoopFileWriter (path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
+ outputCommitterClass: Class[_ <: OutputCommitter],
+ @transient jobConf: JobConf = null) extends Logging {
+
+ private val now = new Date()
+ private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
+ private val confProvided = (jobConf != null)
+
+ private var jobID = 0
+ private var splitID = 0
+ private var attemptID = 0
+ private var jID: SerializableWritable[JobID] = null
+ private var taID: SerializableWritable[TaskAttemptID] = null
+
+ @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
+ @transient private var format: OutputFormat[AnyRef,AnyRef] = null
+ @transient private var committer: OutputCommitter = null
+ @transient private var jobContext: JobContext = null
+ @transient private var taskContext: TaskAttemptContext = null
+
+ def this (path: String, @transient jobConf: JobConf)
+ = this (path,
+ jobConf.getOutputKeyClass,
+ jobConf.getOutputValueClass,
+ jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
+ jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]],
+ jobConf)
+
+ def this (path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
+ outputCommitterClass: Class[_ <: OutputCommitter])
+
+ = this (path,
+ keyClass,
+ valueClass,
+ outputFormatClass,
+ outputCommitterClass,
+ null)
+
+ def preSetup() {
+ setIDs(0, 0, 0)
+ setConfParams()
+
+ val jCtxt = getJobContext()
+ getOutputCommitter().setupJob(jCtxt)
+ }
+
+
+ def setup(jobid: Int, splitid: Int, attemptid: Int) {
+ setIDs(jobid, splitid, attemptid)
+ setConfParams()
+ }
+
+ def open() {
+ val numfmt = NumberFormat.getInstance()
+ numfmt.setMinimumIntegerDigits(5)
+ numfmt.setGroupingUsed(false)
+
+ val outputName = "part-" + numfmt.format(splitID)
+ val fs = HadoopFileWriter.createPathFromString(path, conf.value)
+ .getFileSystem(conf.value)
+
+ getOutputCommitter().setupTask(getTaskContext())
+ writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
+ }
+
+ def write(key: AnyRef, value: AnyRef) {
+ if (writer!=null) {
+ //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+ writer.write(key, value)
+ } else
+ throw new IOException("Writer is null, open() has not been called")
+ }
+
+ def close() {
+ writer.close(Reporter.NULL)
+ }
+
+ def commit(): Boolean = {
+ var result = false
+ val taCtxt = getTaskContext()
+ val cmtr = getOutputCommitter()
+ if (cmtr.needsTaskCommit(taCtxt)) {
+ try {
+ cmtr.commitTask(taCtxt)
+ logInfo (taID + ": Committed")
+ result = true
+ } catch {
+ case e:IOException => {
+ logError ("Error committing the output of task: " + taID.value)
+ e.printStackTrace()
+ cmtr.abortTask(taCtxt)
+ }
+ }
+ return result
+ }
+ logWarning ("No need to commit output of task: " + taID.value)
+ return true
+ }
+
+ def cleanup() {
+ getOutputCommitter().cleanupJob(getJobContext())
+ }
+
+ // ********* Private Functions *********
+
+ private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
+ if (format == null)
+ format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
+ return format
+ }
+
+ private def getOutputCommitter(): OutputCommitter = {
+ if (committer == null)
+ committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
+ return committer
+ }
+
+ private def getJobContext(): JobContext = {
+ if (jobContext == null)
+ jobContext = new JobContext(conf.value, jID.value)
+ return jobContext
+ }
+
+ private def getTaskContext(): TaskAttemptContext = {
+ if (taskContext == null)
+ taskContext = new TaskAttemptContext(conf.value, taID.value)
+ return taskContext
+ }
+
+ private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+ jobID = jobid
+ splitID = splitid
+ attemptID = attemptid
+
+ jID = new SerializableWritable[JobID](HadoopFileWriter.createJobID(now, jobid))
+ taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ }
+
+ private def setConfParams() {
+ if (!confProvided) {
+ conf.value.setOutputFormat(outputFormatClass)
+ conf.value.setOutputCommitter(outputCommitterClass)
+ conf.value.setOutputKeyClass(keyClass)
+ conf.value.setOutputValueClass(valueClass)
+ } else {
+
+ }
+
+ FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
+
+ conf.value.set("mapred.job.id", jID.value.toString);
+ conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
+ conf.value.set("mapred.task.id", taID.value.toString);
+ conf.value.setBoolean("mapred.task.is.map", true);
+ conf.value.setInt("mapred.task.partition", splitID);
+ }
+}
+
+object HadoopFileWriter {
+ def createJobID(time: Date, id: Int): JobID = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ val jobtrackerID = formatter.format(new Date())
+ return new JobID(jobtrackerID, id)
+ }
+
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null)
+ throw new IllegalArgumentException("Output path is null")
+ var outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (outputPath == null || fs == null)
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ outputPath = outputPath.makeQualified(fs)
+ return outputPath
+ }
+
+ def getInstance[K, V, F <: OutputFormat[K,V], C <: OutputCommitter](path: String)
+ (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]): HadoopFileWriter = {
+ new HadoopFileWriter(path, km.erasure, vm.erasure, fm.erasure.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], cm.erasure.asInstanceOf[Class[OutputCommitter]])
+ }
+}
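
For reference, a minimal driver-side sketch of the writer lifecycle that saveAsHadoopFile (below) follows: preSetup once on the driver, then setup/open/write/close/commit per task attempt, and cleanup once at the end. The output path and the record written here are placeholders, not part of the patch.

    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapred.{HadoopFileWriter, TextOutputFormat, FileOutputCommitter}

    val writer = HadoopFileWriter.getInstance[NullWritable, Text,
      TextOutputFormat[NullWritable, Text], FileOutputCommitter]("/tmp/demo-out")  // placeholder path

    writer.preSetup()                // driver side: OutputCommitter.setupJob
    writer.setup(0, 0, 0)            // per attempt: jobid, splitid, attemptid
    writer.open()                    // creates part-00000 through the OutputFormat
    writer.write(NullWritable.get(), new Text("hello"))
    writer.close()
    writer.commit()                  // commitTask, if the committer asks for it
    writer.cleanup()                 // driver side: OutputCommitter.cleanupJob
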
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 7d3ab661aa..1f5b6f62c5 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -6,6 +6,7 @@ import java.util.concurrent._
* A simple Scheduler implementation that runs tasks locally in a thread pool.
*/
private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
+ var attemptId = 0
var threadPool: ExecutorService =
Executors.newFixedThreadPool(threads, DaemonThreadFactory)
@@ -16,7 +17,9 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
override def waitForRegister() {}
override def submitTasks(tasks: Seq[Task[_]]) {
- tasks.zipWithIndex.foreach { case (task, i) =>
+ tasks.zipWithIndex.foreach { case (task, i) =>
+ val myAttemptId = attemptId
+ attemptId = attemptId + 1
threadPool.submit(new Runnable {
def run() {
logInfo("Running task " + i)
@@ -31,7 +34,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
val deserializedTask = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader)
- val result: Any = deserializedTask.run
+ val result: Any = deserializedTask.run(myAttemptId)
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
taskEnded(tasks(i), Success, result, accumUpdates)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 7780c87d34..5ea5fa9b6e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -6,11 +6,24 @@ import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
+import java.util.Date
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.OutputCommitter
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.Text
+
import SparkContext._
import mesos._
@@ -72,10 +85,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
new CartesianRDD(sc, this, other)
- def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassManifest](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
this.map(t => (func(t), t)).groupByKey(numSplits)
- def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassManifest](func: T => K): RDD[(K, Seq[T])] =
groupBy[K](func, sc.numCores)
def pipe(command: String): RDD[String] =
@@ -154,6 +167,14 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
+
+ def saveAsTextFile(path: String) {
+ this.map( x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter](path)
+ }
+
+ def saveAsObjectFile(path: String) {
+ this.glom.map( x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))) ).saveAsSequenceFile(path)
+ }
}
class MappedRDD[U: ClassManifest, T: ClassManifest](
@@ -188,7 +209,7 @@ extends RDD[Array[T]](prev.context) {
}
-@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
+@serializable class PairRDDExtras[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
for ((k, v) <- m2) {
@@ -347,6 +368,90 @@ extends RDD[Array[T]](prev.context) {
(k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
}
}
+
+ def saveAsHadoopFile (path: String, jobConf: JobConf) {
+ saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
+ }
+
+ def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
+ saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
+ }
+
+ def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+ saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
+ }
+
+ def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+ saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
+ }
+
+ private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
+ logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
+ val writer = new HadoopFileWriter(path,
+ keyClass,
+ valueClass,
+ outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
+ outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
+ null)
+ writer.preSetup()
+
+ def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+ writer.setup(context.stageId, context.splitId, context.attemptId)
+ writer.open()
+
+ var count = 0
+ while(iter.hasNext) {
+ val record = iter.next
+ count += 1
+ writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+ }
+
+ writer.close()
+ return writer
+ }
+
+ self.context.runJob(self, writeToFile _ ).foreach(_.commit())
+ writer.cleanup()
+ }
+
+ def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+ def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+@serializable
+class SequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+
+ def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ val c = {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
+ classManifest[T].erasure
+ else
+ implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ }
+ c.asInstanceOf[Class[ _ <: Writable]]
+ }
+
+ def saveAsSequenceFile(path: String) {
+
+ def anyToWritable[U <% Writable](u: U): Writable = u
+
+ val keyClass = getWritableClass[K]
+ val valueClass = getWritableClass[V]
+ val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
+ val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+ if (!convertKey && !convertValue) {
+ self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (!convertKey && convertValue) {
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (convertKey && !convertValue) {
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (convertKey && convertValue) {
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ }
+ }
def lookup(key: K): Seq[V] = {
self.partitioner match {
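
A hedged end-to-end sketch of the new save operations added above; the master string, RDD contents, and output paths are placeholders.

    import spark.SparkContext
    import spark.SparkContext._   // brings in rddToPairRDDExtras, rddToSequencePairRDDExtras and the Writable conversions

    val sc = new SparkContext("local", "saveDemo")
    val nums = sc.parallelize(1 to 100, 4)

    nums.saveAsTextFile("/tmp/nums-text")       // NullWritable/Text through TextOutputFormat
    nums.saveAsObjectFile("/tmp/nums-obj")      // each partition serialized into one SequenceFile record

    val pairs = nums.map(i => (i, i * i))
    pairs.saveAsSequenceFile("/tmp/nums-seq")   // Int keys/values lifted to IntWritable by the new implicits
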
diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala
index 5e0797baaa..8bbe31444f 100644
--- a/core/src/main/scala/spark/ResultTask.scala
+++ b/core/src/main/scala/spark/ResultTask.scala
@@ -1,12 +1,13 @@
package spark
-class ResultTask[T, U](stageId: Int, rdd: RDD[T], func: Iterator[T] => U,
+class ResultTask[T, U](stageId: Int, rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
val partition: Int, locs: Seq[String], val outputId: Int)
extends DAGTask[U](stageId) {
val split = rdd.splits(partition)
- override def run: U = {
- func(rdd.iterator(split))
+ override def run(attemptId: Int): U = {
+ val context = new TaskContext(stageId, partition, attemptId)
+ func(context, rdd.iterator(split))
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala
index f03e586470..832d8b5705 100644
--- a/core/src/main/scala/spark/Scheduler.scala
+++ b/core/src/main/scala/spark/Scheduler.scala
@@ -9,6 +9,11 @@ private trait Scheduler {
// Run a function on some partitions of an RDD, returning an array of results.
def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ (implicit m: ClassManifest[U]): Array[U] =
+ runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
+
+ // Run a function on some partitions of an RDD, returning an array of results.
+ def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
(implicit m: ClassManifest[U]): Array[U]
def stop()
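
The context-free overload simply wraps the user function and discards the TaskContext, so existing call sites keep compiling. As a hypothetical illustration (sc and an RDD[String] rdd with at least two splits are assumed):

    // Old style: the function ignores task identity.
    val counts: Array[Int] =
      sc.runJob(rdd, (iter: Iterator[String]) => iter.size, Seq(0, 1))

    // New style: the function can read the stage, split and attempt IDs.
    val labels: Array[String] =
      sc.runJob(rdd, (context: spark.TaskContext, iter: Iterator[String]) =>
        "split " + context.splitId + ", attempt " + context.attemptId + ": " + iter.size + " records",
        Seq(0, 1))
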
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
index 5321baa527..b64401c2c7 100644
--- a/core/src/main/scala/spark/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -9,7 +9,7 @@ class ShuffleMapTask(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], v
extends DAGTask[String](stageId) with Logging {
val split = rdd.splits(partition)
- override def run: String = {
+ override def run (attemptId: Int): String = {
val numOutputSplits = dep.partitioner.numPartitions
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0bc3abf114..5f12b247a7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -7,6 +7,16 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.SequenceFileInputFormat
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.FloatWritable
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.BooleanWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.ArrayWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
import spark.broadcast._
@@ -107,6 +117,11 @@ extends Logging {
vm.erasure.asInstanceOf[Class[V]])
}
+ def objectFile[T: ClassManifest](path: String): RDD[T] = {
+ sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)).flatMap(x => x.toTraversable)
+ }
+
+
/** Build the union of a list of RDDs. */
//def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
// new UnionRDD(this, rdds)
@@ -153,7 +168,7 @@ extends Logging {
* Run a function on a given set of partitions in an RDD and return the results.
* This is the main entry point to the scheduler, by which all actions get launched.
*/
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
(implicit m: ClassManifest[U])
: Array[U] = {
logInfo("Starting job...")
@@ -163,15 +178,26 @@ extends Logging {
result
}
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ (implicit m: ClassManifest[U])
+ : Array[U] = {
+ runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
+ }
+
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
- def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+ def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U)
(implicit m: ClassManifest[U])
: Array[U] = {
runJob(rdd, func, 0 until rdd.splits.size)
}
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+ (implicit m: ClassManifest[U])
+ : Array[U] = {
+ runJob(rdd, func, 0 until rdd.splits.size)
+ }
// Clean a closure to make it ready to serialized and send to tasks
// (removes unreferenced variables in $outer's, updates REPL variables)
private[spark] def clean[F <: AnyRef](f: F): F = {
@@ -214,6 +240,39 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
- implicit def rddToPairRDDExtras[K, V](rdd: RDD[(K, V)]) =
+ implicit def rddToPairRDDExtras[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDExtras(rdd)
+
+ implicit def rddToSequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
+ new SequencePairRDDExtras(rdd)
+
+ implicit def intToIntWritable(i: Int) = new IntWritable(i)
+
+ implicit def longToLongWritable(l: Long) = new LongWritable(l)
+
+ implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
+
+ implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
+
+ implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
+
+ implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
+
+ implicit def stringToText(s: String) = new Text(s)
+
+ private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = {
+ def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ val c = {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
+ classManifest[T].erasure
+ else
+ implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ }
+ c.asInstanceOf[Class[ _ <: Writable]]
+ }
+
+ def anyToWritable[U <% Writable](u: U): Writable = u
+
+ new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray)
+ }
}
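
To complement saveAsObjectFile, a sketch of reading the data back with the new objectFile method, plus the Writable view conversions in action; sc and the path are placeholders.

    import spark.SparkContext
    import spark.SparkContext._
    import org.apache.hadoop.io.{IntWritable, Text}

    val sc = new SparkContext("local", "objectFileDemo")

    // objectFile reads the (NullWritable, BytesWritable) records written by saveAsObjectFile,
    // deserializes each one back into an Array[T] and flattens the result.
    val restored: spark.RDD[Int] = sc.objectFile[Int]("/tmp/nums-obj")
    restored.foreach(println)

    // The new implicits let plain Scala values stand in wherever a Writable is expected:
    val key: IntWritable = 42          // intToIntWritable
    val name: Text = "part-00000"      // stringToText
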
diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala
index 4771bdd0f2..70547445ac 100644
--- a/core/src/main/scala/spark/Task.scala
+++ b/core/src/main/scala/spark/Task.scala
@@ -3,8 +3,12 @@ package spark
import mesos._
@serializable
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
+}
+
+@serializable
abstract class Task[T] {
- def run: T
+ def run (id: Int): T
def preferredLocations: Seq[String] = Nil
def generation: Option[Long] = None
}
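
For illustration only (CountTask is hypothetical, not part of this patch), a toy task showing how the new run(attemptId) signature and TaskContext fit together, mirroring what ResultTask now does:

    package spark

    @serializable
    class CountTask[T](stageId: Int, rdd: RDD[T], partition: Int) extends Task[Int] {
      override def run(attemptId: Int): Int = {
        // Build the same per-attempt context that ResultTask hands to user code.
        val context = new TaskContext(stageId, partition, attemptId)
        println("Counting split " + context.splitId + " (attempt " + context.attemptId + ")")
        rdd.iterator(rdd.splits(partition)).size
      }
      override def preferredLocations: Seq[String] = rdd.preferredLocations(rdd.splits(partition))
    }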