author     Matei Zaharia <matei@eecs.berkeley.edu>    2011-07-14 00:20:14 -0400
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2011-07-14 00:20:14 -0400
commit     2fb906e8e58e56a175517a43dc03e3ba50103ee4 (patch)
tree       ae4b1909ccaaec3e176cb07665b37d5288e7ebfe /core
parent     d0c795836476ff0188fd655f18e568bd6ae00d49 (diff)
parent     2604939f643bca125f5e2fb53e3221202996d41b (diff)
download   spark-2fb906e8e58e56a175517a43dc03e3ba50103ee4.tar.gz
           spark-2fb906e8e58e56a175517a43dc03e3ba50103ee4.tar.bz2
           spark-2fb906e8e58e56a175517a43dc03e3ba50103ee4.zip
Merge branch 'master' into scala-2.9
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala    1
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                66
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala                   18
3 files changed, 77 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 7f591137c6..278f76fb19 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -40,6 +40,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
         classManifest[T].erasure
       else
         implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+      // TODO: use something like WritableConverter to avoid reflection
     }
     c.asInstanceOf[Class[ _ <: Writable]]
   }
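
The TODO added above points at the reflection on the implicit view (implicitly[T => Writable].getClass.getMethods()(0).getReturnType) used to recover the Writable class. A hedged sketch of what a converter-based replacement could look like, assuming the WritableConverter class and implicits introduced in SparkContext.scala below (this is illustrative, not part of the patch):

    // Hypothetical replacement for getWritableClass: take the Writable class from an
    // implicitly supplied WritableConverter instead of reflecting on the T => Writable
    // conversion function.
    private def getWritableClass[T](implicit cm: ClassManifest[T],
        cf: () => WritableConverter[T]): Class[_ <: Writable] =
      cf().writableClass(cm)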
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index e48d0f1940..f09ea3da3f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -129,18 +129,30 @@ extends Logging {
   }

   /**
-   * Smarter version of sequenceFile() that obtains the key and value classes
-   * from ClassManifests instead of requiring the user to pass them directly.
+   * Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter.
+   *
+   * WritableConverters are provided in a somewhat strange way (by an implicit function) to support both
+   * subclasses of Writable and types for which we define a converter (e.g. Int to IntWritable). The most
+   * natural thing would've been to have implicit objects for the converters, but then we couldn't have
+   * an object for every subclass of Writable (you can't have a parameterized singleton object). We use
+   * functions instead to create a new converter for the appropriate type. In addition, we pass the converter
+   * a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V]): RDD[(K, V)] = {
-    sequenceFile(path,
-        km.erasure.asInstanceOf[Class[K]],
-        vm.erasure.asInstanceOf[Class[V]])
+  def sequenceFile[K, V](path: String)
+      (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+      : RDD[(K, V)] = {
+    val kc = kcf()
+    val vc = vcf()
+    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+    val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]],
+        vc.writableClass(vm).asInstanceOf[Class[Writable]])
+    writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }

   def objectFile[T: ClassManifest](path: String): RDD[T] = {
-    sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)).flatMap(x => x.toTraversable)
+    import SparkContext.writableWritableConverter // To get converters for NullWritable and BytesWritable
+    sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes))
+      .flatMap(_.toTraversable)
   }
@@ -269,6 +281,8 @@ object SparkContext {
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)

+  // Implicit conversions to common Writable types, for saveAsSequenceFile
+
   implicit def intToIntWritable(i: Int) = new IntWritable(i)

   implicit def longToLongWritable(l: Long) = new LongWritable(l)
@@ -290,6 +304,7 @@ object SparkContext {
         classManifest[T].erasure
       else
         implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+      // TODO: use something like WritableConverter to avoid reflection
     }
     c.asInstanceOf[Class[ _ <: Writable]]
   }
@@ -298,4 +313,39 @@ object SparkContext {
     new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray)
   }
+
+  // Helper objects for converting common types to Writable
+
+  private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
+    val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+
+  implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
 }
+
+
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
+ * that doesn't know the type of T when it is created. This sounds strange but is necessary to
+ * support converting subclasses of Writable to themselves (writableWritableConverter).
+ */
+@serializable
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
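
The scaladoc on the new sequenceFile() explains why converters are supplied as implicit functions rather than implicit objects: an object cannot take a type parameter, so a single implicit object could not cover every subclass of Writable. A minimal, self-contained sketch of that pattern (names such as Conv, writableClassOf, and ConvDemo are illustrative, not taken from the patch; it assumes Scala 2.9-era ClassManifests and the Hadoop io classes, matching this branch):

    import org.apache.hadoop.io.{IntWritable, Text, Writable}

    // A converter carries the Writable class for T plus the conversion function,
    // mirroring the shape of WritableConverter above.
    class Conv[T](val writableClass: ClassManifest[T] => Class[_ <: Writable],
                  val convert: Writable => T)

    object Conv {
      // A concrete converter for a plain Scala type: an implicit def that builds
      // a fresh converter each time it is summoned.
      implicit def intConv(): Conv[Int] =
        new Conv[Int](_ => classOf[IntWritable], w => w.asInstanceOf[IntWritable].get)

      // One generic definition covers every Writable subclass; this is the case a
      // parameterized singleton object could not express.
      implicit def writableConv[W <: Writable](): Conv[W] =
        new Conv[W](_.erasure.asInstanceOf[Class[W]], _.asInstanceOf[W])
    }

    object ConvDemo {
      import Conv._

      // Callers request a () => Conv[T], just as sequenceFile() requests
      // () => WritableConverter[K] and () => WritableConverter[V].
      def writableClassOf[T](implicit cm: ClassManifest[T], cf: () => Conv[T]): Class[_ <: Writable] =
        cf().writableClass(cm)

      def main(args: Array[String]) {
        println(writableClassOf[Int])   // class org.apache.hadoop.io.IntWritable
        println(writableClassOf[Text])  // class org.apache.hadoop.io.Text
      }
    }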
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 17111f7753..d21de34e72 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -74,6 +74,24 @@ class FileSuite extends FunSuite {
     sc.stop()
   }

+  test("implicit conversions in reading SequenceFiles") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+    nums.saveAsSequenceFile(outputDir)
+    // Similar to the tests above, we read a SequenceFile, but this time we pass type params
+    // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
+    val output1 = sc.sequenceFile[Int, String](outputDir)
+    assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+    // Also try having one type be a subclass of Writable and one not
+    val output2 = sc.sequenceFile[Int, Text](outputDir)
+    assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    val output3 = sc.sequenceFile[IntWritable, String](outputDir)
+    assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
   test("object files of ints") {
     val sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
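
From an application's point of view, the change merged here lets sequenceFile() be called with plain Scala types, with the converters resolved implicitly. A hedged usage sketch (the output path, app name, and object name are illustrative; it assumes import spark.SparkContext._ to bring the implicit conversions and converters into scope):

    import spark.SparkContext
    import spark.SparkContext._   // implicit Writable conversions and WritableConverters

    object SequenceFileExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SequenceFileExample")
        // Write (Int, String) pairs; saveAsSequenceFile relies on the Int => IntWritable
        // and String => Text conversions defined in object SparkContext.
        sc.makeRDD(1 to 3).map(x => (x, "a" * x)).saveAsSequenceFile("/tmp/pairs")
        // Read them back without naming IntWritable or Text; the WritableConverters
        // added in this merge pick the Writable classes and convert each record.
        val pairs = sc.sequenceFile[Int, String]("/tmp/pairs")
        println(pairs.collect().toList)   // List((1,a), (2,aa), (3,aaa))
        sc.stop()
      }
    }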