author     Matei Zaharia <matei@eecs.berkeley.edu>   2011-07-10 00:06:15 -0400
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2011-07-10 00:06:15 -0400
commit     25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd (patch)
tree       427166a33ec32f26f82d01ddfda5ce42331a4ad8 /core
parent     b7f1f62ff56212441bd346f6352dee99142555f4 (diff)
Moved PairRDD and SequenceFileRDD functions to separate source files
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala          264
-rw-r--r--  core/src/main/scala/spark/RDD.scala                       306
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala   85
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                8
4 files changed, 371 insertions(+), 292 deletions(-)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
new file mode 100644
index 0000000000..d179328ccf
--- /dev/null
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -0,0 +1,264 @@
+package spark
+
+import java.io.EOFException
+import java.net.URL
+import java.io.ObjectInputStream
+import java.util.concurrent.atomic.AtomicLong
+import java.util.HashSet
+import java.util.Random
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Map
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.OutputCommitter
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.Text
+
+import SparkContext._
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
+ */
+@serializable
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
+ def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
+ def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
+ for ((k, v) <- m2) {
+ m1.get(k) match {
+ case None => m1(k) = v
+ case Some(w) => m1(k) = func(w, v)
+ }
+ }
+ return m1
+ }
+ self.map(pair => HashMap(pair)).reduce(mergeMaps)
+ }
+
+ def combineByKey[C](createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ numSplits: Int)
+ : RDD[(K, C)] =
+ {
+ val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val partitioner = new HashPartitioner(numSplits)
+ new ShuffledRDD(self, aggregator, partitioner)
+ }
+
+ def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+ combineByKey[V]((v: V) => v, func, func, numSplits)
+ }
+
+ def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+ def createCombiner(v: V) = ArrayBuffer(v)
+ def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+ val bufs = combineByKey[ArrayBuffer[V]](
+ createCombiner _, mergeValue _, mergeCombiners _, numSplits)
+ bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ }
+
+ def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+ val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+ val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+ (vs ++ ws).groupByKey(numSplits).flatMap {
+ case (k, seq) => {
+ val vbuf = new ArrayBuffer[V]
+ val wbuf = new ArrayBuffer[W]
+ seq.foreach(_ match {
+ case Left(v) => vbuf += v
+ case Right(w) => wbuf += w
+ })
+ for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+ }
+ }
+ }
+
+ def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
+ val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+ val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+ (vs ++ ws).groupByKey(numSplits).flatMap {
+ case (k, seq) => {
+ val vbuf = new ArrayBuffer[V]
+ val wbuf = new ArrayBuffer[Option[W]]
+ seq.foreach(_ match {
+ case Left(v) => vbuf += v
+ case Right(w) => wbuf += Some(w)
+ })
+ if (wbuf.isEmpty) {
+ wbuf += None
+ }
+ for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+ }
+ }
+ }
+
+ def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
+ val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+ val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+ (vs ++ ws).groupByKey(numSplits).flatMap {
+ case (k, seq) => {
+ val vbuf = new ArrayBuffer[Option[V]]
+ val wbuf = new ArrayBuffer[W]
+ seq.foreach(_ match {
+ case Left(v) => vbuf += Some(v)
+ case Right(w) => wbuf += w
+ })
+ if (vbuf.isEmpty) {
+ vbuf += None
+ }
+ for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+ }
+ }
+ }
+
+ def combineByKey[C](createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C)
+ : RDD[(K, C)] = {
+ combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
+ }
+
+ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+ reduceByKey(func, numCores)
+ }
+
+ def groupByKey(): RDD[(K, Seq[V])] = {
+ groupByKey(numCores)
+ }
+
+ def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+ join(other, numCores)
+ }
+
+ def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+ leftOuterJoin(other, numCores)
+ }
+
+ def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+ rightOuterJoin(other, numCores)
+ }
+
+ def numCores = self.context.numCores
+
+ def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+
+ def mapValues[U](f: V => U): RDD[(K, U)] = {
+ val cleanF = self.context.clean(f)
+ new MappedValuesRDD(self, cleanF)
+ }
+
+ def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+ val cleanF = self.context.clean(f)
+ new FlatMappedValuesRDD(self, cleanF)
+ }
+
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ val part = self.partitioner match {
+ case Some(p) => p
+ case None => new HashPartitioner(numCores)
+ }
+ new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
+ case (k, Seq(vs, ws)) =>
+ (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
+ }
+ }
+
+ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ val part = self.partitioner match {
+ case Some(p) => p
+ case None => new HashPartitioner(numCores)
+ }
+ new CoGroupedRDD[K](
+ Seq(self.asInstanceOf[RDD[(_, _)]],
+ other1.asInstanceOf[RDD[(_, _)]],
+ other2.asInstanceOf[RDD[(_, _)]]),
+ part).map {
+ case (k, Seq(vs, w1s, w2s)) =>
+ (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
+ }
+ }
+
+ def saveAsHadoopFile (path: String, jobConf: JobConf) {
+ saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
+ }
+
+ def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
+ saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
+ }
+
+ def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+ saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
+ }
+
+ def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
+ saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
+ }
+
+ private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
+ logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
+ val writer = new HadoopFileWriter(path,
+ keyClass,
+ valueClass,
+ outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
+ outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
+ null)
+ writer.preSetup()
+
+ def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+ writer.setup(context.stageId, context.splitId, context.attemptId)
+ writer.open()
+
+ var count = 0
+ while(iter.hasNext) {
+ val record = iter.next
+ count += 1
+ writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+ }
+
+ writer.close()
+ return writer
+ }
+
+ self.context.runJob(self, writeToFile _ ).foreach(_.commit())
+ writer.cleanup()
+ }
+
+ def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+ def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+class MappedValuesRDD[K, V, U](
+ prev: RDD[(K, V)], f: V => U)
+extends RDD[(K, U)](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override val partitioner = prev.partitioner
+ override def compute(split: Split) =
+ prev.iterator(split).map{case (k, v) => (k, f(v))}
+}
+
+class FlatMappedValuesRDD[K, V, U](
+ prev: RDD[(K, V)], f: V => Traversable[U])
+extends RDD[(K, U)](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override val partitioner = prev.partitioner
+ override def compute(split: Split) = {
+ prev.iterator(split).toStream.flatMap {
+ case (k, v) => f(v).map(x => (k, x))
+ }.iterator
+ }
+}
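For context, a minimal usage sketch of the API this new file provides. It is not part of the commit: the "local" master, job name, and sample data are illustrative, and it assumes the 2011-era SparkContext(master, jobName) constructor and parallelize method. Importing SparkContext._ brings the rddToPairRDDFunctions conversion (renamed below in SparkContext.scala) into scope, which is what makes reduceByKey, join and collectAsMap available on an RDD of pairs.

    import spark.SparkContext
    import spark.SparkContext._   // brings rddToPairRDDFunctions into scope

    val sc = new SparkContext("local", "PairRDDFunctionsExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // These methods come from PairRDDFunctions via the implicit conversion, not from RDD itself.
    val counts = pairs.reduceByKey(_ + _)                        // ("a", 4), ("b", 2)
    val other  = sc.parallelize(Seq(("a", "x"), ("b", "y")))
    val joined = counts.join(other)                              // ("a", (4, "x")), ("b", (2, "y"))
    println(joined.collectAsMap())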
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8f34ffee2b..1d86062012 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -26,8 +26,24 @@ import org.apache.hadoop.io.Text
import SparkContext._
-import mesos._
-
+/**
+ * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents
+ * an immutable, partitioned collection of elements that can be operated on in parallel.
+ *
+ * Each RDD is characterized by five main properties:
+ * - A list of splits (partitions)
+ * - A function for computing each split
+ * - A list of dependencies on other RDDs
+ * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
+ * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for HDFS)
+ *
+ * All the scheduling and execution in Spark is done based on these methods, allowing each
+ * RDD to implement its own way of computing itself.
+ *
+ * This class also contains transformation methods available on all RDDs (e.g. map and filter).
+ * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
+ * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
+ */
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
// Methods that must be implemented by subclasses
@@ -207,289 +223,3 @@ extends RDD[Array[T]](prev.context) {
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
}
-
-
-@serializable class PairRDDExtras[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
- def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
- def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
- for ((k, v) <- m2) {
- m1.get(k) match {
- case None => m1(k) = v
- case Some(w) => m1(k) = func(w, v)
- }
- }
- return m1
- }
- self.map(pair => HashMap(pair)).reduce(mergeMaps)
- }
-
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C,
- numSplits: Int)
- : RDD[(K, C)] =
- {
- val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- val partitioner = new HashPartitioner(numSplits)
- new ShuffledRDD(self, aggregator, partitioner)
- }
-
- def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
- combineByKey[V]((v: V) => v, func, func, numSplits)
- }
-
- def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, numSplits)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
- }
-
- def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += w
- })
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
- }
- }
-
- def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[Option[W]]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += Some(w)
- })
- if (wbuf.isEmpty) {
- wbuf += None
- }
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
- }
- }
-
- def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[Option[V]]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += Some(v)
- case Right(w) => wbuf += w
- })
- if (vbuf.isEmpty) {
- vbuf += None
- }
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
- }
- }
-
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
- }
-
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
- reduceByKey(func, numCores)
- }
-
- def groupByKey(): RDD[(K, Seq[V])] = {
- groupByKey(numCores)
- }
-
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
- join(other, numCores)
- }
-
- def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, numCores)
- }
-
- def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, numCores)
- }
-
- def numCores = self.context.numCores
-
- def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
-
- def mapValues[U](f: V => U): RDD[(K, U)] = {
- val cleanF = self.context.clean(f)
- new MappedValuesRDD(self, cleanF)
- }
-
- def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
- val cleanF = self.context.clean(f)
- new FlatMappedValuesRDD(self, cleanF)
- }
-
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(numCores)
- }
- new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
- case (k, Seq(vs, ws)) =>
- (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
- }
- }
-
- def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(numCores)
- }
- new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(_, _)]],
- other1.asInstanceOf[RDD[(_, _)]],
- other2.asInstanceOf[RDD[(_, _)]]),
- part).map {
- case (k, Seq(vs, w1s, w2s)) =>
- (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
- }
- }
-
- def saveAsHadoopFile (path: String, jobConf: JobConf) {
- saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
- }
-
- def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
- saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
- }
-
- def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
- saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
- }
-
- def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
- saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
- }
-
- private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
- logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
- val writer = new HadoopFileWriter(path,
- keyClass,
- valueClass,
- outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
- outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
- null)
- writer.preSetup()
-
- def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
- writer.setup(context.stageId, context.splitId, context.attemptId)
- writer.open()
-
- var count = 0
- while(iter.hasNext) {
- val record = iter.next
- count += 1
- writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
- }
-
- writer.close()
- return writer
- }
-
- self.context.runJob(self, writeToFile _ ).foreach(_.commit())
- writer.cleanup()
- }
-
- def getKeyClass() = implicitly[ClassManifest[K]].erasure
-
- def getValueClass() = implicitly[ClassManifest[V]].erasure
-}
-
-@serializable
-class SequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
-
- def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
- val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
- classManifest[T].erasure
- else
- implicitly[T => Writable].getClass.getMethods()(0).getReturnType
- }
- c.asInstanceOf[Class[ _ <: Writable]]
- }
-
- def saveAsSequenceFile(path: String) {
-
- def anyToWritable[U <% Writable](u: U): Writable = u
-
- val keyClass = getWritableClass[K]
- val valueClass = getWritableClass[V]
- val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
- val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
- if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
- } else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
- } else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
- } else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
- }
- }
-
- def lookup(key: K): Seq[V] = {
- self.partitioner match {
- case Some(p) =>
- val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
- val buf = new ArrayBuffer[V]
- for ((k, v) <- it if k == key)
- buf += v
- buf
- }
- val res = self.context.runJob(self, process _, Array(index))
- res(0)
- case None =>
- throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
- }
- }
-}
-
-class MappedValuesRDD[K, V, U](
- prev: RDD[(K, V)], f: V => U)
-extends RDD[(K, U)](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override val partitioner = prev.partitioner
- override def compute(split: Split) =
- prev.iterator(split).map{case (k, v) => (k, f(v))}
-}
-
-class FlatMappedValuesRDD[K, V, U](
- prev: RDD[(K, V)], f: V => Traversable[U])
-extends RDD[(K, U)](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override val partitioner = prev.partitioner
- override def compute(split: Split) = {
- prev.iterator(split).toStream.flatMap {
- case (k, v) => f(v).map(x => (k, x))
- }.iterator
- }
-}
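The new class comment above lists the properties a concrete RDD supplies: splits, a compute function, dependencies, and optionally a partitioner and preferred locations. As a rough sketch of that contract, modeled on MappedValuesRDD from this commit and not itself part of the patch, a one-to-one RDD could look like this (TimesTwoRDD is a hypothetical name):

    // Hypothetical one-to-one RDD: reuses the parent's splits and doubles each element.
    class TimesTwoRDD(prev: RDD[Int]) extends RDD[Int](prev.context) {
      override def splits = prev.splits                                // same partitions as the parent
      override val dependencies = List(new OneToOneDependency(prev))   // narrow, one-to-one dependency
      override def compute(split: Split) = prev.iterator(split).map(_ * 2)
    }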
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
new file mode 100644
index 0000000000..8eb19c5436
--- /dev/null
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -0,0 +1,85 @@
+package spark
+
+import java.io.EOFException
+import java.net.URL
+import java.io.ObjectInputStream
+import java.util.concurrent.atomic.AtomicLong
+import java.util.HashSet
+import java.util.Random
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Map
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.OutputCommitter
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.Text
+
+import SparkContext._
+
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
+ * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
+ * we need more implicit parameters to convert our keys and values to Writable.
+ */
+@serializable
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+
+ def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ val c = {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
+ classManifest[T].erasure
+ else
+ implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ }
+ c.asInstanceOf[Class[ _ <: Writable]]
+ }
+
+ def saveAsSequenceFile(path: String) {
+
+ def anyToWritable[U <% Writable](u: U): Writable = u
+
+ val keyClass = getWritableClass[K]
+ val valueClass = getWritableClass[V]
+ val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
+ val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+ if (!convertKey && !convertValue) {
+ self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (!convertKey && convertValue) {
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (convertKey && !convertValue) {
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ } else if (convertKey && convertValue) {
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ }
+ }
+
+ def lookup(key: K): Seq[V] = {
+ self.partitioner match {
+ case Some(p) =>
+ val index = p.getPartition(key)
+ def process(it: Iterator[(K, V)]): Seq[V] = {
+ val buf = new ArrayBuffer[V]
+ for ((k, v) <- it if k == key)
+ buf += v
+ buf
+ }
+ val res = self.context.runJob(self, process _, Array(index))
+ res(0)
+ case None =>
+ throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
+ }
+ }
+}
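As a usage sketch (again not part of the commit): because Int is viewable as a Writable through the intToIntWritable conversion in SparkContext.scala below, an RDD of Int pairs picks up saveAsSequenceFile via rddToSequenceFileRDDFunctions. The SparkContext arguments and output path are illustrative.

    import spark.SparkContext
    import spark.SparkContext._   // rddToSequenceFileRDDFunctions and intToIntWritable

    val sc = new SparkContext("local", "SequenceFileExample")
    val data = sc.parallelize(Seq((1, 10), (2, 20), (3, 30)))
    data.saveAsSequenceFile("/tmp/int-pairs")   // keys and values are written as IntWritable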
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 536668399b..e48d0f1940 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -263,11 +263,11 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
- implicit def rddToPairRDDExtras[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
- new PairRDDExtras(rdd)
+ implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
+ new PairRDDFunctions(rdd)
- implicit def rddToSequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
- new SequencePairRDDExtras(rdd)
+ implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
+ new SequenceFileRDDFunctions(rdd)
implicit def intToIntWritable(i: Int) = new IntWritable(i)
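Because callers reach these conversions implicitly, by importing SparkContext._ rather than by referring to them by name, the rename from PairRDDExtras and SequencePairRDDExtras is source compatible. A sketch of unchanged caller code, assuming an existing SparkContext named sc:

    import spark.SparkContext._
    // Resolved through rddToPairRDDFunctions after this commit, rddToPairRDDExtras before it.
    val counts = sc.parallelize(Seq("spark", "rdd", "spark"))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)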