author    Kan Zhang <kzhang@apache.org>    2014-07-30 13:19:05 -0700
committer Josh Rosen <joshrosen@apache.org>    2014-07-30 13:19:05 -0700
commit    94d1f46fc43c0cb85125f757fb40db9271caf1f4 (patch)
tree      8878443a963ad6ce5ba3af679567d893c8df70cc /core
parent    437dc8c5b54f0dcf9564c1fb07e8dce9e771c8cd (diff)
[SPARK-2024] Add saveAsSequenceFile to PySpark
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a follow-up to #455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormat.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs.
* Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
* No out-of-the-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to ```Object[]``` and they get pickled to Python tuples.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kzhang@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala                  |  82
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                         | 247
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala                         |  61
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala |  69
4 files changed, 365 insertions, 94 deletions
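Before the diff itself, a minimal sketch of the intended PySpark usage, pieced together from the method names above; the output path and exact keyword arguments are illustrative assumptions rather than part of this patch:

```python
from pyspark import SparkContext

sc = SparkContext(appName="sequencefile-demo")

# Common key/value types are turned into Writables by the default converter
# on the Scala side (int -> IntWritable, str -> Text, and so on).
pairs = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
pairs.saveAsSequenceFile("/tmp/demo-seqfile")  # hypothetical output path

# Reading back goes through the default Writable-to-Java conversion, so plain
# Python types come out; expect roughly [(1, u'a'), (2, u'b'), (3, u'c')].
print(sorted(sc.sequenceFile("/tmp/demo-seqfile").collect()))
```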
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef6cf..f3b05e1243 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
package org.apache.spark.api.python
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
def convert(obj: T): U
}
private[python] object Converter extends Logging {
- def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+ def getInstance(converterClass: Option[String],
+ defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
logError(s"Failed to load converter: $cc")
throw err
}
- }.getOrElse { new DefaultConverter }
+ }.getOrElse { defaultConverter }
}
}
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+ conf: Broadcast[SerializableWritable[Configuration]],
+ batchSize: Int) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
- case byw: BytesWritable => byw.getBytes
+ case byw: BytesWritable =>
+ val bytes = new Array[Byte](byw.getLength)
+ System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+ bytes
case n: NullWritable => null
- case aw: ArrayWritable => aw.get().map(convertWritable(_))
- case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
- (convertWritable(k), convertWritable(v))
- }.toMap)
+ case aw: ArrayWritable =>
+ // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+ // Since we can't determine element types for empty arrays, we will not attempt to
+ // convert to primitive arrays (which get pickled to Python arrays). Users may want to
+ // write custom converters for arrays if they know the element types a priori.
+ aw.get().map(convertWritable(_))
+ case mw: MapWritable =>
+ val map = new java.util.HashMap[Any, Any]()
+ mw.foreach { case (k, v) =>
+ map.put(convertWritable(k), convertWritable(v))
+ }
+ map
+ case w: Writable =>
+ if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
case other => other
}
}
- def convert(obj: Any): Any = {
+ override def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
}
}
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+ /**
+ * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+ * supported out-of-the-box.
+ */
+ private def convertToWritable(obj: Any): Writable = {
+ import collection.JavaConversions._
+ obj match {
+ case i: java.lang.Integer => new IntWritable(i)
+ case d: java.lang.Double => new DoubleWritable(d)
+ case l: java.lang.Long => new LongWritable(l)
+ case f: java.lang.Float => new FloatWritable(f)
+ case s: java.lang.String => new Text(s)
+ case b: java.lang.Boolean => new BooleanWritable(b)
+ case aob: Array[Byte] => new BytesWritable(aob)
+ case null => NullWritable.get()
+ case map: java.util.Map[_, _] =>
+ val mapWritable = new MapWritable()
+ map.foreach { case (k, v) =>
+ mapWritable.put(convertToWritable(k), convertToWritable(v))
+ }
+ mapWritable
+ case other => throw new SparkException(
+ s"Data of type ${other.getClass.getName} cannot be used")
+ }
+ }
+
+ override def convert(obj: Any): Writable = obj match {
+ case writable: Writable => writable
+ case other => convertToWritable(other)
+ }
+}
+
/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
- * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+ * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
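The JavaToWritableConverter doc above defers array handling to ArrayWritable subclasses such as DoubleArrayWritable / DoubleArrayToWritableConverter (added later in this diff for the PySpark doctests). A hedged sketch of the corresponding Python-side call; the converter class names come from this patch, while the path and keyword-argument spelling are assumptions:

```python
from array import array

# sc is a SparkContext, as in the earlier sketch.
# array('d', ...) arrives on the JVM as a primitive double[], which is what
# DoubleArrayToWritableConverter expects; a plain Python list would not match.
data = sc.parallelize([(1, array('d', [1.0, 2.0, 3.0])),
                       (2, array('d', [4.0, 5.0]))])
data.saveAsNewAPIHadoopFile(
    "/tmp/double-arrays",  # hypothetical path
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
```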
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59ee3..a9d758bf99 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
import net.razorvine.pickle.{Pickler, Unpickler}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
import org.apache.spark._
+import org.apache.spark.SparkContext._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
valueClassMaybeNull: String,
keyConverterClass: String,
valueConverterClass: String,
- minSplits: Int) = {
+ minSplits: Int,
+ batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
}
- rdd
}
/**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
}
- rdd
}
def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
}
}
+ private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+ baseConf: Configuration): Configuration = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ PythonHadoopUtil.mergeConfs(baseConf, conf)
+ }
+
+ private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
+ valueConverterClass: String = null): (Class[_], Class[_]) = {
+ // Peek at an element to figure out key/value types. Since Writables are not serializable,
+ // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
+ // and then convert locally.
+ val (key, value) = rdd.first()
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ (kc.convert(key).getClass, vc.convert(value).getClass)
+ }
+
+ private def getKeyValueTypes(keyClass: String, valueClass: String):
+ Option[(Class[_], Class[_])] = {
+ for {
+ k <- Option(keyClass)
+ v <- Option(valueClass)
+ } yield (Class.forName(k), Class.forName(v))
+ }
+
+ private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
+ val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
+ val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+ (keyConverter, valueConverter)
+ }
+
+ /**
+ * Convert an RDD of key-value pairs from internal types to serializable types suitable for
+ * output, or vice versa.
+ */
+ private def convertRDD[K, V](rdd: RDD[(K, V)],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ defaultConverter)
+ PythonHadoopUtil.convertRDD(rdd, kc, vc)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types
+ * we convert from the RDD's key and value types. Note that keys and values can't be
+ * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
+ * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
+ */
+ def saveAsSequenceFile[K, V, C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ compressionCodecClass: String) = {
+ saveAsHadoopFile(
+ pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop
+ * `OutputFormat` in mapred package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String],
+ compressionCodecClass: String) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+ }
+
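A sketch of how the old-API saveAsHadoopFile above might be invoked from Python; TextOutputFormat and GzipCodec are standard Hadoop classes used here purely as an illustrative assumption:

```python
# sc is a SparkContext, as in the earlier sketches.
pairs = sc.parallelize([(1, "a"), (2, "b")])

# keyClass/valueClass are omitted, so the Scala side infers them by converting
# the first record locally (inferKeyValueTypes above); the codec is optional.
pairs.saveAsHadoopFile(
    "/tmp/old-api-output",  # hypothetical path
    "org.apache.hadoop.mapred.TextOutputFormat",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
```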
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop
+ * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf
+ * converted from the passed-in `confAsMap`. The conf should set relevant output params (
+ * e.g., output path, output format, etc), in the same way as it would be configured for
+ * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are supported
+ * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
+ * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
+ */
+ def saveAsHadoopDataset[K, V](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ confAsMap: java.util.HashMap[String, String],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ useNewAPI: Boolean) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
+ keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+ if (useNewAPI) {
+ converted.saveAsNewAPIHadoopDataset(conf)
+ } else {
+ converted.saveAsHadoopDataset(new JobConf(conf))
+ }
+ }
+
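Since saveAsHadoopDataset reads every output setting from the conf, the Python caller would pass a plain dict; the property names below are the standard old-API Hadoop keys, shown here as an assumed configuration:

```python
# sc is a SparkContext, as in the earlier sketches.
conf = {
    "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
    "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
    "mapred.output.value.class": "org.apache.hadoop.io.Text",
    "mapred.output.dir": "/tmp/dataset-output",  # hypothetical path
}
# Keys/values still go through JavaToWritableConverter (or user converters).
sc.parallelize([(1, "a"), (2, "b")]).saveAsHadoopDataset(conf)
```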
/**
* Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e7254..efc9009c08 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
package org.apache.spark.api.python
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
* by PySpark. By default, if serialization fails, toString is called and the string
* representation is serialized
*/
- def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+ def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
rdd.mapPartitions { iter =>
val pickle = new Pickler
- iter.map { case (k, v) =>
- if (keyFailed && valueFailed) {
- pickle.dumps(Array(k.toString, v.toString))
- } else if (keyFailed) {
- pickle.dumps(Array(k.toString, v))
- } else if (!keyFailed && valueFailed) {
- pickle.dumps(Array(k, v.toString))
+ val cleaned = iter.map { case (k, v) =>
+ val key = if (keyFailed) k.toString else k
+ val value = if (valueFailed) v.toString else v
+ Array[Any](key, value)
+ }
+ if (batchSize > 1) {
+ cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+ } else {
+ cleaned.map(pickle.dumps(_))
+ }
+ }
+ }
+
+ /**
+ * Convert an RDD of serialized Python tuples (K, V) to RDD[(K, V)].
+ */
+ def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+ def isPair(obj: Any): Boolean = {
+ Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+ obj.asInstanceOf[Array[_]].length == 2
+ }
+ pyRDD.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ val unpickled =
+ if (batchSerialized) {
+ iter.flatMap { batch =>
+ unpickle.loads(batch) match {
+ case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+ case other => throw new SparkException(
+ s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+ }
+ }
} else {
- pickle.dumps(Array(k, v))
+ iter.map(unpickle.loads(_))
}
+ unpickled.map {
+ case obj if isPair(obj) =>
+ // we only accept (K, V)
+ val arr = obj.asInstanceOf[Array[_]]
+ (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+ case other => throw new SparkException(
+ s"RDD element of type ${other.getClass.getName} cannot be used")
}
}
}
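In other words, each payload handed to Python is either one pickled [key, value] pair or, when batchSize > 1, a pickled list of such pairs. A purely conceptual Python illustration of that framing (the JVM side actually uses Pyrolite's Pickler):

```python
import pickle

pairs = [(1, "a"), (2, "b"), (3, "c")]

# batchSize <= 1: one pickled pair per payload
unbatched = [pickle.dumps([k, v]) for k, v in pairs]

# batchSize == 2: each payload is a pickled list of up to two pairs
batch_size = 2
batched = [pickle.dumps([[k, v] for k, v in pairs[i:i + batch_size]])
           for i in range(0, len(pairs), batch_size)]

print(len(unbatched), len(batched))  # 3 payloads vs. 2 payloads
```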
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9aff..d11db978b8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
package org.apache.spark.api.python
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
/**
- * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * A class to test Pyrolite serialization on the Scala side, that will be deserialized
* in Python
* @param str
* @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
}
}
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ obj.asInstanceOf[IntWritable].get().toChar
+ }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
import collection.JavaConversions._
override def convert(obj: Any) = {
val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
}
}
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ new Text(obj.asInstanceOf[Int].toString)
+ }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+ import collection.JavaConversions._
+ override def convert(obj: Any) = {
+ new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+ }
+}
+
+private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
+ override def convert(obj: Any) = obj match {
+ case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
+ val daw = new DoubleArrayWritable
+ daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+ daw
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
+ override def convert(obj: Any): Array[Double] = obj match {
+ case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
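For the read direction, a custom converter is optional: by default ArrayWritable subtypes come back as Object[] and are pickled to Python tuples, while WritableToDoubleArrayConverter above yields Array[Double] instead. A hedged sketch, with class names from this patch and path/keyword arguments assumed:

```python
# sc is a SparkContext, as in the earlier sketches.
# Default conversion: array values arrive as Python tuples of floats.
as_tuples = sc.sequenceFile(
    "/tmp/double-arrays",  # hypothetical path written by the earlier sketch
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable").collect()

# Explicit converter: values are converted to double[] on the JVM before pickling.
as_arrays = sc.sequenceFile(
    "/tmp/double-arrays",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()
```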
/**
* This object contains methods to generate SequenceFile test data and write it to a
* given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+ ).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
// Create test data for ArrayWritable
val data = Seq(
- (1, Array(1.0, 2.0, 3.0)),
+ (1, Array()),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
sc.parallelize(data, numSlices = 2)
.map{ case (k, v) =>
- (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
- }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+ val va = new DoubleArrayWritable
+ va.set(v.map(new DoubleWritable(_)))
+ (new IntWritable(k), va)
+ }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath)
// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
- (1, Map(2.0 -> "aa")),
- (2, Map(3.0 -> "bb")),
+ (1, Map()),
(2, Map(1.0 -> "cc")),
(3, Map(2.0 -> "dd")),
(2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
)
sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
val mw = new MapWritable()
- val k = m.keys.head
- val v = m.values.head
- mw.put(new DoubleWritable(k), new Text(v))
+ m.foreach { case (k, v) =>
+ mw.put(new DoubleWritable(k), new Text(v))
+ }
(new IntWritable(i), mw)
}.saveAsSequenceFile(mapPath)