| author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-10 00:06:15 -0400 |
|---|---|---|
| committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-10 00:06:15 -0400 |
| commit | 25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd (patch) | |
| tree | 427166a33ec32f26f82d01ddfda5ce42331a4ad8 | |
| parent | b7f1f62ff56212441bd346f6352dee99142555f4 (diff) | |
| download | spark-25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd.tar.gz spark-25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd.tar.bz2 spark-25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd.zip | |
Moved PairRDD and SequenceFileRDD functions to separate source files
| -rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 264 |
| -rw-r--r-- | core/src/main/scala/spark/RDD.scala | 306 |
| -rw-r--r-- | core/src/main/scala/spark/SequenceFileRDDFunctions.scala | 85 |
| -rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 8 |

4 files changed, 371 insertions(+), 292 deletions(-)
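The functions being moved are attached to RDDs of key-value pairs through an implicit conversion that `import SparkContext._` brings into scope. Below is a minimal usage sketch; the `SparkContext` constructor arguments, `sc.parallelize`, and the sample data are assumptions drawn from the surrounding 2011 code base rather than from this patch.

```scala
import spark.SparkContext
import spark.SparkContext._   // brings rddToPairRDDFunctions (and friends) into scope

object PairRDDExample {        // hypothetical driver program
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PairRDDExample")   // assumed constructor form

    // An RDD[(String, Int)] picks up PairRDDFunctions via the implicit conversion.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val other = sc.parallelize(Seq(("a", "x"), ("c", "y")), 2)

    val counts  = pairs.reduceByKey(_ + _)   // (a,4), (b,2)
    val grouped = pairs.groupByKey()         // (a, Seq(1, 3)), (b, Seq(2))
    val joined  = pairs.join(other)          // (a,(1,x)), (a,(3,x))

    println(counts.collect().toList)
  }
}
```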
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala new file mode 100644 index 0000000000..d179328ccf --- /dev/null +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -0,0 +1,264 @@ +package spark + +import java.io.EOFException +import java.net.URL +import java.io.ObjectInputStream +import java.util.concurrent.atomic.AtomicLong +import java.util.HashSet +import java.util.Random +import java.util.Date + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Map +import scala.collection.mutable.HashMap + +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.HadoopFileWriter +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.hadoop.mapred.OutputCommitter +import org.apache.hadoop.mapred.FileOutputCommitter +import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.Text + +import SparkContext._ + +/** + * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. + */ +@serializable +class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging { + def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { + def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = { + for ((k, v) <- m2) { + m1.get(k) match { + case None => m1(k) = v + case Some(w) => m1(k) = func(w, v) + } + } + return m1 + } + self.map(pair => HashMap(pair)).reduce(mergeMaps) + } + + def combineByKey[C](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + numSplits: Int) + : RDD[(K, C)] = + { + val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + val partitioner = new HashPartitioner(numSplits) + new ShuffledRDD(self, aggregator, partitioner) + } + + def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { + combineByKey[V]((v: V) => v, func, func, numSplits) + } + + def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, numSplits) + bufs.asInstanceOf[RDD[(K, Seq[V])]] + } + + def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { + val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } + val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } + (vs ++ ws).groupByKey(numSplits).flatMap { + case (k, seq) => { + val vbuf = new ArrayBuffer[V] + val wbuf = new ArrayBuffer[W] + seq.foreach(_ match { + case Left(v) => vbuf += v + case Right(w) => wbuf += w + }) + for (v <- vbuf; w <- wbuf) yield (k, (v, w)) + } + } + } + + def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { + val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } + val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } + (vs ++ ws).groupByKey(numSplits).flatMap { + case (k, seq) => { + val vbuf = new ArrayBuffer[V] + val wbuf = new ArrayBuffer[Option[W]] + seq.foreach(_ match { + case Left(v) => vbuf += v + case Right(w) => wbuf += Some(w) + }) + if (wbuf.isEmpty) { + wbuf += None + } + for (v <- vbuf; w <- wbuf) 
yield (k, (v, w)) + } + } + } + + def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { + val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } + val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } + (vs ++ ws).groupByKey(numSplits).flatMap { + case (k, seq) => { + val vbuf = new ArrayBuffer[Option[V]] + val wbuf = new ArrayBuffer[W] + seq.foreach(_ match { + case Left(v) => vbuf += Some(v) + case Right(w) => wbuf += w + }) + if (vbuf.isEmpty) { + vbuf += None + } + for (v <- vbuf; w <- wbuf) yield (k, (v, w)) + } + } + } + + def combineByKey[C](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C) + : RDD[(K, C)] = { + combineByKey(createCombiner, mergeValue, mergeCombiners, numCores) + } + + def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { + reduceByKey(func, numCores) + } + + def groupByKey(): RDD[(K, Seq[V])] = { + groupByKey(numCores) + } + + def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + join(other, numCores) + } + + def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + leftOuterJoin(other, numCores) + } + + def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + rightOuterJoin(other, numCores) + } + + def numCores = self.context.numCores + + def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) + + def mapValues[U](f: V => U): RDD[(K, U)] = { + val cleanF = self.context.clean(f) + new MappedValuesRDD(self, cleanF) + } + + def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = { + val cleanF = self.context.clean(f) + new FlatMappedValuesRDD(self, cleanF) + } + + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + val part = self.partitioner match { + case Some(p) => p + case None => new HashPartitioner(numCores) + } + new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map { + case (k, Seq(vs, ws)) => + (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])) + } + } + + def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + val part = self.partitioner match { + case Some(p) => p + case None => new HashPartitioner(numCores) + } + new CoGroupedRDD[K]( + Seq(self.asInstanceOf[RDD[(_, _)]], + other1.asInstanceOf[RDD[(_, _)]], + other2.asInstanceOf[RDD[(_, _)]]), + part).map { + case (k, Seq(vs, w1s, w2s)) => + (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])) + } + } + + def saveAsHadoopFile (path: String, jobConf: JobConf) { + saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf) + } + + def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) { + saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]]) + } + + def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) { + saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass) + } + + def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: 
OutputCommitter]) { + saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null) + } + + private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) { + logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" ) + val writer = new HadoopFileWriter(path, + keyClass, + valueClass, + outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], + outputCommitterClass.asInstanceOf[Class[OutputCommitter]], + null) + writer.preSetup() + + def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = { + writer.setup(context.stageId, context.splitId, context.attemptId) + writer.open() + + var count = 0 + while(iter.hasNext) { + val record = iter.next + count += 1 + writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) + } + + writer.close() + return writer + } + + self.context.runJob(self, writeToFile _ ).foreach(_.commit()) + writer.cleanup() + } + + def getKeyClass() = implicitly[ClassManifest[K]].erasure + + def getValueClass() = implicitly[ClassManifest[V]].erasure +} + +class MappedValuesRDD[K, V, U]( + prev: RDD[(K, V)], f: V => U) +extends RDD[(K, U)](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override val partitioner = prev.partitioner + override def compute(split: Split) = + prev.iterator(split).map{case (k, v) => (k, f(v))} +} + +class FlatMappedValuesRDD[K, V, U]( + prev: RDD[(K, V)], f: V => Traversable[U]) +extends RDD[(K, U)](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override val partitioner = prev.partitioner + override def compute(split: Split) = { + prev.iterator(split).toStream.flatMap { + case (k, v) => f(v).map(x => (k, x)) + }.iterator + } +} diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 8f34ffee2b..1d86062012 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -26,8 +26,24 @@ import org.apache.hadoop.io.Text import SparkContext._ -import mesos._ - +/** + * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents + * an immutable, partitioned collection of elements that can be operated on in parallel. + * + * Each RDD is characterized by five main properties: + * - A list of splits (partitions) + * - A function for computing each split + * - A list of dependencies on other RDDs + * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) + * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for HDFS) + * + * All the scheduling and execution in Spark is done based on these methods, allowing each + * RDD to implement its own way of computing itself. + * + * This class also contains transformation methods available on all RDDs (e.g. map and filter). + * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, + * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. 
+ */ @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { // Methods that must be implemented by subclasses @@ -207,289 +223,3 @@ extends RDD[Array[T]](prev.context) { override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator } - - -@serializable class PairRDDExtras[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging { - def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { - def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = { - for ((k, v) <- m2) { - m1.get(k) match { - case None => m1(k) = v - case Some(w) => m1(k) = func(w, v) - } - } - return m1 - } - self.map(pair => HashMap(pair)).reduce(mergeMaps) - } - - def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - numSplits: Int) - : RDD[(K, C)] = - { - val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - val partitioner = new HashPartitioner(numSplits) - new ShuffledRDD(self, aggregator, partitioner) - } - - def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { - combineByKey[V]((v: V) => v, func, func, numSplits) - } - - def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, numSplits) - bufs.asInstanceOf[RDD[(K, Seq[V])]] - } - - def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[V] - val wbuf = new ArrayBuffer[W] - seq.foreach(_ match { - case Left(v) => vbuf += v - case Right(w) => wbuf += w - }) - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } - } - } - - def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[V] - val wbuf = new ArrayBuffer[Option[W]] - seq.foreach(_ match { - case Left(v) => vbuf += v - case Right(w) => wbuf += Some(w) - }) - if (wbuf.isEmpty) { - wbuf += None - } - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } - } - } - - def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[Option[V]] - val wbuf = new ArrayBuffer[W] - seq.foreach(_ match { - case Left(v) => vbuf += Some(v) - case Right(w) => wbuf += w - }) - if (vbuf.isEmpty) { - vbuf += None - } - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } - } - } - - def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, numCores) - } - - def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { - reduceByKey(func, 
numCores) - } - - def groupByKey(): RDD[(K, Seq[V])] = { - groupByKey(numCores) - } - - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { - join(other, numCores) - } - - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, numCores) - } - - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, numCores) - } - - def numCores = self.context.numCores - - def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) - - def mapValues[U](f: V => U): RDD[(K, U)] = { - val cleanF = self.context.clean(f) - new MappedValuesRDD(self, cleanF) - } - - def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = { - val cleanF = self.context.clean(f) - new FlatMappedValuesRDD(self, cleanF) - } - - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { - val part = self.partitioner match { - case Some(p) => p - case None => new HashPartitioner(numCores) - } - new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map { - case (k, Seq(vs, ws)) => - (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])) - } - } - - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - val part = self.partitioner match { - case Some(p) => p - case None => new HashPartitioner(numCores) - } - new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, _)]], - other2.asInstanceOf[RDD[(_, _)]]), - part).map { - case (k, Seq(vs, w1s, w2s)) => - (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])) - } - } - - def saveAsHadoopFile (path: String, jobConf: JobConf) { - saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf) - } - - def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) { - saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]]) - } - - def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) { - saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass) - } - - def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) { - saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null) - } - - private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) { - logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" ) - val writer = new HadoopFileWriter(path, - keyClass, - valueClass, - outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], - outputCommitterClass.asInstanceOf[Class[OutputCommitter]], - null) - writer.preSetup() - - def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = { - writer.setup(context.stageId, context.splitId, context.attemptId) - writer.open() - - var count = 0 - while(iter.hasNext) { - val record = 
iter.next - count += 1 - writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) - } - - writer.close() - return writer - } - - self.context.runJob(self, writeToFile _ ).foreach(_.commit()) - writer.cleanup() - } - - def getKeyClass() = implicitly[ClassManifest[K]].erasure - - def getValueClass() = implicitly[ClassManifest[V]].erasure -} - -@serializable -class SequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging { - - def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { - val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) - classManifest[T].erasure - else - implicitly[T => Writable].getClass.getMethods()(0).getReturnType - } - c.asInstanceOf[Class[ _ <: Writable]] - } - - def saveAsSequenceFile(path: String) { - - def anyToWritable[U <% Writable](u: U): Writable = u - - val keyClass = getWritableClass[K] - val valueClass = getWritableClass[V] - val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) - val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) - - logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) - if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) - } else if (!convertKey && convertValue) { - self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) - } else if (convertKey && !convertValue) { - self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) - } else if (convertKey && convertValue) { - self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) - } - } - - def lookup(key: K): Seq[V] = { - self.partitioner match { - case Some(p) => - val index = p.getPartition(key) - def process(it: Iterator[(K, V)]): Seq[V] = { - val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) - buf += v - buf - } - val res = self.context.runJob(self, process _, Array(index)) - res(0) - case None => - throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") - } - } -} - -class MappedValuesRDD[K, V, U]( - prev: RDD[(K, V)], f: V => U) -extends RDD[(K, U)](prev.context) { - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override val partitioner = prev.partitioner - override def compute(split: Split) = - prev.iterator(split).map{case (k, v) => (k, f(v))} -} - -class FlatMappedValuesRDD[K, V, U]( - prev: RDD[(K, V)], f: V => Traversable[U]) -extends RDD[(K, U)](prev.context) { - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override val partitioner = prev.partitioner - override def compute(split: Split) = { - prev.iterator(split).toStream.flatMap { - case (k, v) => f(v).map(x => (k, x)) - }.iterator - } -} diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala new file mode 100644 index 0000000000..8eb19c5436 --- /dev/null +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala 
@@ -0,0 +1,85 @@ +package spark + +import java.io.EOFException +import java.net.URL +import java.io.ObjectInputStream +import java.util.concurrent.atomic.AtomicLong +import java.util.HashSet +import java.util.Random +import java.util.Date + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Map +import scala.collection.mutable.HashMap + +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.HadoopFileWriter +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.hadoop.mapred.OutputCommitter +import org.apache.hadoop.mapred.FileOutputCommitter +import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.Text + +import SparkContext._ + + +/** + * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, + * through an implicit conversion. Note that this can't be part of PairRDDFunctions because + * we need more implicit parameters to convert our keys and values to Writable. + */ +@serializable +class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging { + + def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { + val c = { + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) + classManifest[T].erasure + else + implicitly[T => Writable].getClass.getMethods()(0).getReturnType + } + c.asInstanceOf[Class[ _ <: Writable]] + } + + def saveAsSequenceFile(path: String) { + + def anyToWritable[U <% Writable](u: U): Writable = u + + val keyClass = getWritableClass[K] + val valueClass = getWritableClass[V] + val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) + val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) + + logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) + if (!convertKey && !convertValue) { + self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) + } else if (!convertKey && convertValue) { + self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) + } else if (convertKey && !convertValue) { + self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) + } else if (convertKey && convertValue) { + self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter]) + } + } + + def lookup(key: K): Seq[V] = { + self.partitioner match { + case Some(p) => + val index = p.getPartition(key) + def process(it: Iterator[(K, V)]): Seq[V] = { + val buf = new ArrayBuffer[V] + for ((k, v) <- it if k == key) + buf += v + buf + } + val res = self.context.runJob(self, process _, Array(index)) + res(0) + case None => + throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") + } + } +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 536668399b..e48d0f1940 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ 
b/core/src/main/scala/spark/SparkContext.scala
@@ -263,11 +263,11 @@ object SparkContext {
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
 
-  implicit def rddToPairRDDExtras[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
-    new PairRDDExtras(rdd)
+  implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
+    new PairRDDFunctions(rdd)
 
-  implicit def rddToSequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
-    new SequencePairRDDExtras(rdd)
+  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
+    new SequenceFileRDDFunctions(rdd)
 
   implicit def intToIntWritable(i: Int) = new IntWritable(i)
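Within the new PairRDDFunctions.scala, `reduceByKey` and `groupByKey` are thin wrappers over `combineByKey`, which wires a `createCombiner`/`mergeValue`/`mergeCombiners` triple into an `Aggregator`, hash-partitions by key, and returns a `ShuffledRDD`. Here is a sketch of calling `combineByKey` directly to compute a per-key mean; `sc`, the sample data, and the split count of 2 are illustrative:

```scala
// Per-key mean: the combiner is a running (sum, count) pair.
val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 5.0)), 2)

val sumCounts = scores.combineByKey[(Double, Int)](
  (v: Double) => (v, 1),                                  // createCombiner: start a pair
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),  // mergeValue: fold one value in
  (c1: (Double, Int), c2: (Double, Int)) =>               // mergeCombiners: merge partial pairs
    (c1._1 + c2._1, c1._2 + c2._2),
  2)                                                      // numSplits

val means = sumCounts.mapValues { case (sum, count) => sum / count }
// means.collect() would yield something like Array((a,15.0), (b,5.0))
```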
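`SequenceFileRDDFunctions` applies when both the key and the value type either are Hadoop `Writable`s or have an implicit view to one, such as the `intToIntWritable` conversion visible in the SparkContext hunk above. The sketch below additionally assumes a String-to-Text view in `SparkContext` and reuses the hypothetical `sc`; the output path is made up:

```scala
// Save word counts as a Hadoop SequenceFile; String/Int are viewable as Text/IntWritable.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2).reduceByKey(_ + _)
counts.saveAsSequenceFile("hdfs:///tmp/counts-seq")   // hypothetical output path

// lookup() requires the RDD to carry a partitioner; the shuffle performed by
// reduceByKey is expected to provide one, so this scans only the matching split.
val valuesForA: Seq[Int] = counts.lookup("a")         // Seq(4)
```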
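The new `RDD` scaladoc enumerates the five properties an RDD supplies: a list of splits, a function for computing each split, dependencies on parent RDDs, an optional partitioner, and optional preferred locations. `MappedValuesRDD` at the bottom of PairRDDFunctions.scala is the smallest example in this patch; the sketch below follows the same pattern for a hypothetical transformation that tags each value while leaving keys, and hence the partitioner, untouched:

```scala
// Hypothetical one-to-one pair transformation, assumed to sit in package spark
// alongside MappedValuesRDD and written against the 2011-era Split/Dependency API.
class TaggedValuesRDD[K, V](prev: RDD[(K, V)], tag: String)
extends RDD[(K, (String, V))](prev.context) {
  // 1. Splits: reuse the parent's partitioning of the data.
  override def splits = prev.splits
  // 2. Compute: derive each split from the parent's iterator for that split.
  override def compute(split: Split) =
    prev.iterator(split).map { case (k, v) => (k, (tag, v)) }
  // 3. Dependencies: a narrow, one-to-one dependency on the parent.
  override val dependencies = List(new OneToOneDependency(prev))
  // 4. Partitioner: keys are unchanged, so the parent's partitioner still applies.
  override val partitioner = prev.partitioner
  // 5. Preferred locations: not overridden, so the default (none) is inherited.
}
```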