Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala                 |  82
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                        | 247
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala                        |  61
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala |  69
4 files changed, 365 insertions, 94 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef6cf..f3b05e1243 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
package org.apache.spark.api.python
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, +U] extends Serializable {
def convert(obj: T): U
}
private[python] object Converter extends Logging {
- def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+ def getInstance(converterClass: Option[String],
+ defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
logError(s"Failed to load converter: $cc")
throw err
}
- }.getOrElse { new DefaultConverter }
+ }.getOrElse { defaultConverter }
}
}
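
Converter.getInstance now takes the fallback converter as an argument instead of hard-coding DefaultConverter, so the same lookup serves both the read path (WritableToJavaConverter) and the write path (JavaToWritableConverter). A standalone sketch of that load-by-name-or-fall-back pattern, using simplified stand-in names rather than the patch's classes:

import scala.util.{Failure, Success, Try}

// Simplified stand-in for org.apache.spark.api.python.Converter.
trait SimpleConverter[T, +U] extends Serializable {
  def convert(obj: T): U
}

object ConverterLookupSketch {
  // Load a converter class reflectively if a name is given, else use the default.
  def getInstance(converterClass: Option[String],
                  default: SimpleConverter[Any, Any]): SimpleConverter[Any, Any] = {
    converterClass.map { cc =>
      Try(Class.forName(cc).newInstance().asInstanceOf[SimpleConverter[Any, Any]]) match {
        case Success(c) => c
        case Failure(err) =>
          // The patch logs via logError before rethrowing; a plain exception suffices here.
          throw new RuntimeException(s"Failed to load converter: $cc", err)
      }
    }.getOrElse(default)
  }

  def main(args: Array[String]): Unit = {
    val identity = new SimpleConverter[Any, Any] { def convert(obj: Any): Any = obj }
    // No class name supplied, so the caller-provided default is returned.
    println(getInstance(None, identity).convert(42)) // 42
  }
}
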
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+ conf: Broadcast[SerializableWritable[Configuration]],
+ batchSize: Int) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
- case byw: BytesWritable => byw.getBytes
+ case byw: BytesWritable =>
+ val bytes = new Array[Byte](byw.getLength)
+ System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+ bytes
case n: NullWritable => null
- case aw: ArrayWritable => aw.get().map(convertWritable(_))
- case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
- (convertWritable(k), convertWritable(v))
- }.toMap)
+ case aw: ArrayWritable =>
+ // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+ // Since we can't determine element types for empty arrays, we will not attempt to
+      // convert to primitive arrays (which get pickled to Python arrays). Users may want to
+      // write custom converters for arrays if they know the element types a priori.
+ aw.get().map(convertWritable(_))
+ case mw: MapWritable =>
+ val map = new java.util.HashMap[Any, Any]()
+ mw.foreach { case (k, v) =>
+ map.put(convertWritable(k), convertWritable(v))
+ }
+ map
+ case w: Writable =>
+ if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
case other => other
}
}
- def convert(obj: Any): Any = {
+ override def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
}
}
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+ /**
+ * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+ * supported out-of-the-box.
+ */
+ private def convertToWritable(obj: Any): Writable = {
+ import collection.JavaConversions._
+ obj match {
+ case i: java.lang.Integer => new IntWritable(i)
+ case d: java.lang.Double => new DoubleWritable(d)
+ case l: java.lang.Long => new LongWritable(l)
+ case f: java.lang.Float => new FloatWritable(f)
+ case s: java.lang.String => new Text(s)
+ case b: java.lang.Boolean => new BooleanWritable(b)
+ case aob: Array[Byte] => new BytesWritable(aob)
+ case null => NullWritable.get()
+ case map: java.util.Map[_, _] =>
+ val mapWritable = new MapWritable()
+ map.foreach { case (k, v) =>
+ mapWritable.put(convertToWritable(k), convertToWritable(v))
+ }
+ mapWritable
+ case other => throw new SparkException(
+ s"Data of type ${other.getClass.getName} cannot be used")
+ }
+ }
+
+ override def convert(obj: Any): Writable = obj match {
+ case writable: Writable => writable
+ case other => convertToWritable(other)
+ }
+}
+
/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
- * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+ * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
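
The hunks above replace DefaultConverter with WritableToJavaConverter, which unwraps common Hadoop Writables into plain JVM values, copies only the valid prefix of a BytesWritable buffer, and clones other Writables when batch serialization may reuse them. A minimal standalone sketch of that unwrapping, assuming only hadoop-common on the classpath and using illustrative names rather than the patch's classes:

import org.apache.hadoop.io._

// Standalone sketch of the Writable-to-Java unwrapping done by WritableToJavaConverter.
object WritableUnwrapSketch {
  def unwrap(obj: Any): Any = obj match {
    case iw: IntWritable => iw.get()
    case dw: DoubleWritable => dw.get()
    case t: Text => t.toString
    case bw: BytesWritable =>
      // getBytes() may return a padded buffer; copy only the getLength() valid bytes.
      java.util.Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
    case _: NullWritable => null
    case mw: MapWritable =>
      // Recursively unwrap keys and values into a mutable Java map.
      val out = new java.util.HashMap[Any, Any]()
      val it = mw.entrySet().iterator()
      while (it.hasNext) {
        val e = it.next()
        out.put(unwrap(e.getKey), unwrap(e.getValue))
      }
      out
    case other => other
  }

  def main(args: Array[String]): Unit = {
    println(unwrap(new Text("spark")))                                                  // spark
    println(unwrap(new BytesWritable("ab".getBytes)).asInstanceOf[Array[Byte]].length)  // 2
  }
}
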
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59ee3..a9d758bf99 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
import net.razorvine.pickle.{Pickler, Unpickler}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
import org.apache.spark._
+import org.apache.spark.SparkContext._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
valueClassMaybeNull: String,
keyConverterClass: String,
valueConverterClass: String,
- minSplits: Int) = {
+ minSplits: Int,
+ batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
}
- rdd
}
/**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
}
- rdd
}
def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
}
}
+ private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+ baseConf: Configuration): Configuration = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ PythonHadoopUtil.mergeConfs(baseConf, conf)
+ }
+
+ private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
+ valueConverterClass: String = null): (Class[_], Class[_]) = {
+ // Peek at an element to figure out key/value types. Since Writables are not serializable,
+ // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
+ // and then convert locally.
+ val (key, value) = rdd.first()
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ (kc.convert(key).getClass, vc.convert(value).getClass)
+ }
+
+ private def getKeyValueTypes(keyClass: String, valueClass: String):
+ Option[(Class[_], Class[_])] = {
+ for {
+ k <- Option(keyClass)
+ v <- Option(valueClass)
+ } yield (Class.forName(k), Class.forName(v))
+ }
+
+ private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
+ val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
+ val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+ (keyConverter, valueConverter)
+ }
+
+ /**
+ * Convert an RDD of key-value pairs from internal types to serializable types suitable for
+ * output, or vice versa.
+ */
+ private def convertRDD[K, V](rdd: RDD[(K, V)],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ defaultConverter)
+ PythonHadoopUtil.convertRDD(rdd, kc, vc)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types
+ * we convert from the RDD's key and value types. Note that keys and values can't be
+ * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
+ * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
+ */
+ def saveAsSequenceFile[K, V, C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ compressionCodecClass: String) = {
+ saveAsHadoopFile(
+ pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop
+ * `OutputFormat` in mapred package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String],
+ compressionCodecClass: String) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop
+ * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf
+ * converted from the passed-in `confAsMap`. The conf should set relevant output params (
+ * e.g., output path, output format, etc), in the same way as it would be configured for
+ * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are supported
+ * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
+ * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
+ */
+ def saveAsHadoopDataset[K, V](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ confAsMap: java.util.HashMap[String, String],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ useNewAPI: Boolean) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
+ keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+ if (useNewAPI) {
+ converted.saveAsNewAPIHadoopDataset(conf)
+ } else {
+ converted.saveAsHadoopDataset(new JobConf(conf))
+ }
+ }
+
/**
 * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.
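
When keyClass and valueClass are not supplied, the save methods above peek at one record of the original RDD, run it through the output converter, and take the resulting classes as the Hadoop key/value classes (inferKeyValueTypes). A standalone sketch of that inference step, with a simplified stand-in for JavaToWritableConverter and illustrative names:

import org.apache.hadoop.io.{IntWritable, Text}

// Sketch of inferring post-conversion key/value classes from a sample pair.
object InferTypesSketch {
  // Simplified stand-in for JavaToWritableConverter: wrap a couple of common types.
  def toWritableLike(obj: Any): Any = obj match {
    case i: Int    => new IntWritable(i)
    case s: String => new Text(s)
    case other     => other
  }

  def inferKeyValueTypes(sample: (Any, Any)): (Class[_], Class[_]) =
    (toWritableLike(sample._1).getClass, toWritableLike(sample._2).getClass)

  def main(args: Array[String]): Unit = {
    val (kc, vc) = inferKeyValueTypes((1, "a"))
    println(kc.getName) // org.apache.hadoop.io.IntWritable
    println(vc.getName) // org.apache.hadoop.io.Text
  }
}
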
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e7254..efc9009c08 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
package org.apache.spark.api.python
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
* by PySpark. By default, if serialization fails, toString is called and the string
* representation is serialized
*/
- def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+ def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
rdd.mapPartitions { iter =>
val pickle = new Pickler
- iter.map { case (k, v) =>
- if (keyFailed && valueFailed) {
- pickle.dumps(Array(k.toString, v.toString))
- } else if (keyFailed) {
- pickle.dumps(Array(k.toString, v))
- } else if (!keyFailed && valueFailed) {
- pickle.dumps(Array(k, v.toString))
+ val cleaned = iter.map { case (k, v) =>
+ val key = if (keyFailed) k.toString else k
+ val value = if (valueFailed) v.toString else v
+ Array[Any](key, value)
+ }
+ if (batchSize > 1) {
+ cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+ } else {
+ cleaned.map(pickle.dumps(_))
+ }
+ }
+ }
+
+ /**
+   * Convert an RDD of serialized Python tuples (K, V) to RDD[(K, V)].
+ */
+ def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+ def isPair(obj: Any): Boolean = {
+ Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+ obj.asInstanceOf[Array[_]].length == 2
+ }
+ pyRDD.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ val unpickled =
+ if (batchSerialized) {
+ iter.flatMap { batch =>
+ unpickle.loads(batch) match {
+ case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+ case other => throw new SparkException(
+ s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+ }
+ }
} else {
- pickle.dumps(Array(k, v))
+ iter.map(unpickle.loads(_))
}
+ unpickled.map {
+ case obj if isPair(obj) =>
+ // we only accept (K, V)
+ val arr = obj.asInstanceOf[Array[_]]
+ (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+ case other => throw new SparkException(
+ s"RDD element of type ${other.getClass.getName} cannot be used")
}
}
}
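
pairRDDToPython above decides per partition whether to pickle pairs one at a time or in groups of batchSize, and pythonToPairRDD reverses the choice when reading back. A plain-Scala sketch of the batching decision, with a string placeholder standing in for the Pickler:

// Sketch of the batchSize branch in pairRDDToPython; a string "serializer"
// stands in for net.razorvine.pickle.Pickler.
object BatchingSketch {
  def serializePairs(iter: Iterator[(Any, Any)], batchSize: Int): Iterator[String] = {
    val cleaned = iter.map { case (k, v) => Array[Any](k, v) }
    if (batchSize > 1) {
      // Group pairs and "pickle" each group as one record.
      cleaned.grouped(batchSize).map(batch => "batch(" + batch.map(_.mkString("(", ",", ")")).mkString(";") + ")")
    } else {
      // Pickle each pair individually.
      cleaned.map(p => "record(" + p.mkString("(", ",", ")") + ")")
    }
  }

  def main(args: Array[String]): Unit = {
    serializePairs(Iterator(("a", 1), ("b", 2), ("c", 3)), batchSize = 2).foreach(println)
    // batch((a,1);(b,2))
    // batch((c,3))
  }
}
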
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9aff..d11db978b8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
package org.apache.spark.api.python
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
/**
- * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * A class to test Pyrolite serialization on the Scala side, that will be deserialized
* in Python
* @param str
* @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
}
}
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ obj.asInstanceOf[IntWritable].get().toChar
+ }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
import collection.JavaConversions._
override def convert(obj: Any) = {
val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
}
}
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ new Text(obj.asInstanceOf[Int].toString)
+ }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+ import collection.JavaConversions._
+ override def convert(obj: Any) = {
+ new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+ }
+}
+
+private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
+ override def convert(obj: Any) = obj match {
+ case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
+ val daw = new DoubleArrayWritable
+ daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+ daw
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
+ override def convert(obj: Any): Array[Double] = obj match {
+ case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
/**
* This object contains method to generate SequenceFile test data and write it to a
* given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+ ).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
// Create test data for ArrayWritable
val data = Seq(
- (1, Array(1.0, 2.0, 3.0)),
+ (1, Array()),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
sc.parallelize(data, numSlices = 2)
.map{ case (k, v) =>
- (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
- }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+ val va = new DoubleArrayWritable
+ va.set(v.map(new DoubleWritable(_)))
+ (new IntWritable(k), va)
+ }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath)
// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
- (1, Map(2.0 -> "aa")),
- (2, Map(3.0 -> "bb")),
+ (1, Map()),
(2, Map(1.0 -> "cc")),
(3, Map(2.0 -> "dd")),
(2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
)
sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
val mw = new MapWritable()
- val k = m.keys.head
- val v = m.values.head
- mw.put(new DoubleWritable(k), new Text(v))
+ m.foreach { case (k, v) =>
+ mw.put(new DoubleWritable(k), new Text(v))
+ }
(new IntWritable(i), mw)
}.saveAsSequenceFile(mapPath)
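
The generator now writes array test data through DoubleArrayWritable, a concrete ArrayWritable subclass, so the element type is fixed in the class itself rather than supplied out of band. A self-contained sketch of that subclass-and-wrap pattern, with local names rather than the private Spark classes:

import org.apache.hadoop.io.{ArrayWritable, DoubleWritable, Writable}

// A concrete subclass pins the element type, mirroring DoubleArrayWritable above.
class DoubleArraySketch extends ArrayWritable(classOf[DoubleWritable])

object ArrayWritableSketch {
  def wrap(values: Array[Double]): DoubleArraySketch = {
    val aw = new DoubleArraySketch
    aw.set(values.map(d => new DoubleWritable(d): Writable))
    aw
  }

  def unwrap(aw: ArrayWritable): Array[Double] =
    aw.get().map(_.asInstanceOf[DoubleWritable].get())

  def main(args: Array[String]): Unit = {
    println(unwrap(wrap(Array(1.0, 2.0, 3.0))).mkString(", ")) // 1.0, 2.0, 3.0
  }
}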