-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala |  1
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala             | 82
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala                | 18
3 files changed, 70 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 7f591137c6..278f76fb19 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -40,6 +40,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
classManifest[T].erasure
else
implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ // TODO: use something like WritableConverter to avoid reflection
}
c.asInstanceOf[Class[ _ <: Writable]]
}
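Context for the TODO above: when saving, getWritableClass discovers the Writable class for K and V by reflecting on the return type of the implicit T => Writable conversion, which is what the new WritableConverter machinery below is meant to eventually replace. A minimal sketch of the save path that exercises this code, assuming a SparkContext named sc and the implicit conversions from object SparkContext (e.g. intToIntWritable, stringToText) in scope:

    import spark.SparkContext._

    val pairs = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
    // Int and String are not Writables themselves, so getWritableClass falls
    // back to reflecting on the Int => IntWritable and String => Text
    // conversion functions to pick IntWritable and Text for the file header.
    pairs.saveAsSequenceFile("/tmp/seq-output")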
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1b94760b14..f09ea3da3f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -129,20 +129,30 @@ extends Logging {
}
/**
- * Version of sequenceFile() for types implicitly convertible to Writables
+ * Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter.
+ *
+ * WritableConverters are provided in a somewhat strange way (by an implicit function) to support both
+ * subclasses of Writable and types for which we define a converter (e.g. Int to IntWritable). The most
+ * natural thing would've been to have implicit objects for the converters, but then we couldn't have
+ * an object for every subclass of Writable (you can't have a parameterized singleton object). We use
+ * functions instead to create a new converter for the appropriate type. In addition, we pass the converter
+ * a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case.
*/
def sequenceFile[K, V](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConv[K], vcf: () => WritableConv[V])
+ (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
val vc = vcf()
- val fmt = classOf[SequenceFileInputFormat[Writable, Writable]]
- hadoopFile(path, fmt, kc.writableClass(km).asInstanceOf[Class[Writable]], vc.writableClass(vm).asInstanceOf[Class[Writable]]).map{case (k,v) => (kc.convert(k), vc.convert(v))}
+ val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+ val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]],
+ vc.writableClass(vm).asInstanceOf[Class[Writable]])
+ writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
}
def objectFile[T: ClassManifest](path: String): RDD[T] = {
- import SparkContext.writableWritableConv
- sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)).flatMap(x => x.toTraversable)
+ import SparkContext.writableWritableConverter // To get converters for NullWritable and BytesWritable
+ sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes))
+ .flatMap(_.toTraversable)
}
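A hedged usage sketch of the reworked read path (the paths and the sc name are illustrative): with the implicit converter factories from object SparkContext in scope, callers name plain Scala types, and the converters both select the Writable classes to read and unwrap each record:

    import spark.SparkContext._

    // Int and String resolve to intWritableConverter and stringWritableConverter,
    // so the file is read as (IntWritable, Text) pairs and unwrapped to (Int, String).
    val nums = sc.sequenceFile[Int, String]("/tmp/seq-output")

    // objectFile round-trips serialized values through (NullWritable, BytesWritable)
    // records, as the method body above shows.
    val strs = sc.objectFile[String]("/tmp/obj-output")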
@@ -271,6 +281,8 @@ object SparkContext {
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
+ // Implicit conversions to common Writable types, for saveAsSequenceFile
+
implicit def intToIntWritable(i: Int) = new IntWritable(i)
implicit def longToLongWritable(l: Long) = new LongWritable(l)
@@ -285,30 +297,6 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
- implicit def intWritableConv() =
- new WritableConv[Int](_ => classOf[IntWritable], _.asInstanceOf[IntWritable].get)
-
- implicit def longWritableConv() =
- new WritableConv[Long](_ => classOf[LongWritable], _.asInstanceOf[LongWritable].get)
-
- implicit def doubleWritableConv() =
- new WritableConv[Double](_ => classOf[DoubleWritable], _.asInstanceOf[DoubleWritable].get)
-
- implicit def floatWritableConv() =
- new WritableConv[Float](_ => classOf[FloatWritable], _.asInstanceOf[FloatWritable].get)
-
- implicit def booleanWritableConv() =
- new WritableConv[Boolean](_ => classOf[BooleanWritable], _.asInstanceOf[BooleanWritable].get)
-
- implicit def bytesWritableConv() =
- new WritableConv[Array[Byte]](_ => classOf[BytesWritable], _.asInstanceOf[BytesWritable].getBytes)
-
- implicit def stringWritableConv() =
- new WritableConv[String](_ => classOf[Text], _.asInstanceOf[Text].toString)
-
- implicit def writableWritableConv[T <: Writable]() =
- new WritableConv[T](_.erasure, _.asInstanceOf[T])
-
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = {
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
@@ -316,6 +304,7 @@ object SparkContext {
classManifest[T].erasure
else
implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+ // TODO: use something like WritableConverter to avoid reflection
}
c.asInstanceOf[Class[ _ <: Writable]]
}
@@ -324,8 +313,39 @@ object SparkContext {
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray)
}
+
+ // Helper objects for converting common types to Writable
+
+ private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
+ val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
+ new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+ }
+
+ implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+
+ implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+
+ implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+
+ implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+
+ implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+ implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+
+ implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
+
+ implicit def writableWritableConverter[T <: Writable]() =
+ new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
+ * that doesn't know the type of T when it is created. This sounds strange but is necessary to
+ * support converting subclasses of Writable to themselves (writableWritableConverter).
+ */
@serializable
-class WritableConv[T](val writableClass: ClassManifest[T] => Class[_], val convert: Writable => T) {}
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
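To illustrate the WritableConverter contract, here is a hedged sketch of an extra converter built in the style of the simpleWritableConverter helper; this converter is hypothetical, not part of the patch:

    import org.apache.hadoop.io.Text

    // Hypothetical: read Text records as trimmed Strings. writableClass ignores
    // its ClassManifest argument because the Writable class is fixed at Text;
    // convert downcasts the incoming Writable and unwraps it.
    implicit def trimmedTextConverter() =
      new WritableConverter[String](_ => classOf[Text], _.asInstanceOf[Text].toString.trim)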
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 17111f7753..d21de34e72 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -74,6 +74,24 @@ class FileSuite extends FunSuite {
sc.stop()
}
+ test("implicit conversions in reading SequenceFiles") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+ nums.saveAsSequenceFile(outputDir)
+ // Similar to the tests above, we read a SequenceFile, but this time we pass type params
+ // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
+ val output1 = sc.sequenceFile[Int, String](outputDir)
+ assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+ // Also try having one type be a subclass of Writable and one not
+ val output2 = sc.sequenceFile[Int, Text](outputDir)
+ assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ val output3 = sc.sequenceFile[IntWritable, String](outputDir)
+ assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ sc.stop()
+ }
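For the mixed cases above, implicit search resolves one converter factory per type parameter, eta-expanding the implicit defs in object SparkContext to the () => WritableConverter[_] arguments of sequenceFile. Roughly, for output2 (a sketch, not code from the patch):

    // Int gets the primitive converter; Text falls through to the generic
    // converter for Writable subclasses.
    val kcf: () => WritableConverter[Int]  = SparkContext.intWritableConverter _
    val vcf: () => WritableConverter[Text] = SparkContext.writableWritableConverter[Text] _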
+
test("object files of ints") {
val sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()