author     zsxwing <zsxwing@gmail.com>          2015-02-03 20:17:12 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-03 20:17:12 -0800
commit     d37978d8aafef8a2e637687f3848ca0a8b935b33
tree       6e46fa8b7bcd03a8828ae089c71a93fb26d8b0b3 /core
parent     1077f2e1def6266aee6ad6f0640a8f46cd273e21
[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically
Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically, without breaking binary compatibility. However, this PR does break source compatibility if people happen to use `xxxToXxxWritable` in some places; see the unit test in `graphx`.

Author: zsxwing <zsxwing@gmail.com>

Closes #3642 from zsxwing/SPARK-4795 and squashes the following commits:

914b2d6 [zsxwing] Add implicit back to the Writables methods
0b9017f [zsxwing] Add some docs
a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795
39343de [zsxwing] Fix the unit test
64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext
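As a quick illustration of the intended end state (not part of the patch; the app name and output path are placeholders), saving a sequence file no longer requires `import org.apache.spark.SparkContext._`:

import org.apache.spark.{SparkConf, SparkContext}

object SaveSequenceFileSketch {
  def main(args: Array[String]): Unit = {
    // No `import SparkContext._` needed: the implicit conversion to SequenceFileRDDFunctions
    // now lives in `object RDD`, and the Int/String => Writable factories live in
    // `object WritableFactory`, so the compiler finds them automatically.
    val sc = new SparkContext(new SparkConf().setAppName("save-sequence-file-sketch").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq(1 -> "one", 2 -> "two"))
    pairs.saveAsSequenceFile("/tmp/save-sequence-file-sketch")
    sc.stop()
  }
}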
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                  | 70
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/package.scala                                       |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                                       | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala                  | 49
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                                     |  1
-rw-r--r--  core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala                             | 14
7 files changed, 128 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a16a31654..16c6fdbe52 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1749,8 +1749,14 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
- rdd: RDD[(K, V)]) =
+ rdd: RDD[(K, V)]) = {
+ val kf = implicitly[K => Writable]
+ val vf = implicitly[V => Writable]
+ // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
+ implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
+ implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
RDD.rddToSequenceFileRDDFunctions(rdd)
+ }
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
@@ -1767,20 +1773,35 @@ object SparkContext extends Logging {
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)
- // Implicit conversions to common Writable types, for saveAsSequenceFile
+ // The following deprecated functions have already been moved to `object WritableFactory` to
+ // make the compiler find them automatically. They are still kept here for backward compatibility.
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)
+ @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
implicit def stringToText(s: String): Text = new Text(s)
private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
@@ -2070,7 +2091,7 @@ object WritableConverter {
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
- // The following implicit functions were in SparkContext before 1.2 and users had to
+ // The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -2103,3 +2124,46 @@ object WritableConverter {
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
+
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The Writable class will be used in `SequenceFileRDDFunctions`.
+ */
+private[spark] class WritableFactory[T](
+ val writableClass: ClassTag[T] => Class[_ <: Writable],
+ val convert: T => Writable) extends Serializable
+
+object WritableFactory {
+
+ private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
+ : WritableFactory[T] = {
+ val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
+ new WritableFactory[T](_ => writableClass, convert)
+ }
+
+ implicit def intWritableFactory: WritableFactory[Int] =
+ simpleWritableFactory(new IntWritable(_))
+
+ implicit def longWritableFactory: WritableFactory[Long] =
+ simpleWritableFactory(new LongWritable(_))
+
+ implicit def floatWritableFactory: WritableFactory[Float] =
+ simpleWritableFactory(new FloatWritable(_))
+
+ implicit def doubleWritableFactory: WritableFactory[Double] =
+ simpleWritableFactory(new DoubleWritable(_))
+
+ implicit def booleanWritableFactory: WritableFactory[Boolean] =
+ simpleWritableFactory(new BooleanWritable(_))
+
+ implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
+ simpleWritableFactory(new BytesWritable(_))
+
+ implicit def stringWritableFactory: WritableFactory[String] =
+ simpleWritableFactory(new Text(_))
+
+ implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
+ simpleWritableFactory(w => w)
+
+}
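The design above relies on a standard Scala rule: implicits declared in a type's companion object are part of that type's implicit scope, so the compiler considers them without any import at the call site. A minimal standalone sketch of that pattern (the names Box, boxInt, boxString, and show are hypothetical, not Spark APIs):

class Box[T](val describe: T => String)

object Box {
  // Because these instances live in Box's companion object, they are found automatically
  // whenever an implicit Box[Int] or Box[String] is required, with no import needed.
  implicit val boxInt: Box[Int] = new Box[Int](i => s"Int($i)")
  implicit val boxString: Box[String] = new Box[String](s => s"String($s)")
}

object CompanionImplicitSketch {
  def show[T](value: T)(implicit box: Box[T]): String = box.describe(value)

  def main(args: Array[String]): Unit = {
    println(show(42))      // resolves Box.boxInt from the companion object
    println(show("spark")) // resolves Box.boxString from the companion object
  }
}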
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index c0cbd28a84..cf289fb3ae 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
* given directory (probably a temp directory)
*/
object WriteInputFormatTestDataGenerator {
- import SparkContext._
def main(args: Array[String]) {
val path = args(0)
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 5ad73c3d27..b6249b4921 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -27,8 +27,7 @@ package org.apache
* contains operations available only on RDDs of Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
- * type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * type (e.g. RDD[(Int, Int)]) through implicit conversions.
*
* Java programmers should reference the [[org.apache.spark.api.java]] package
* for Spark programming APIs in Java.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 97aee58bdd..fe55a5124f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,11 +25,8 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark._
@@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
- * through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * through implicit conversions.
*
* Internally, each RDD is characterized by five main properties:
*
@@ -1527,7 +1523,7 @@ abstract class RDD[T: ClassTag](
*/
object RDD {
- // The following implicit functions were in SparkContext before 1.2 and users had to
+ // The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -1541,9 +1537,15 @@ object RDD {
new AsyncRDDActions(rdd)
}
- implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
- rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
- new SequenceFileRDDFunctions(rdd)
+ implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V],
+ keyWritableFactory: WritableFactory[K],
+ valueWritableFactory: WritableFactory[V])
+ : SequenceFileRDDFunctions[K, V] = {
+ implicit val keyConverter = keyWritableFactory.convert
+ implicit val valueConverter = valueWritableFactory.convert
+ new SequenceFileRDDFunctions(rdd,
+ keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
}
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
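For reference, this is roughly what the compiler expands `rdd.saveAsSequenceFile(path)` into for an RDD[(Int, String)] under the new signature. A sketch only: it sits inside `package org.apache.spark` because `WritableFactory` is `private[spark]`, and `rdd`/`path` are placeholders.

package org.apache.spark

import scala.reflect.classTag

import org.apache.spark.rdd.RDD

object DesugaringSketch {
  def save(rdd: RDD[(Int, String)], path: String): Unit = {
    // Explicit form of the implicit conversion plus the implicit parameters the compiler resolves.
    RDD.rddToSequenceFileRDDFunctions(rdd)(
      classTag[Int], classTag[String],
      WritableFactory.intWritableFactory,
      WritableFactory.stringWritableFactory
    ).saveAsSequenceFile(path)
  }
}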
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 2b48916951..059f896369 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -30,13 +30,35 @@ import org.apache.spark.Logging
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*
- * Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
- self: RDD[(K, V)])
+ self: RDD[(K, V)],
+ _keyWritableClass: Class[_ <: Writable],
+ _valueWritableClass: Class[_ <: Writable])
extends Logging
with Serializable {
+ @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
+ def this(self: RDD[(K, V)]) {
+ this(self, null, null)
+ }
+
+ private val keyWritableClass =
+ if (_keyWritableClass == null) {
+ // pre 1.3.0, we need to use Reflection to get the Writable class
+ getWritableClass[K]()
+ } else {
+ _keyWritableClass
+ }
+
+ private val valueWritableClass =
+ if (_valueWritableClass == null) {
+ // pre 1.3.0, we need to use Reflection to get the Writable class
+ getWritableClass[V]()
+ } else {
+ _valueWritableClass
+ }
+
private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
@@ -55,6 +77,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
c.asInstanceOf[Class[_ <: Writable]]
}
+
/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
* and value types. If the key or value are Writable, then we use their classes directly;
@@ -65,26 +88,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
def anyToWritable[U <% Writable](u: U): Writable = u
- val keyClass = getWritableClass[K]
- val valueClass = getWritableClass[V]
- val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
- val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
+ // TODO We cannot force the return type of `anyToWritable` to be the same as keyWritableClass
+ // and valueWritableClass at compile time. To implement that, we would need to add type
+ // parameters to SequenceFileRDDFunctions. However, SequenceFileRDDFunctions is a public class,
+ // so that would be a breaking change.
+ val convertKey = self.keyClass != keyWritableClass
+ val convertValue = self.valueClass != valueWritableClass
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
- valueClass.getSimpleName + ")" )
+ logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
+ valueWritableClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
+ self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
+ path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
+ path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
+ path, keyWritableClass, valueWritableClass, format, jobConf, codec)
}
}
}
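The convertKey/convertValue checks above reduce to a plain class comparison: a map through `anyToWritable` is inserted only when the RDD's runtime key or value class differs from the target Writable class. A small standalone sketch of that decision (the helper name needsConversion is illustrative, not part of the patch):

import org.apache.hadoop.io.{IntWritable, Text, Writable}

object ConversionDecisionSketch {
  // Mirrors `self.keyClass != keyWritableClass` / `self.valueClass != valueWritableClass`.
  def needsConversion(runtimeClass: Class[_], writableClass: Class[_ <: Writable]): Boolean =
    runtimeClass != writableClass

  def main(args: Array[String]): Unit = {
    println(needsConversion(classOf[Int], classOf[IntWritable]))         // true: Int must be wrapped
    println(needsConversion(classOf[IntWritable], classOf[IntWritable])) // false: already a Writable
    println(needsConversion(classOf[String], classOf[Text]))             // true: String must be wrapped
  }
}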
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 5e24196101..7acd27c735 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
import org.apache.spark.util.Utils
diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
index 4918e2d92b..daa795a043 100644
--- a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
+++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -44,13 +44,21 @@ class ImplicitSuite {
}
def testRddToSequenceFileRDDFunctions(): Unit = {
- // TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions.
- // That will be a breaking change.
- import org.apache.spark.SparkContext.intToIntWritable
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}
+ def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = {
+ val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)]
+ = mockRDD
+ rdd.saveAsSequenceFile("/a/test/path")
+ }
+
+ def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = {
+ val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD
+ rdd.saveAsSequenceFile("/a/test/path")
+ }
+
def testRddToOrderedRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.sortByKey()