| field | value | date |
|---|---|---|
| author | Nick Pentreath <nick.pentreath@gmail.com> | 2014-06-09 22:21:03 -0700 |
| committer | Matei Zaharia <matei@databricks.com> | 2014-06-09 22:21:03 -0700 |
| commit | f971d6cb60d642178d6544217a25fa16ece34889 (patch) | |
| tree | cba008802eda1755ff58eec8ad462d894f48d265 /core | |
| parent | 6f2db8c2f51911f88a601ec5bf1509ea0e8173ed (diff) | |
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it.
This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.
# Overview
The basics are as follows (a minimal usage sketch follows the list):
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes.
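For illustration, a minimal end-to-end read from the Python side might look like the sketch below. The path is hypothetical, and the exact keyword arguments are only assumed to mirror the Scala-side ```PythonRDD``` methods; treat this as a sketch rather than the definitive API.

```python
# Hedged sketch: read a SequenceFile of (IntWritable, Text) pairs in PySpark.
# The path and the exact keyword arguments here are assumptions for illustration.
rdd = sc.sequenceFile(
    "hdfs:///tmp/sfint",                           # hypothetical input path
    keyClass="org.apache.hadoop.io.IntWritable",   # Writable key class
    valueClass="org.apache.hadoop.io.Text")        # Writable value class

# On the Scala side the Writables are converted to Int/String, pickled via
# Pyrolite, and PickleSerializer deserializes them into plain Python objects.
print(rdd.first())   # e.g. (1, u'aa') -- illustrative output
```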
This works "out the box" for simple ```Writable```s:
* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```
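To make the default conversions concrete, the hedged sketch below (paths hypothetical, printed results purely illustrative) shows how these Writables surface as native Python types:

```python
# Hedged sketch: the default Writable conversions surface as plain Python types.
# Paths are hypothetical and the results shown are illustrative only.
bools = sc.sequenceFile("hdfs:///tmp/sfbool",
                        "org.apache.hadoop.io.IntWritable",
                        "org.apache.hadoop.io.BooleanWritable")
bools.first()    # -> (1, True)            BooleanWritable -> bool

maps = sc.sequenceFile("hdfs:///tmp/sfmap",
                       "org.apache.hadoop.io.IntWritable",
                       "org.apache.hadoop.io.MapWritable")
maps.first()     # -> (1, {2.0: u'aa'})    MapWritable -> dict
```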
It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. fields with a no-arg constructor and getters/setters). (Perhaps in the future some sugar for case classes and reflection could be added.)
I've tested it out with ```EsInputFormat``` as an example and it works very nicely:
```python
conf = {"es.resource" : "index/type" }
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
rdd.first()
```
I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out of the box.
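One likely route for those cases is the key/value ```Converter``` interface added in this PR: a small Scala class turns the custom Writable (e.g. an HBase ```Result```) into something Pyrolite can pickle, and its class name is passed in from Python. A hedged sketch, where the converter class names, key/value classes, table name and quorum are all hypothetical:

```python
# Hedged sketch: plug custom Scala Converters into an HBase read from PySpark.
# The converter class names, table name and quorum below are hypothetical.
conf = {
    "hbase.zookeeper.quorum": "localhost",
    "hbase.mapreduce.inputtable": "test_table",
}
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.example.ImmutableBytesWritableToStringConverter",  # hypothetical
    valueConverter="org.example.HBaseResultToStringConverter",           # hypothetical
    conf=conf)
rdd.first()
```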
# Some things still outstanding:
1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False```, that can be used if ```msgpack-python``` is not available~~
2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile```, ```saveAsHadoopFile```, etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. I will work on this as a separate PR
Author: Nick Pentreath <nick.pentreath@gmail.com>
Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:
268df7e [Nick Pentreath] Documentation changes per @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
Diffstat (limited to 'core')
4 files changed, 544 insertions(+), 1 deletion(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
new file mode 100644
index 0000000000..adaa1ef6cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] extends Serializable {
+  def convert(obj: T): U
+}
+
+private[python] object Converter extends Logging {
+
+  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+    converterClass.map { cc =>
+      Try {
+        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+        logInfo(s"Loaded converter: $cc")
+        c
+      } match {
+        case Success(c) => c
+        case Failure(err) =>
+          logError(s"Failed to load converter: $cc")
+          throw err
+      }
+    }.getOrElse { new DefaultConverter }
+  }
+}
+
+/**
+ * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] class DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+    import collection.JavaConversions._
+    writable match {
+      case iw: IntWritable => iw.get()
+      case dw: DoubleWritable => dw.get()
+      case lw: LongWritable => lw.get()
+      case fw: FloatWritable => fw.get()
+      case t: Text => t.toString
+      case bw: BooleanWritable => bw.get()
+      case byw: BytesWritable => byw.getBytes
+      case n: NullWritable => null
+      case aw: ArrayWritable => aw.get().map(convertWritable(_))
+      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
+        (convertWritable(k), convertWritable(v))
+      }.toMap)
+      case other => other
+    }
+  }
+
+  def convert(obj: Any): Any = {
+    obj match {
+      case writable: Writable =>
+        convertWritable(writable)
+      case _ =>
+        obj
+    }
+  }
+}
+
+/** Utilities for working with Python objects <-> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
+   */
+  def mapToConf(map: java.util.Map[String, String]): Configuration = {
+    import collection.JavaConversions._
+    val conf = new Configuration()
+    map.foreach{ case (k, v) => conf.set(k, v) }
+    conf
+  }
+
+  /**
+   * Merges two configurations, returns a copy of left with keys from right overwriting
+   * any matching keys in left
+   */
+  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
+    import collection.JavaConversions._
+    val copy = new Configuration(left)
+    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
+    copy
+  }
+
+  /**
+   * Converts an RDD of key-value pairs, where key and/or value could be instances of
+   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   */
+  def convertRDD[K, V](rdd: RDD[(K, V)],
+                       keyConverter: Converter[Any, Any],
+                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d1df99300c..f6570d3357 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,6 +28,9 @@ import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
   /**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
     }
   }
 
+  /**
+   * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def sequenceFile[K, V](
+      sc: JavaSparkContext,
+      path: String,
+      keyClassMaybeNull: String,
+      valueClassMaybeNull: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      minSplits: Int) = {
+    val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+   * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+    } else {
+      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+    }
+    rdd
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+   * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.hadoopFile(path.get, fc, kc, vc)
+    } else {
+      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+    }
+    rdd
+  }
+
   def writeUTF(str: String, dataOut: DataOutputStream) {
     val bytes = str.getBytes(UTF8)
     dataOut.writeInt(bytes.length)
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
new file mode 100644
index 0000000000..9a012e7254
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import scala.util.Try
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import scala.util.Success
+import scala.util.Failure
+import net.razorvine.pickle.Pickler
+
+
+/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
+private[python] object SerDeUtil extends Logging {
+
+  private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
+    val pickle = new Pickler
+    val kt = Try {
+      pickle.dumps(t._1)
+    }
+    val vt = Try {
+      pickle.dumps(t._2)
+    }
+    (kt, vt) match {
+      case (Failure(kf), Failure(vf)) =>
+        logWarning(s"""
+          |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back
+          |to 'toString'. Error: ${kf.getMessage}""".stripMargin)
+        logWarning(s"""
+          |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back
+          |to 'toString'. Error: ${vf.getMessage}""".stripMargin)
+        (true, true)
+      case (Failure(kf), _) =>
+        logWarning(s"""
+          |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back
+          |to 'toString'. Error: ${kf.getMessage}""".stripMargin)
+        (true, false)
+      case (_, Failure(vf)) =>
+        logWarning(s"""
+          |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back
+          |to 'toString'. Error: ${vf.getMessage}""".stripMargin)
+        (false, true)
+      case _ =>
+        (false, false)
+    }
+  }
+
+  /**
+   * Convert an RDD of key-value pairs to an RDD of serialized Python objects, that is usable
+   * by PySpark. By default, if serialization fails, toString is called and the string
+   * representation is serialized
+   */
+  def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+    val (keyFailed, valueFailed) = checkPickle(rdd.first())
+    rdd.mapPartitions { iter =>
+      val pickle = new Pickler
+      iter.map { case (k, v) =>
+        if (keyFailed && valueFailed) {
+          pickle.dumps(Array(k.toString, v.toString))
+        } else if (keyFailed) {
+          pickle.dumps(Array(k.toString, v))
+        } else if (!keyFailed && valueFailed) {
+          pickle.dumps(Array(k, v.toString))
+        } else {
+          pickle.dumps(Array(k, v))
+        }
+      }
+    }
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
new file mode 100644
index 0000000000..f0e3fb9aff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.SparkContext
+import org.apache.hadoop.io._
+import scala.Array
+import java.io.{DataOutput, DataInput}
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * in Python
+ * @param str
+ * @param int
+ * @param double
+ */
+case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
+  def this() = this("", 0, 0.0)
+
+  def getStr = str
+  def setStr(str: String) { this.str = str }
+  def getInt = int
+  def setInt(int: Int) { this.int = int }
+  def getDouble = double
+  def setDouble(double: Double) { this.double = double }
+
+  def write(out: DataOutput) = {
+    out.writeUTF(str)
+    out.writeInt(int)
+    out.writeDouble(double)
+  }
+
+  def readFields(in: DataInput) = {
+    str = in.readUTF()
+    int = in.readInt()
+    double = in.readDouble()
+  }
+}
+
+class TestConverter extends Converter[Any, Any] {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+    val m = obj.asInstanceOf[MapWritable]
+    seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq)
+  }
+}
+
+/**
+ * This object contains method to generate SequenceFile test data and write it to a
+ * given directory (probably a temp directory)
+ */
+object WriteInputFormatTestDataGenerator {
+  import SparkContext._
+
+  def main(args: Array[String]) {
+    val path = args(0)
+    val sc = new JavaSparkContext("local[4]", "test-writables")
+    generateData(path, sc)
+  }
+
+  def generateData(path: String, jsc: JavaSparkContext) {
+    val sc = jsc.sc
+
+    val basePath = s"$path/sftestdata/"
+    val textPath = s"$basePath/sftext/"
+    val intPath = s"$basePath/sfint/"
+    val doublePath = s"$basePath/sfdouble/"
+    val arrPath = s"$basePath/sfarray/"
+    val mapPath = s"$basePath/sfmap/"
+    val classPath = s"$basePath/sfclass/"
+    val bytesPath = s"$basePath/sfbytes/"
+    val boolPath = s"$basePath/sfbool/"
+    val nullPath = s"$basePath/sfnull/"
+
+    /*
+     * Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
+     * BooleanWritable and NullWritable
+     */
+    val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
+    sc.parallelize(intKeys).saveAsSequenceFile(intPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+    val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
+    sc.parallelize(bools).saveAsSequenceFile(boolPath)
+    sc.parallelize(intKeys).map{ case (k, v) =>
+      (new IntWritable(k), NullWritable.get())
+    }.saveAsSequenceFile(nullPath)
+
+    // Create test data for ArrayWritable
+    val data = Seq(
+      (1, Array(1.0, 2.0, 3.0)),
+      (2, Array(3.0, 4.0, 5.0)),
+      (3, Array(4.0, 5.0, 6.0))
+    )
+    sc.parallelize(data, numSlices = 2)
+      .map{ case (k, v) =>
+        (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
+      }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+
+    // Create test data for MapWritable, with keys DoubleWritable and values Text
+    val mapData = Seq(
+      (1, Map(2.0 -> "aa")),
+      (2, Map(3.0 -> "bb")),
+      (2, Map(1.0 -> "cc")),
+      (3, Map(2.0 -> "dd")),
+      (2, Map(1.0 -> "aa")),
+      (1, Map(3.0 -> "bb"))
+    )
+    sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
+      val mw = new MapWritable()
+      val k = m.keys.head
+      val v = m.values.head
+      mw.put(new DoubleWritable(k), new Text(v))
+      (new IntWritable(i), mw)
+    }.saveAsSequenceFile(mapPath)
+
+    // Create test data for arbitrary custom writable TestWritable
+    val testClass = Seq(
+      ("1", TestWritable("test1", 123, 54.0)),
+      ("2", TestWritable("test2", 456, 8762.3)),
+      ("1", TestWritable("test3", 123, 423.1)),
+      ("3", TestWritable("test56", 456, 423.5)),
+      ("2", TestWritable("test2", 123, 5435.2))
+    )
+    val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+    rdd.saveAsNewAPIHadoopFile(classPath,
+      classOf[Text], classOf[TestWritable],
+      classOf[SequenceFileOutputFormat[Text, TestWritable]])
+  }
+
+
+}
```