-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala                       |  82
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                               | 247
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala                               |  61
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala       |  69
-rw-r--r--  docs/programming-guide.md                                                                     |  52
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py                                            |  83
-rw-r--r--  examples/src/main/python/hbase_inputformat.py                                                 |   3
-rw-r--r--  examples/src/main/python/hbase_outputformat.py                                                |  65
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala  |  24
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala       |  33
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala      |  70
-rw-r--r--  python/pyspark/context.py                                                                     |  51
-rw-r--r--  python/pyspark/rdd.py                                                                         | 114
-rw-r--r--  python/pyspark/tests.py                                                                       | 317
14 files changed, 1085 insertions, 186 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef6cf..f3b05e1243 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
package org.apache.spark.api.python
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
def convert(obj: T): U
}
private[python] object Converter extends Logging {
- def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+ def getInstance(converterClass: Option[String],
+ defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
logError(s"Failed to load converter: $cc")
throw err
}
- }.getOrElse { new DefaultConverter }
+ }.getOrElse { defaultConverter }
}
}
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+ conf: Broadcast[SerializableWritable[Configuration]],
+ batchSize: Int) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
- case byw: BytesWritable => byw.getBytes
+ case byw: BytesWritable =>
+ val bytes = new Array[Byte](byw.getLength)
+ System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+ bytes
case n: NullWritable => null
- case aw: ArrayWritable => aw.get().map(convertWritable(_))
- case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
- (convertWritable(k), convertWritable(v))
- }.toMap)
+ case aw: ArrayWritable =>
+ // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+ // Since we can't determine element types for empty arrays, we will not attempt to
+      // convert to primitive arrays (which get pickled to Python arrays). Users may want to
+      // write custom converters for arrays if they know the element types a priori.
+ aw.get().map(convertWritable(_))
+ case mw: MapWritable =>
+ val map = new java.util.HashMap[Any, Any]()
+ mw.foreach { case (k, v) =>
+ map.put(convertWritable(k), convertWritable(v))
+ }
+ map
+ case w: Writable =>
+ if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
case other => other
}
}
- def convert(obj: Any): Any = {
+ override def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
}
}
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+ /**
+ * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+ * supported out-of-the-box.
+ */
+ private def convertToWritable(obj: Any): Writable = {
+ import collection.JavaConversions._
+ obj match {
+ case i: java.lang.Integer => new IntWritable(i)
+ case d: java.lang.Double => new DoubleWritable(d)
+ case l: java.lang.Long => new LongWritable(l)
+ case f: java.lang.Float => new FloatWritable(f)
+ case s: java.lang.String => new Text(s)
+ case b: java.lang.Boolean => new BooleanWritable(b)
+ case aob: Array[Byte] => new BytesWritable(aob)
+ case null => NullWritable.get()
+ case map: java.util.Map[_, _] =>
+ val mapWritable = new MapWritable()
+ map.foreach { case (k, v) =>
+ mapWritable.put(convertToWritable(k), convertToWritable(v))
+ }
+ mapWritable
+ case other => throw new SparkException(
+ s"Data of type ${other.getClass.getName} cannot be used")
+ }
+ }
+
+ override def convert(obj: Any): Writable = obj match {
+ case writable: Writable => writable
+ case other => convertToWritable(other)
+ }
+}
+
/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
- * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+ * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
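The WritableToJavaConverter / JavaToWritableConverter pair above backs the automatic mapping between common Writables and base Python types (IntWritable <-> int, Text <-> unicode, MapWritable <-> dict, and so on). A minimal PySpark sketch of the round trip this enables, assuming an existing SparkContext `sc`; the output path is illustrative:

    # Keys/values are plain Python types; JavaToWritableConverter turns them into
    # IntWritable/Text on write, and WritableToJavaConverter maps them back on read.
    rdd = sc.parallelize([(1, u"a"), (2, u"bb")])
    rdd.saveAsSequenceFile("/tmp/seq-demo")
    print sorted(sc.sequenceFile("/tmp/seq-demo").collect())
    # [(1, u'a'), (2, u'bb')]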
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59ee3..a9d758bf99 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
import net.razorvine.pickle.{Pickler, Unpickler}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
import org.apache.spark._
+import org.apache.spark.SparkContext._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
valueClassMaybeNull: String,
keyConverterClass: String,
valueConverterClass: String,
- minSplits: Int) = {
+ minSplits: Int,
+ batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
}
- rdd
}
/**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
- val conf = PythonHadoopUtil.mapToConf(confAsMap)
- val baseConf = sc.hadoopConfiguration()
- val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
+ val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
/**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String],
+ batchSize: Int) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val keyConverter = Converter.getInstance(Option(keyConverterClass))
- val valueConverter = Converter.getInstance(Option(valueConverterClass))
- val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
- JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new WritableToJavaConverter(confBroadcasted, batchSize))
+ JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
- val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
- val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
- val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
- val rdd = if (path.isDefined) {
+ val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+ val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
}
- rdd
}
def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
}
}
+ private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+ baseConf: Configuration): Configuration = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ PythonHadoopUtil.mergeConfs(baseConf, conf)
+ }
+
+ private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
+ valueConverterClass: String = null): (Class[_], Class[_]) = {
+ // Peek at an element to figure out key/value types. Since Writables are not serializable,
+ // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
+ // and then convert locally.
+ val (key, value) = rdd.first()
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ (kc.convert(key).getClass, vc.convert(value).getClass)
+ }
+
+ private def getKeyValueTypes(keyClass: String, valueClass: String):
+ Option[(Class[_], Class[_])] = {
+ for {
+ k <- Option(keyClass)
+ v <- Option(valueClass)
+ } yield (Class.forName(k), Class.forName(v))
+ }
+
+ private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
+ val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
+ val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+ (keyConverter, valueConverter)
+ }
+
+ /**
+ * Convert an RDD of key-value pairs from internal types to serializable types suitable for
+ * output, or vice versa.
+ */
+ private def convertRDD[K, V](rdd: RDD[(K, V)],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+ val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ defaultConverter)
+ PythonHadoopUtil.convertRDD(rdd, kc, vc)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types
+ * we convert from the RDD's key and value types. Note that keys and values can't be
+ * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
+ * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
+ */
+ def saveAsSequenceFile[K, V, C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ compressionCodecClass: String) = {
+ saveAsHadoopFile(
+ pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop
+ * `OutputFormat` in mapred package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String],
+ compressionCodecClass: String) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop
+ * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output
+ * types using either user specified converters or, if not specified,
+ * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+ * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+ * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+ * this RDD.
+ */
+ def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ path: String,
+ outputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+ val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+ inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+ val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+ val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+ new JavaToWritableConverter)
+ val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+ }
+
+ /**
+ * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf
+ * converted from the passed-in `confAsMap`. The conf should set relevant output params (
+ * e.g., output path, output format, etc), in the same way as it would be configured for
+ * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are supported
+ * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
+ * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
+ */
+ def saveAsHadoopDataset[K, V](
+ pyRDD: JavaRDD[Array[Byte]],
+ batchSerialized: Boolean,
+ confAsMap: java.util.HashMap[String, String],
+ keyConverterClass: String,
+ valueConverterClass: String,
+ useNewAPI: Boolean) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
+ keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+ if (useNewAPI) {
+ converted.saveAsNewAPIHadoopDataset(conf)
+ } else {
+ converted.saveAsHadoopDataset(new JobConf(conf))
+ }
+ }
+
/**
   * Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by
* PySpark.
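The save* entry points added above are the JVM half of the new PySpark output API; each is reached from a matching RDD method defined in rdd.py later in this diff. A hedged sketch of how they line up from the Python side, assuming an existing SparkContext `sc`; paths and the mapred.* conf keys are illustrative:

    rdd = sc.parallelize([(1, u"a"), (2, u"bb")])

    # RDD.saveAsSequenceFile -> PythonRDD.saveAsSequenceFile (key/value types inferred)
    rdd.saveAsSequenceFile("/tmp/out-seq")

    # RDD.saveAsNewAPIHadoopFile -> PythonRDD.saveAsNewAPIHadoopFile
    rdd.saveAsNewAPIHadoopFile(
        "/tmp/out-new",
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.hadoop.io.Text")

    # RDD.saveAsHadoopDataset -> PythonRDD.saveAsHadoopDataset(useNewAPI=False);
    # the conf alone names the output format, key/value classes and path,
    # just as it would for a Hadoop MapReduce job.
    conf = {"mapred.output.format.class":
                "org.apache.hadoop.mapred.SequenceFileOutputFormat",
            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapred.output.value.class": "org.apache.hadoop.io.Text",
            "mapred.output.dir": "/tmp/out-dataset"}
    rdd.saveAsHadoopDataset(conf)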
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e7254..efc9009c08 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
package org.apache.spark.api.python
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
* by PySpark. By default, if serialization fails, toString is called and the string
* representation is serialized
*/
- def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+ def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
rdd.mapPartitions { iter =>
val pickle = new Pickler
- iter.map { case (k, v) =>
- if (keyFailed && valueFailed) {
- pickle.dumps(Array(k.toString, v.toString))
- } else if (keyFailed) {
- pickle.dumps(Array(k.toString, v))
- } else if (!keyFailed && valueFailed) {
- pickle.dumps(Array(k, v.toString))
+ val cleaned = iter.map { case (k, v) =>
+ val key = if (keyFailed) k.toString else k
+ val value = if (valueFailed) v.toString else v
+ Array[Any](key, value)
+ }
+ if (batchSize > 1) {
+ cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+ } else {
+ cleaned.map(pickle.dumps(_))
+ }
+ }
+ }
+
+ /**
+ * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
+ */
+ def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+ def isPair(obj: Any): Boolean = {
+ Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+ obj.asInstanceOf[Array[_]].length == 2
+ }
+ pyRDD.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ val unpickled =
+ if (batchSerialized) {
+ iter.flatMap { batch =>
+ unpickle.loads(batch) match {
+ case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+ case other => throw new SparkException(
+ s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+ }
+ }
} else {
- pickle.dumps(Array(k, v))
+ iter.map(unpickle.loads(_))
}
+ unpickled.map {
+ case obj if isPair(obj) =>
+ // we only accept (K, V)
+ val arr = obj.asInstanceOf[Array[_]]
+ (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+ case other => throw new SparkException(
+ s"RDD element of type ${other.getClass.getName} cannot be used")
}
}
}
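pairRDDToPython groups batchSize pairs into one pickled payload, and pythonToPairRDD undoes that framing. A standalone Python sketch of the framing the Python side then expects (plain pickle standing in for the Pyrolite pickling done in the JVM; with a batchSize of 1 each pair is pickled on its own instead):

    import pickle

    batch = [(1, u"a"), (2, u"aa")]        # one batch of (K, V) pairs
    payload = pickle.dumps(batch)          # corresponds to one Array[Byte] element of the Java RDD
    assert pickle.loads(payload) == batch  # PySpark's BatchedSerializer unpacks it like this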
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9aff..d11db978b8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
package org.apache.spark.api.python
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
/**
- * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * A class to test Pyrolite serialization on the Scala side, that will be deserialized
* in Python
* @param str
* @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
}
}
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ obj.asInstanceOf[IntWritable].get().toChar
+ }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
import collection.JavaConversions._
override def convert(obj: Any) = {
val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
}
}
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+ override def convert(obj: Any) = {
+ new Text(obj.asInstanceOf[Int].toString)
+ }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+ import collection.JavaConversions._
+ override def convert(obj: Any) = {
+ new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+ }
+}
+
+private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
+ override def convert(obj: Any) = obj match {
+ case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
+ val daw = new DoubleArrayWritable
+ daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+ daw
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
+ override def convert(obj: Any): Array[Double] = obj match {
+ case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
+ case other => throw new SparkException(s"Data of type $other is not supported")
+ }
+}
+
/**
* This object contains method to generate SequenceFile test data and write it to a
* given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+ ).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
// Create test data for ArrayWritable
val data = Seq(
- (1, Array(1.0, 2.0, 3.0)),
+ (1, Array()),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
sc.parallelize(data, numSlices = 2)
.map{ case (k, v) =>
- (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
- }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+ val va = new DoubleArrayWritable
+ va.set(v.map(new DoubleWritable(_)))
+ (new IntWritable(k), va)
+ }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath)
// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
- (1, Map(2.0 -> "aa")),
- (2, Map(3.0 -> "bb")),
+ (1, Map()),
(2, Map(1.0 -> "cc")),
(3, Map(2.0 -> "dd")),
(2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
)
sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
val mw = new MapWritable()
- val k = m.keys.head
- val v = m.values.head
- mw.put(new DoubleWritable(k), new Text(v))
+ m.foreach { case (k, v) =>
+ mw.put(new DoubleWritable(k), new Text(v))
+ }
(new IntWritable(i), mw)
}.saveAsSequenceFile(mapPath)
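DoubleArrayWritable and DoubleArrayToWritableConverter above are, per the earlier comment, exercised by the PySpark `saveAsNewAPIHadoopFile` doctest. A hedged sketch of that kind of usage from Python, assuming an existing SparkContext `sc`; the output path is illustrative:

    from array import array

    rdd = sc.parallelize([(1, array('d', [1.0, 2.0, 3.0]))])
    rdd.saveAsNewAPIHadoopFile(
        "/tmp/double-arrays",
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.spark.api.python.DoubleArrayWritable",
        valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")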
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 90c6971301..a88bf27add 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -383,16 +383,16 @@ Apart from text files, Spark's Python API also supports several other data forma
* `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
-* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.
-
-### SequenceFile and Hadoop InputFormats
+* SequenceFile and Hadoop Input/Output Formats
**Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.
-#### Writable Support
+**Writable Support**
-PySpark SequenceFile support loads an RDD within Java, and pickles the resulting Java objects using
-[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables are automatically converted:
+PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the
+resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile,
+PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following
+Writables are automatically converted:
<table class="table">
<tr><th>Writable Type</th><th>Python Type</th></tr>
@@ -403,32 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, and pickles the resulting
<tr><td>BooleanWritable</td><td>bool</td></tr>
<tr><td>BytesWritable</td><td>bytearray</td></tr>
<tr><td>NullWritable</td><td>None</td></tr>
-<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
<tr><td>MapWritable</td><td>dict</td></tr>
-<tr><td>Custom Class conforming to Java Bean conventions</td>
- <td>dict of public properties (via JavaBean getters and setters) + __class__ for the class type</td></tr>
</table>
-#### Loading SequenceFiles
+Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing,
+users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default
+converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get
+Python `array.array` for arrays of primitive types, users need to specify custom converters.
+
+**Saving and Loading SequenceFiles**
-Similarly to text files, SequenceFiles can be loaded by specifying the path. The key and value
+Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value
classes can be specified, but for standard Writables this is not required.
{% highlight python %}
->>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
->>> rdd.collect() # this example has DoubleWritable keys and Text values
-[(1.0, u'aa'),
- (2.0, u'bb'),
- (2.0, u'aa'),
- (3.0, u'cc'),
- (2.0, u'bb'),
- (1.0, u'aa')]
+>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+>>> rdd.saveAsSequenceFile("path/to/file")
+>>> sorted(sc.sequenceFile("path/to/file").collect())
+[(1, u'a'), (2, u'aa'), (3, u'aaa')]
{% endhighlight %}
-#### Loading Other Hadoop InputFormats
+**Saving and Loading Other Hadoop Input/Output Formats**
-PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop APIs. If required,
-a Hadoop configuration can be passed in as a Python dict. Here is an example using the
+PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs.
+If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
Elasticsearch ESInputFormat:
{% highlight python %}
@@ -447,8 +445,7 @@ Note that, if the InputFormat simply depends on a Hadoop configuration and/or in
the key and value classes can easily be converted according to the above table,
then this approach should work well for such cases.
-If you have custom serialized binary data (such as loading data from Cassandra / HBase) or custom
-classes that don't conform to the JavaBean requirements, then you will first need to
+If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to
transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler.
A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided
for this. Simply extend this trait and implement your transformation code in the ```convert```
@@ -456,11 +453,8 @@ method. Remember to ensure that this class, along with any dependencies required
classpath.
See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and
-the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters)
-for examples of using HBase and Cassandra ```InputFormat```.
-
-Future support for writing data out as ```SequenceFileOutputFormat``` and other ```OutputFormats```,
-is forthcoming.
+the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters)
+for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters.
</div>
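As a concrete illustration of the custom-converter path described above, reading back a SequenceFile of DoubleArrayWritable values (the helper classes added elsewhere in this commit) could look like the following sketch; the class names and path are illustrative:

    rdd = sc.sequenceFile(
        "path/to/array/sequencefile",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.spark.api.python.DoubleArrayWritable",
        valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter")
    # values come back as Python array('d', ...) objects instead of tuples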
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
new file mode 100644
index 0000000000..1dfbf98604
--- /dev/null
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra first
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+ ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+ ... user_id int PRIMARY KEY,
+ ... fname text,
+ ... lname text
+ ... );
+
+> cassandra_outputformat <host> test users 1745 john smith
+> cassandra_outputformat <host> test users 1744 john doe
+> cassandra_outputformat <host> test users 1746 john smith
+
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+ 1745 | john | smith
+ 1744 | john | doe
+ 1746 | john | smith
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 7:
+ print >> sys.stderr, """
+ Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+ Assumes you have created the following table <cf> in Cassandra already,
+ running on <host>, in <keyspace>.
+
+ cqlsh:<keyspace>> CREATE TABLE <cf> (
+ ... user_id int PRIMARY KEY,
+ ... fname text,
+ ... lname text
+ ... );
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ keyspace = sys.argv[2]
+ cf = sys.argv[3]
+ sc = SparkContext(appName="CassandraOutputFormat")
+
+ conf = {"cassandra.output.thrift.address":host,
+ "cassandra.output.thrift.port":"9160",
+ "cassandra.output.keyspace":keyspace,
+ "cassandra.output.partitioner.class":"Murmur3Partitioner",
+ "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+ "mapreduce.output.basename":cf,
+ "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+ "mapreduce.job.output.key.class":"java.util.Map",
+ "mapreduce.job.output.value.class":"java.util.List"}
+ key = {"user_id" : int(sys.argv[4])}
+ sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
+ conf=conf,
+ keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3289d9880a..c9fa8e171c 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -65,7 +65,8 @@ if __name__ == "__main__":
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
- valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+ keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
conf=conf)
output = hbase_rdd.collect()
for (k, v) in output:
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
new file mode 100644
index 0000000000..5e11548fd1
--- /dev/null
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test table in HBase first:
+
+hbase(main):001:0> create 'test', 'f1'
+0 row(s) in 0.7840 seconds
+
+> hbase_outputformat <host> test row1 f1 q1 value1
+> hbase_outputformat <host> test row2 f1 q1 value2
+> hbase_outputformat <host> test row3 f1 q1 value3
+> hbase_outputformat <host> test row4 f1 q1 value4
+
+hbase(main):002:0> scan 'test'
+ROW COLUMN+CELL
+ row1 column=f1:q1, timestamp=1405659615726, value=value1
+ row2 column=f1:q1, timestamp=1405659626803, value=value2
+ row3 column=f1:q1, timestamp=1405659640106, value=value3
+ row4 column=f1:q1, timestamp=1405659650292, value=value4
+4 row(s) in 0.0780 seconds
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 7:
+ print >> sys.stderr, """
+ Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py <args>
+ Assumes you have created <table> with column family <family> in HBase running on <host> already
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ table = sys.argv[2]
+ sc = SparkContext(appName="HBaseOutputFormat")
+
+ conf = {"hbase.zookeeper.quorum": host,
+ "hbase.mapred.outputtable": table,
+ "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+ "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+ "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+
+ sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
+ conf=conf,
+ keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 29a65c7a5f..83feb5703b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters
import org.apache.spark.api.python.Converter
import java.nio.ByteBuffer
import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+import collection.JavaConversions._
/**
@@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
}
}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * Map[String, Int] to Cassandra key
+ */
+class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] {
+ override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
+ val input = obj.asInstanceOf[java.util.Map[String, Int]]
+ mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * List[String] to Cassandra value
+ */
+class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] {
+ override def convert(obj: Any): java.util.List[ByteBuffer] = {
+ val input = obj.asInstanceOf[java.util.List[String]]
+ seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
deleted file mode 100644
index 42ae960bd6..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
- * to a String
- */
-class HBaseConverter extends Converter[Any, String] {
- override def convert(obj: Any): String = {
- val result = obj.asInstanceOf[Result]
- Bytes.toStringBinary(result.value())
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
new file mode 100644
index 0000000000..273bee0a8b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.{Put, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * HBase Result to a String
+ */
+class HBaseResultToStringConverter extends Converter[Any, String] {
+ override def convert(obj: Any): String = {
+ val result = obj.asInstanceOf[Result]
+ Bytes.toStringBinary(result.value())
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * ImmutableBytesWritable to a String
+ */
+class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
+ override def convert(obj: Any): String = {
+ val key = obj.asInstanceOf[ImmutableBytesWritable]
+ Bytes.toStringBinary(key.get())
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * String to an ImmutableBytesWritable
+ */
+class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] {
+ override def convert(obj: Any): ImmutableBytesWritable = {
+ val bytes = Bytes.toBytes(obj.asInstanceOf[String])
+ new ImmutableBytesWritable(bytes)
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * list of Strings to HBase Put
+ */
+class StringListToPutConverter extends Converter[Any, Put] {
+ override def convert(obj: Any): Put = {
+ val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+ val put = new Put(output(0))
+ put.add(output(1), output(2), output(3))
+ }
+}
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee03f..7b0f8d83ae 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
+ _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None):
+ valueConverter=None, minSplits=None, batchSize=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
@param valueConverter:
@param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
- keyConverter, valueConverter, minSplits)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, minSplits, batchSize)
+ return RDD(jrdd, self, ser)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
- keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
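The new `batchSize` argument threaded through the readers above controls how many pairs the JVM side packs into each pickled payload, and the matching Python deserializer is chosen accordingly. A short example, assuming an existing SparkContext `sc` and an illustrative path:

    # Default: batches of 10 pairs per pickled payload, read with BatchedSerializer.
    rdd = sc.sequenceFile("path/to/file")

    # batchSize=1 disables batching; a plain PickleSerializer is used instead.
    unbatched = sc.sequenceFile("path/to/file", batchSize=1)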
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976114..e8fcc900ef 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
self._jrdd_deserializer = jrdd_deserializer
self._id = jrdd.id()
+ def _toPickleSerialization(self):
+ if (self._jrdd_deserializer == PickleSerializer() or
+ self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+ return self
+ else:
+ return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
def id(self):
"""
A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
"""
return self.take(1)[0]
+ def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
+ converted for output using either user specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, True)
+
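# Illustrative sketch (not part of the patch): saving a key-value RDD with
# saveAsNewAPIHadoopDataset. The output directory is hypothetical; the conf keys
# mirror the ones exercised in the tests added below.
conf = {"mapreduce.outputformat.class":
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
        "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
        "mapred.output.value.class": "org.apache.hadoop.io.Text",
        "mapred.output.dir": "/tmp/example/newdataset"}
sc.parallelize([(1, u"a"), (2, u"b")]).saveAsNewAPIHadoopDataset(conf)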
+ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop job configuration, passed in as a dict (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+
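# Illustrative sketch (not part of the patch): saving to a SequenceFile via the
# new Hadoop OutputFormat API. The path is hypothetical; key/value classes may be
# omitted and will be inferred as described above.
sc.parallelize([(1, u"a"), (2, u"b")]).saveAsNewAPIHadoopFile(
    "/tmp/example/newhadoop",
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.hadoop.io.Text")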
+ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
+ converted for output using either user specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, False)
+
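# Illustrative sketch (not part of the patch): the old-API counterpart, driven
# entirely by a job conf dict. The output directory is hypothetical.
conf = {"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
        "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
        "mapred.output.value.class": "org.apache.hadoop.io.Text",
        "mapred.output.dir": "/tmp/example/olddataset"}
sc.parallelize([(1, u"a"), (2, u"b")]).saveAsHadoopDataset(conf)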
+ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None,
+ compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by default)
+        @param compressionCodecClass: fully qualified classname of the compression codec class
+               (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+ jconf, compressionCodecClass)
+
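# Illustrative sketch (not part of the patch): old-API file save with optional
# output compression. The path is hypothetical; GzipCodec is a standard Hadoop codec.
sc.parallelize([(1, u"a"), (2, u"b")]).saveAsHadoopFile(
    "/tmp/example/oldhadoop",
    "org.apache.hadoop.mapred.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.hadoop.io.Text",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")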
+ def saveAsSequenceFile(self, path, compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+ RDD's key and value types. The mechanism is as follows:
+ 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
+ 2. Keys and values of this Java RDD are converted to Writables and written out.
+
+ @param path: path to sequence file
+        @param compressionCodecClass: fully qualified classname of the compression codec class
+               (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
+ """
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ path, compressionCodecClass)
+
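# Illustrative sketch (not part of the patch): Writable types are inferred from the
# RDD's key and value types, so a plain int/unicode pair RDD round-trips directly.
# The path is hypothetical.
sc.parallelize([(1, u"aa"), (2, u"bb")]).saveAsSequenceFile("/tmp/example/sfint")
sorted(sc.sequenceFile("/tmp/example/sfint").collect())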
def saveAsPickleFile(self, path, batchSize=10):
"""
Save this RDD as a SequenceFile of serialized objects. The serializer
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8486c8595b..c29deb9574 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,7 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
+from array import array
from fileinput import input
from glob import glob
import os
@@ -327,6 +328,17 @@ class TestInputFormat(PySparkTestCase):
ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
self.assertEqual(doubles, ed)
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BytesWritable").collect())
+ ebs = [(1, bytearray('aa', 'utf-8')),
+ (1, bytearray('aa', 'utf-8')),
+ (2, bytearray('aa', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (3, bytearray('cc', 'utf-8'))]
+ self.assertEqual(bytes, ebs)
+
text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
"org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").collect())
@@ -353,14 +365,34 @@ class TestInputFormat(PySparkTestCase):
maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable").collect())
- em = [(1, {2.0: u'aa'}),
+ em = [(1, {}),
(1, {3.0: u'bb'}),
(2, {1.0: u'aa'}),
(2, {1.0: u'cc'}),
- (2, {3.0: u'bb'}),
(3, {2.0: u'dd'})]
self.assertEqual(maps, em)
+ # arrays get pickled to tuples by default
+ tuples = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable").collect())
+ et = [(1, ()),
+ (2, (3.0, 4.0, 5.0)),
+ (3, (4.0, 5.0, 6.0))]
+ self.assertEqual(tuples, et)
+
+ # with custom converters, primitive arrays can stay as arrays
+ arrays = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ ea = [(1, array('d')),
+ (2, array('d', [3.0, 4.0, 5.0])),
+ (3, array('d', [4.0, 5.0, 6.0]))]
+ self.assertEqual(arrays, ea)
+
clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
@@ -369,6 +401,12 @@ class TestInputFormat(PySparkTestCase):
u'double': 54.0, u'int': 123, u'str': u'test1'})
self.assertEqual(clazz[0], ec)
+ unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_clazz[0], ec)
+
def test_oldhadoop(self):
basepath = self.tempdir.name
ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
@@ -379,10 +417,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.hadoopFile(hellopath,
- "org.apache.hadoop.mapred.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ oldconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
@@ -397,10 +436,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.newAPIHadoopFile(hellopath,
- "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ newconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
@@ -435,16 +475,267 @@ class TestInputFormat(PySparkTestCase):
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text"))
- def test_converter(self):
+ def test_converters(self):
+ # use of custom converters
basepath = self.tempdir.name
maps = sorted(self.sc.sequenceFile(
basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable",
- valueConverter="org.apache.spark.api.python.TestConverter").collect())
- em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+ keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
+ em = [(u'\x01', []),
+ (u'\x01', [3.0]),
+ (u'\x02', [1.0]),
+ (u'\x02', [1.0]),
+ (u'\x03', [2.0])]
+ self.assertEqual(maps, em)
+
+class TestOutputFormat(PySparkTestCase):
+
+ def setUp(self):
+ PySparkTestCase.setUp(self)
+ self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(self.tempdir.name)
+
+ def tearDown(self):
+ PySparkTestCase.tearDown(self)
+ shutil.rmtree(self.tempdir.name, ignore_errors=True)
+
+ def test_sequencefiles(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
+ ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
+ self.assertEqual(ints, ei)
+
+ ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+ self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
+ doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+ self.assertEqual(doubles, ed)
+
+ ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
+ self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
+ self.assertEqual(bytes, ebs)
+
+ et = [(u'1', u'aa'),
+ (u'2', u'bb'),
+ (u'3', u'cc')]
+ self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
+ text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
+ self.assertEqual(text, et)
+
+ eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+ self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
+ bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
+ self.assertEqual(bools, eb)
+
+ en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+ self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
+ nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
+ self.assertEqual(nulls, en)
+
+ em = [(1, {}),
+ (1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (2, {1.0: u'cc'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
+ maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
self.assertEqual(maps, em)
+ def test_oldhadoop(self):
+ basepath = self.tempdir.name
+ dict_data = [(1, {}),
+ (1, {"row1" : 1.0}),
+ (2, {"row2" : 2.0})]
+ self.sc.parallelize(dict_data).saveAsHadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable")
+ result = sorted(self.sc.hadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable").collect())
+ self.assertEqual(result, dict_data)
+
+ conf = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
+ "mapred.output.dir" : basepath + "/olddataset/"}
+ self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
+ input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+ old_dataset = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable",
+ conf=input_conf).collect())
+ self.assertEqual(old_dataset, dict_data)
+
+ def test_newhadoop(self):
+ basepath = self.tempdir.name
+ # use custom ArrayWritable types and converters to handle arrays
+ array_data = [(1, array('d')),
+ (1, array('d', [1.0, 2.0, 3.0])),
+ (2, array('d', [3.0, 4.0, 5.0]))]
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ result = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ self.assertEqual(result, array_data)
+
+ conf = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
+ "mapred.output.dir" : basepath + "/newdataset/"}
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+ new_dataset = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+ conf=input_conf).collect())
+ self.assertEqual(new_dataset, array_data)
+
+ def test_newolderror(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/newolderror/saveAsHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/newolderror/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+
+ def test_bad_inputs(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/badinputs/saveAsHadoopFile/",
+ "org.apache.hadoop.mapred.NotValidOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/badinputs/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
+
+ def test_converters(self):
+ # use of custom converters
+ basepath = self.tempdir.name
+ data = [(1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+ basepath + "/converters/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
+ converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
+ expected = [(u'1', 3.0),
+ (u'2', 1.0),
+ (u'3', 2.0)]
+ self.assertEqual(converted, expected)
+
+ def test_reserialization(self):
+ basepath = self.tempdir.name
+ x = range(1, 5)
+ y = range(1001, 1005)
+ data = zip(x, y)
+ rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
+ rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
+ result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
+ self.assertEqual(result1, data)
+
+ rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
+ self.assertEqual(result2, data)
+
+ rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
+ self.assertEqual(result3, data)
+
+ conf4 = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/dataset"}
+ rdd.saveAsHadoopDataset(conf4)
+ result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
+ self.assertEqual(result4, data)
+
+ conf5 = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+ rdd.saveAsNewAPIHadoopDataset(conf5)
+ result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
+ self.assertEqual(result5, data)
+
+ def test_unbatched_save_and_read(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
+ basepath + "/unbatched/")
+
+ unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_sequence, ei)
+
+ unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopFile, ei)
+
+ unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+ oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopRDD, ei)
+
+ newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopRDD, ei)
+
+ def test_malformed_RDD(self):
+ basepath = self.tempdir.name
+ # non-batch-serialized RDD[[(K, V)]] should be rejected
+ data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
+ rdd = self.sc.parallelize(data, numSlices=len(data))
+ self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
+ basepath + "/malformed/sequence"))
class TestDaemon(unittest.TestCase):
def connect(self, port):