 core/src/main/scala/spark/SequenceFileRDDFunctions.scala |  1 +
 core/src/main/scala/spark/SparkContext.scala             | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
 core/src/test/scala/spark/FileSuite.scala                | 18 ++++++++++++++++++
 3 files changed, 70 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 7f591137c6..278f76fb19 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -40,6 +40,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
         classManifest[T].erasure
       else
         implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+        // TODO: use something like WritableConverter to avoid reflection
     }
     c.asInstanceOf[Class[ _ <: Writable]]
   }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1b94760b14..f09ea3da3f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -129,20 +129,30 @@ extends Logging {
   }
 
   /**
-   * Version of sequenceFile() for types implicitly convertible to Writables
+   * Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter.
+   *
+   * WritableConverters are provided in a somewhat strange way (by an implicit function) to support both
+   * subclasses of Writable and types for which we define a converter (e.g. Int to IntWritable). The most
+   * natural thing would've been to have implicit objects for the converters, but then we couldn't have
+   * an object for every subclass of Writable (you can't have a parameterized singleton object). We use
+   * functions instead to create a new converter for the appropriate type. In addition, we pass the converter
+   * a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case.
    */
   def sequenceFile[K, V](path: String)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConv[K], vcf: () => WritableConv[V])
+      (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
     val vc = vcf()
-    val fmt = classOf[SequenceFileInputFormat[Writable, Writable]]
-    hadoopFile(path, fmt, kc.writableClass(km).asInstanceOf[Class[Writable]], vc.writableClass(vm).asInstanceOf[Class[Writable]]).map{case (k,v) => (kc.convert(k), vc.convert(v))}
+    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+    val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]],
+        vc.writableClass(vm).asInstanceOf[Class[Writable]])
+    writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }
 
   def objectFile[T: ClassManifest](path: String): RDD[T] = {
-    import SparkContext.writableWritableConv
-    sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes)).flatMap(x => x.toTraversable)
+    import SparkContext.writableWritableConverter // To get converters for NullWritable and BytesWritable
+    sequenceFile[NullWritable,BytesWritable](path).map(x => Utils.deserialize[Array[T]](x._2.getBytes))
+      .flatMap(_.toTraversable)
   }
@@ -271,6 +281,8 @@ object SparkContext {
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
+  // Implicit conversions to common Writable types, for saveAsSequenceFile
+
   implicit def intToIntWritable(i: Int) = new IntWritable(i)
 
   implicit def longToLongWritable(l: Long) = new LongWritable(l)
@@ -285,30 +297,6 @@ object SparkContext {
 
   implicit def stringToText(s: String) = new Text(s)
 
-  implicit def intWritableConv() =
-    new WritableConv[Int](_ => classOf[IntWritable], _.asInstanceOf[IntWritable].get)
-
-  implicit def longWritableConv() =
-    new WritableConv[Long](_ => classOf[LongWritable], _.asInstanceOf[LongWritable].get)
-
-  implicit def doubleWritableConv() =
-    new WritableConv[Double](_ => classOf[DoubleWritable], _.asInstanceOf[DoubleWritable].get)
-
-  implicit def floatWritableConv() =
-    new WritableConv[Float](_ => classOf[FloatWritable], _.asInstanceOf[FloatWritable].get)
-
-  implicit def booleanWritableConv() =
-    new WritableConv[Boolean](_ => classOf[BooleanWritable], _.asInstanceOf[BooleanWritable].get)
-
-  implicit def bytesWritableConv() =
-    new WritableConv[Array[Byte]](_ => classOf[BytesWritable], _.asInstanceOf[BytesWritable].getBytes)
-
-  implicit def stringWritableConv() =
-    new WritableConv[String](_ => classOf[Text], _.asInstanceOf[Text].toString)
-
-  implicit def writableWritableConv[T <: Writable]() =
-    new WritableConv[T](_.erasure, _.asInstanceOf[T])
-
   private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = {
     def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
       val c = {
@@ -316,6 +304,7 @@ object SparkContext {
         classManifest[T].erasure
       else
         implicitly[T => Writable].getClass.getMethods()(0).getReturnType
+        // TODO: use something like WritableConverter to avoid reflection
     }
     c.asInstanceOf[Class[ _ <: Writable]]
   }
@@ -324,8 +313,39 @@ object SparkContext {
     new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
         arr.map(x => anyToWritable(x)).toArray)
   }
+
+  // Helper objects for converting common types to Writable
+
+  private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
+    val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+
+  implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
 }
 
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
+ * that doesn't know the type of T when it is created. This sounds strange but is necessary to
+ * support converting subclasses of Writable to themselves (writableWritableConverter).
+ */
 @serializable
-class WritableConv[T](val writableClass: ClassManifest[T] => Class[_], val convert: Writable => T) {}
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 17111f7753..d21de34e72 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -74,6 +74,24 @@ class FileSuite extends FunSuite {
     sc.stop()
   }
 
+  test("implicit conversions in reading SequenceFiles") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+    nums.saveAsSequenceFile(outputDir)
+    // Similar to the tests above, we read a SequenceFile, but this time we pass type params
+    // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
+    val output1 = sc.sequenceFile[Int, String](outputDir)
+    assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+    // Also try having one type be a subclass of Writable and one not
+    val output2 = sc.sequenceFile[Int, Text](outputDir)
+    assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    val output3 = sc.sequenceFile[IntWritable, String](outputDir)
+    assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
   test("object files of ints") {
     val sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
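
For readers unfamiliar with the implicit-function trick described in the sequenceFile() scaladoc above, the following is a minimal standalone sketch of the same mechanism. Only WritableConverter and the two converter names come from this patch; Converters, Demo and keyClassFor are illustrative names invented for the sketch:

    import org.apache.hadoop.io.{IntWritable, Text, Writable}

    class WritableConverter[T](
        val writableClass: ClassManifest[T] => Class[_ <: Writable],
        val convert: Writable => T)

    object Converters {
      // Implicit *functions* rather than implicit objects: a def can be type-parameterized
      // (see writableWritableConverter below), while a singleton object cannot.
      implicit def intWritableConverter(): WritableConverter[Int] =
        new WritableConverter[Int](_ => classOf[IntWritable], _.asInstanceOf[IntWritable].get)

      // Generic case: any Writable subclass converts to itself. The ClassManifest argument
      // is how this one converter learns the concrete Writable class at each call site.
      implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
        new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
    }

    object Demo {
      import Converters._

      // Shaped like sequenceFile(): summons a manifest and a converter factory implicitly,
      // then uses them to recover the Writable class backing the key type K.
      def keyClassFor[K](implicit km: ClassManifest[K],
                         kcf: () => WritableConverter[K]): Class[_ <: Writable] =
        kcf().writableClass(km)

      def main(args: Array[String]) {
        println(keyClassFor[Int])   // class org.apache.hadoop.io.IntWritable
        println(keyClassFor[Text])  // class org.apache.hadoop.io.Text
      }
    }

In the patch itself, object SparkContext plays the role of Converters and sequenceFile() the role of keyClassFor; the new FileSuite test above exercises exactly this resolution, mixing converted types (Int, String) with Writable subclasses (IntWritable, Text).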