-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala     |   4
-rw-r--r--  core/src/main/scala/spark/Executor.scala         |   2
-rw-r--r--  core/src/main/scala/spark/HadoopFileWriter.scala | 205
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala   |   7
-rw-r--r--  core/src/main/scala/spark/RDD.scala              | 111
-rw-r--r--  core/src/main/scala/spark/ResultTask.scala       |   7
-rw-r--r--  core/src/main/scala/spark/Scheduler.scala        |   5
-rw-r--r--  core/src/main/scala/spark/ShuffleMapTask.scala   |   2
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala     |  65
-rw-r--r--  core/src/main/scala/spark/Task.scala             |   6
10 files changed, 398 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index a970fb6526..99a69203af 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -145,7 +145,7 @@ private trait DAGScheduler extends Scheduler with Logging {
     missing.toList
   }
 
-  override def runJob[T, U](finalRdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+  override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
       (implicit m: ClassManifest[U])
       : Array[U] = {
     val outputParts = partitions.toArray
@@ -171,7 +171,7 @@ private trait DAGScheduler extends Scheduler with Logging {
     if (finalStage.parents.size == 0 && numOutputParts == 1) {
       logInfo("Computing the requested partition locally")
       val split = finalRdd.splits(outputParts(0))
-      return Array(func(finalRdd.iterator(split)))
+      return Array(func(null, finalRdd.iterator(split)))
     }
 
     def submitStage(stage: Stage) {
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index b5e34629a2..0942fecff3 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -59,7 +59,7 @@ class Executor extends mesos.Executor with Logging {
         val task = Utils.deserialize[Task[Any]](arg, classLoader)
         for (gen <- task.generation) // Update generation if any is set
           env.mapOutputTracker.updateGeneration(gen)
-        val value = task.run
+        val value = task.run(taskId)
         val accumUpdates = Accumulators.values
         val result = new TaskResult(value, accumUpdates)
         d.sendStatusUpdate(new TaskStatus(
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
new file mode 100644
index 0000000000..3a4cd8464c
--- /dev/null
+++ b/core/src/main/scala/spark/HadoopFileWriter.scala
@@ -0,0 +1,205 @@
+package org.apache.hadoop.mapred
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+
+import java.text.SimpleDateFormat
+import java.text.NumberFormat
+import java.io.IOException
+import java.net.URI
+import java.util.Date
+
+import spark.SerializableWritable
+import spark.Logging
+
+@serializable
+class HadoopFileWriter (path: String,
+                        keyClass: Class[_],
+                        valueClass: Class[_],
+                        outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
+                        outputCommitterClass: Class[_ <: OutputCommitter],
+                        @transient jobConf: JobConf = null) extends Logging {
+
+  private val now = new Date()
+  private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
+  private val confProvided = (jobConf != null)
+
+  private var jobID = 0
+  private var splitID = 0
+  private var attemptID = 0
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
+  @transient private var format: OutputFormat[AnyRef,AnyRef] = null
+  @transient private var committer: OutputCommitter = null
+  @transient private var jobContext: JobContext = null
+  @transient private var taskContext: TaskAttemptContext = null
+
+  def this (path: String, @transient jobConf: JobConf)
+    = this (path,
+            jobConf.getOutputKeyClass,
+            jobConf.getOutputValueClass,
+            jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
+            jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]],
+            jobConf)
+
+  def this (path: String,
+            keyClass: Class[_],
+            valueClass: Class[_],
+            outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
+            outputCommitterClass: Class[_ <: OutputCommitter])
+
+    = this (path,
+            keyClass,
+            valueClass,
+            outputFormatClass,
+            outputCommitterClass,
+            null)
+
+  def preSetup() {
+    setIDs(0, 0, 0)
+    setConfParams()
+
+    val jCtxt = getJobContext()
+    getOutputCommitter().setupJob(jCtxt)
+  }
+
+
+  def setup(jobid: Int, splitid: Int, attemptid: Int) {
+    setIDs(jobid, splitid, attemptid)
+    setConfParams()
+  }
+
+  def open() {
+    val numfmt = NumberFormat.getInstance()
+    numfmt.setMinimumIntegerDigits(5)
+    numfmt.setGroupingUsed(false)
+
+    val outputName = "part-" + numfmt.format(splitID)
+    val fs = HadoopFileWriter.createPathFromString(path, conf.value)
+               .getFileSystem(conf.value)
+
+    getOutputCommitter().setupTask(getTaskContext())
+    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
+  }
+
+  def write(key: AnyRef, value: AnyRef) {
+    if (writer!=null) {
+      //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+      writer.write(key, value)
+    } else
+      throw new IOException("Writer is null, open() has not been called")
+  }
+
+  def close() {
+    writer.close(Reporter.NULL)
+  }
+
+  def commit(): Boolean = {
+    var result = false
+    val taCtxt = getTaskContext()
+    val cmtr = getOutputCommitter()
+    if (cmtr.needsTaskCommit(taCtxt)) {
+      try {
+        cmtr.commitTask(taCtxt)
+        logInfo (taID + ": Committed")
+        result = true
+      } catch {
+        case e:IOException => {
+          logError ("Error committing the output of task: " + taID.value)
+          e.printStackTrace()
+          cmtr.abortTask(taCtxt)
+        }
+      }
+      return result
+    }
+    logWarning ("No need to commit output of task: " + taID.value)
+    return true
+  }
+
+  def cleanup() {
+    getOutputCommitter().cleanupJob(getJobContext())
+  }
+
+  // ********* Private Functions *********
+
+  private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
+    if (format == null)
+      format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
+    return format
+  }
+
+  private def getOutputCommitter(): OutputCommitter = {
+    if (committer == null)
+      committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
+    return committer
+  }
+
+  private def getJobContext(): JobContext = {
+    if (jobContext == null)
+      jobContext = new JobContext(conf.value, jID.value)
+    return jobContext
+  }
+
+  private def getTaskContext(): TaskAttemptContext = {
+    if (taskContext == null)
+      taskContext = new TaskAttemptContext(conf.value, taID.value)
+    return taskContext
+  }
+
+  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+    jobID = jobid
+    splitID = splitid
+    attemptID = attemptid
+
+    jID = new SerializableWritable[JobID](HadoopFileWriter.createJobID(now, jobid))
+    taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+  }
+
+  private def setConfParams() {
+    if (!confProvided) {
+      conf.value.setOutputFormat(outputFormatClass)
+      conf.value.setOutputCommitter(outputCommitterClass)
+      conf.value.setOutputKeyClass(keyClass)
+      conf.value.setOutputValueClass(valueClass)
+    } else {
+
+    }
+
+    FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
+
+    conf.value.set("mapred.job.id", jID.value.toString);
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
+    conf.value.set("mapred.task.id", taID.value.toString);
+    conf.value.setBoolean("mapred.task.is.map", true);
+    conf.value.setInt("mapred.task.partition", splitID);
+  }
+}
+
+object HadoopFileWriter {
+  def createJobID(time: Date, id: Int): JobID = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    return new JobID(jobtrackerID, id)
+  }
+
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null)
+      throw new IllegalArgumentException("Output path is null")
+    var outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (outputPath == null || fs == null)
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    outputPath = outputPath.makeQualified(fs)
+    return outputPath
+  }
+
+  def getInstance[K, V, F <: OutputFormat[K,V], C <: OutputCommitter](path: String)
+      (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]): HadoopFileWriter = {
+    new HadoopFileWriter(path, km.erasure, vm.erasure, fm.erasure.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], cm.erasure.asInstanceOf[Class[OutputCommitter]])
+  }
+}
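The new HadoopFileWriter mirrors Hadoop's own output-commit protocol: setupJob once on the driver, then a setup/open/write/close cycle per task attempt, then commit and cleanup. A minimal sketch of that call sequence, using only the methods defined above (the output path and record are made-up placeholders, and the whole sequence runs in one process only for illustration):

    import org.apache.hadoop.mapred.{HadoopFileWriter, TextOutputFormat, FileOutputCommitter}
    import org.apache.hadoop.io.{NullWritable, Text}

    val writer = HadoopFileWriter.getInstance[NullWritable, Text,
      TextOutputFormat[NullWritable, Text], FileOutputCommitter]("/tmp/demo-out")

    writer.preSetup()                                 // driver side: OutputCommitter.setupJob
    writer.setup(0, 0, 0)                             // task side: jobid, splitid, attemptid
    writer.open()                                     // creates part-00000 for split 0
    writer.write(NullWritable.get(), new Text("hi"))  // one record for this partition
    writer.close()
    writer.commit()                                   // commitTask, if the committer requires it
    writer.cleanup()                                  // driver side: cleanupJob

Because the class is @serializable (with the JobConf wrapped in a SerializableWritable), the same instance can be built on the driver and shipped inside a task closure, which is exactly how RDD.saveAsHadoopFile uses it below.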
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 7d3ab661aa..1f5b6f62c5 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -6,6 +6,7 @@ import java.util.concurrent._
  * A simple Scheduler implementation that runs tasks locally in a thread pool.
  */
 private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
+  var attemptId = 0
   var threadPool: ExecutorService =
     Executors.newFixedThreadPool(threads, DaemonThreadFactory)
 
@@ -16,7 +17,9 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
   override def waitForRegister() {}
 
   override def submitTasks(tasks: Seq[Task[_]]) {
-    tasks.zipWithIndex.foreach { case (task, i) =>
+    tasks.zipWithIndex.foreach { case (task, i) =>
+      val myAttemptId = attemptId
+      attemptId = attemptId + 1
       threadPool.submit(new Runnable {
         def run() {
           logInfo("Running task " + i)
@@ -31,7 +34,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
             logInfo("Size of task " + i + " is " + bytes.size + " bytes")
             val deserializedTask = Utils.deserialize[Task[_]](
               bytes, currentThread.getContextClassLoader)
-            val result: Any = deserializedTask.run
+            val result: Any = deserializedTask.run(myAttemptId)
             val accumUpdates = Accumulators.values
             logInfo("Finished task " + i)
             taskEnded(tasks(i), Success, result, accumUpdates)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 7780c87d34..5ea5fa9b6e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -6,11 +6,24 @@ import java.io.ObjectInputStream
 import java.util.concurrent.atomic.AtomicLong
 import java.util.HashSet
 import java.util.Random
+import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.collection.mutable.HashMap
 
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.OutputCommitter
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.Text
+
 import SparkContext._
 
 import mesos._
@@ -72,10 +85,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
     new CartesianRDD(sc, this, other)
 
-  def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassManifest](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
     this.map(t => (func(t), t)).groupByKey(numSplits)
 
-  def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassManifest](func: T => K): RDD[(K, Seq[T])] =
     groupBy[K](func, sc.numCores)
 
   def pipe(command: String): RDD[String] =
@@ -154,6 +167,14 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
     case Array(t) => t
     case _ => throw new UnsupportedOperationException("empty collection")
   }
+
+  def saveAsTextFile(path: String) {
+    this.map( x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter](path)
+  }
+
+  def saveAsObjectFile(path: String) {
+    this.glom.map( x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))) ).saveAsSequenceFile(path)
+  }
 }
 
 class MappedRDD[U: ClassManifest, T: ClassManifest](
@@ -188,7 +209,7 @@ extends RDD[Array[T]](prev.context) {
 }
 
-@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
+@serializable class PairRDDExtras[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
   def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
     def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
       for ((k, v) <- m2) {
@@ -347,6 +368,90 @@ extends RDD[Array[T]](prev.context) {
       (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
     }
   }
+
+  def saveAsHadoopFile (path: String, jobConf: JobConf) {
+    saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
+  }
+
+  def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
+    saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
+  }
+
+  def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+    saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
+  }
+
+  def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
+  }
+
+  private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
+    logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
+    val writer = new HadoopFileWriter(path,
+                                      keyClass,
+                                      valueClass,
+                                      outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
+                                      outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
+                                      null)
+    writer.preSetup()
+
+    def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+      writer.setup(context.stageId, context.splitId, context.attemptId)
+      writer.open()
+
+      var count = 0
+      while(iter.hasNext) {
+        val record = iter.next
+        count += 1
+        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      }
+
+      writer.close()
+      return writer
+    }
+
+    self.context.runJob(self, writeToFile _ ).foreach(_.commit())
+    writer.cleanup()
+  }
+
+  def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+  def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+@serializable
+class SequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+
+  def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+    val c = {
+      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
+        classManifest[T].erasure
+      else
+        implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+    }
+    c.asInstanceOf[Class[ _ <: Writable]]
+  }
+
+  def saveAsSequenceFile(path: String) {
+
+    def anyToWritable[U <% Writable](u: U): Writable = u
+
+    val keyClass = getWritableClass[K]
+    val valueClass = getWritableClass[V]
+    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
+    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+    if (!convertKey && !convertValue) {
+      self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+    } else if (!convertKey && convertValue) {
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+    } else if (convertKey && !convertValue) {
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+    } else if (convertKey && convertValue) {
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+    }
+  }
 
   def lookup(key: K): Seq[V] = {
     self.partitioner match {
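With these additions, every RDD picks up saveAsTextFile and saveAsObjectFile, and pair RDDs pick up the saveAsHadoopFile family; writeToFile runs once per partition through runJob, and each attempt's output is only made visible by the foreach(_.commit()) pass at the end. A hedged usage sketch (the SparkContext construction and output paths are placeholders, not part of the patch):

    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapred.{TextOutputFormat, FileOutputCommitter}

    val sc = new SparkContext("local", "saveDemo")

    sc.parallelize(1 to 100).saveAsTextFile("/tmp/ints-text")   // one part file per split
    sc.parallelize(1 to 100).saveAsObjectFile("/tmp/ints-obj")  // glommed, serialized partitions

    // Pair RDDs can name the OutputFormat and OutputCommitter explicitly:
    sc.parallelize(List("a", "b"))
      .map(s => (NullWritable.get(), new Text(s)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter]("/tmp/pairs")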
diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala
index 5e0797baaa..8bbe31444f 100644
--- a/core/src/main/scala/spark/ResultTask.scala
+++ b/core/src/main/scala/spark/ResultTask.scala
@@ -1,12 +1,13 @@
 package spark
 
-class ResultTask[T, U](stageId: Int, rdd: RDD[T], func: Iterator[T] => U,
+class ResultTask[T, U](stageId: Int, rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
                        val partition: Int, locs: Seq[String], val outputId: Int)
 extends DAGTask[U](stageId) {
   val split = rdd.splits(partition)
 
-  override def run: U = {
-    func(rdd.iterator(split))
+  override def run(attemptId: Int): U = {
+    val context = new TaskContext(stageId, partition, attemptId)
+    func(context, rdd.iterator(split))
   }
 
   override def preferredLocations: Seq[String] = locs
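ResultTask is where a TaskContext is actually materialized: stageId and partition are fixed when the task is created, while attemptId is supplied by whichever scheduler invokes run. User code can observe all three through the new context-taking runJob overload, for instance (an illustrative call, not part of the patch):

    // Each partition reports which split and attempt produced it.
    val info = sc.runJob(rdd, (context: TaskContext, iter: Iterator[String]) =>
      (context.stageId, context.splitId, context.attemptId, iter.size))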
diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala
index f03e586470..832d8b5705 100644
--- a/core/src/main/scala/spark/Scheduler.scala
+++ b/core/src/main/scala/spark/Scheduler.scala
@@ -9,6 +9,11 @@ private trait Scheduler {
 
   // Run a function on some partitions of an RDD, returning an array of results.
   def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+      (implicit m: ClassManifest[U]): Array[U] =
+    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
+
+  // Run a function on some partitions of an RDD, returning an array of results.
+  def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
       (implicit m: ClassManifest[U]): Array[U]
 
   def stop()
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
index 5321baa527..b64401c2c7 100644
--- a/core/src/main/scala/spark/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -9,7 +9,7 @@ class ShuffleMapTask(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], v
 extends DAGTask[String](stageId) with Logging {
   val split = rdd.splits(partition)
 
-  override def run: String = {
+  override def run (attemptId: Int): String = {
     val numOutputSplits = dep.partitioner.numPartitions
     val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
     val partitioner = dep.partitioner.asInstanceOf[Partitioner]
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0bc3abf114..5f12b247a7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -7,6 +7,16 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.SequenceFileInputFormat
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.FloatWritable
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.BooleanWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.ArrayWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
 
 import spark.broadcast._
 
@@ -107,6 +117,11 @@ extends Logging {
         vm.erasure.asInstanceOf[Class[V]])
   }
 
+  def objectFile[T: ClassManifest](path: String): RDD[T] = {
+    sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)).flatMap(x => x.toTraversable)
+  }
+
+
   /** Build the union of a list of RDDs. */
   //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
   //  new UnionRDD(this, rdds)
@@ -153,7 +168,7 @@ extends Logging {
    * Run a function on a given set of partitions in an RDD and return the results.
    * This is the main entry point to the scheduler, by which all actions get launched.
    */
-  def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+  def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int])
       (implicit m: ClassManifest[U])
       : Array[U] = {
     logInfo("Starting job...")
@@ -163,15 +178,26 @@ extends Logging {
     result
   }
 
+  def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+      (implicit m: ClassManifest[U])
+      : Array[U] = {
+    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions)
+  }
+
   /**
    * Run a job on all partitions in an RDD and return the results in an array.
    */
-  def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+  def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U)
      (implicit m: ClassManifest[U])
      : Array[U] = {
    runJob(rdd, func, 0 until rdd.splits.size)
  }
 
+  def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+      (implicit m: ClassManifest[U])
+      : Array[U] = {
+    runJob(rdd, func, 0 until rdd.splits.size)
+  }
 
   // Clean a closure to make it ready to serialized and send to tasks
   // (removes unreferenced variables in $outer's, updates REPL variables)
   private[spark] def clean[F <: AnyRef](f: F): F = {
@@ -214,6 +240,39 @@ object SparkContext {
 
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
 
-  implicit def rddToPairRDDExtras[K, V](rdd: RDD[(K, V)]) =
+  implicit def rddToPairRDDExtras[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
     new PairRDDExtras(rdd)
+
+  implicit def rddToSequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
+    new SequencePairRDDExtras(rdd)
+
+  implicit def intToIntWritable(i: Int) = new IntWritable(i)
+
+  implicit def longToLongWritable(l: Long) = new LongWritable(l)
+
+  implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
+
+  implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
+
+  implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
+
+  implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
+
+  implicit def stringToText(s: String) = new Text(s)
+
+  private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = {
+    def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+      val c = {
+        if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
+          classManifest[T].erasure
+        else
+          implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+      }
+      c.asInstanceOf[Class[ _ <: Writable]]
+    }
+
+    def anyToWritable[U <% Writable](u: U): Writable = u
+
+    new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray)
+  }
 }
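The companion-object implicits are what let saveAsSequenceFile accept RDDs of plain Scala types: the view bounds K <% Writable and V <% Writable are satisfied by intToIntWritable, stringToText, and friends. Likewise, objectFile is the read-side inverse of saveAsObjectFile, since each sequence-file value is a serialized partition that gets deserialized and flattened back out. A round-trip sketch under those conversions (paths are placeholders):

    import SparkContext._   // bring the Writable conversions into scope

    val counts = sc.parallelize(List(("apple", 3), ("pear", 5)))
    counts.saveAsSequenceFile("/tmp/counts")    // String/Int lifted to Text/IntWritable

    sc.parallelize(1 to 1000).saveAsObjectFile("/tmp/nums")
    val back = sc.objectFile[Int]("/tmp/nums")  // an RDD[Int] again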
diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala
index 4771bdd0f2..70547445ac 100644
--- a/core/src/main/scala/spark/Task.scala
+++ b/core/src/main/scala/spark/Task.scala
@@ -3,8 +3,12 @@ package spark
 import mesos._
 
 @serializable
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
+}
+
+@serializable
 abstract class Task[T] {
-  def run: T
+  def run (id: Int): T
   def preferredLocations: Seq[String] = Nil
   def generation: Option[Long] = None
 }