author    Nick Pentreath <nick.pentreath@gmail.com>    2014-06-09 22:21:03 -0700
committer Matei Zaharia <matei@databricks.com>    2014-06-09 22:21:03 -0700
commit    f971d6cb60d642178d6544217a25fa16ece34889 (patch)
tree      cba008802eda1755ff58eec8ad462d894f48d265 /core
parent    6f2db8c2f51911f88a601ec5bf1509ea0e8173ed (diff)
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it. This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.

# Overview

The basics are as follows:

1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark.
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives).
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```.
4. ```PickleSerializer``` on the Python side deserializes.

This works "out of the box" for simple ```Writable```s:

* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```

It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. with fields, a no-arg constructor, and getters/setters); see the converter sketch after the commit list below. (Perhaps in future some sugar for case classes and reflection could be added.) I've tested it out with ```EsInputFormat``` as an example and it works very nicely:

```python
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)
rdd.first()
```

I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out of the box.

# Some things still outstanding:

1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False```, which can be used if ```msgpack-python``` is not available.~~
2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard.~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized).~~
4. Support ```saveAsSequenceFile``` and ```saveAsHadoopFile``` etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. Will work on this as a separate PR.

Author: Nick Pentreath <nick.pentreath@gmail.com>

Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:

268df7e [Nick Pentreath] Documentation changes per @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
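The ```Converter``` trait added in ```PythonHadoopUtil.scala``` below is the extension point for key/value classes that Pyrolite cannot pickle directly. As a rough illustration only, here is a minimal Scala sketch of a custom converter that flattens a ```MapWritable``` into a ```java.util.HashMap``` of strings, which Pyrolite can serialize. The package and class name are hypothetical and not part of this patch, and it assumes the compiled class is on the Spark classpath of both driver and executors.

```scala
package org.example.converters // hypothetical package, not part of this patch

import java.util

import org.apache.hadoop.io.MapWritable
import org.apache.spark.api.python.Converter

// Flattens a MapWritable into a java.util.HashMap[String, String] by calling
// toString on each Writable key and value, so Pyrolite can pickle the result.
class TextMapToStringMapConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case mw: MapWritable =>
      val out = new util.HashMap[String, String]()
      val entries = mw.entrySet().iterator()
      while (entries.hasNext) {
        val e = entries.next()
        out.put(e.getKey.toString, e.getValue.toString)
      }
      out
    // Anything that is not a MapWritable is passed through unchanged.
    case other => other
  }
}
```

The fully qualified class name would then be passed as the ```keyConverterClass``` / ```valueConverterClass``` string argument accepted by the ```PythonRDD``` methods in this patch, and ```Converter.getInstance``` loads it reflectively via ```Class.forName```.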
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala                  129
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                         179
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala                          87
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala  150
4 files changed, 544 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
new file mode 100644
index 0000000000..adaa1ef6cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] extends Serializable {
+ def convert(obj: T): U
+}
+
+private[python] object Converter extends Logging {
+
+ def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+ converterClass.map { cc =>
+ Try {
+ val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+ logInfo(s"Loaded converter: $cc")
+ c
+ } match {
+ case Success(c) => c
+ case Failure(err) =>
+ logError(s"Failed to load converter: $cc")
+ throw err
+ }
+ }.getOrElse { new DefaultConverter }
+ }
+}
+
+/**
+ * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] class DefaultConverter extends Converter[Any, Any] {
+
+ /**
+ * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
+ * object representation
+ */
+ private def convertWritable(writable: Writable): Any = {
+ import collection.JavaConversions._
+ writable match {
+ case iw: IntWritable => iw.get()
+ case dw: DoubleWritable => dw.get()
+ case lw: LongWritable => lw.get()
+ case fw: FloatWritable => fw.get()
+ case t: Text => t.toString
+ case bw: BooleanWritable => bw.get()
+ case byw: BytesWritable => byw.getBytes
+ case n: NullWritable => null
+ case aw: ArrayWritable => aw.get().map(convertWritable(_))
+ case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
+ (convertWritable(k), convertWritable(v))
+ }.toMap)
+ case other => other
+ }
+ }
+
+ def convert(obj: Any): Any = {
+ obj match {
+ case writable: Writable =>
+ convertWritable(writable)
+ case _ =>
+ obj
+ }
+ }
+}
+
+/** Utilities for working with Python objects <-> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+ /**
+ * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
+ */
+ def mapToConf(map: java.util.Map[String, String]): Configuration = {
+ import collection.JavaConversions._
+ val conf = new Configuration()
+ map.foreach{ case (k, v) => conf.set(k, v) }
+ conf
+ }
+
+ /**
+ * Merges two configurations, returns a copy of left with keys from right overwriting
+ * any matching keys in left
+ */
+ def mergeConfs(left: Configuration, right: Configuration): Configuration = {
+ import collection.JavaConversions._
+ val copy = new Configuration(left)
+ right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
+ copy
+ }
+
+ /**
+ * Converts an RDD of key-value pairs, where key and/or value could be instances of
+ * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+ */
+ def convertRDD[K, V](rdd: RDD[(K, V)],
+ keyConverter: Converter[Any, Any],
+ valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+ rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d1df99300c..f6570d3357 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,6 +28,9 @@ import scala.util.Try
import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths {
val TIMING_DATA = -3
}
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
val UTF8 = Charset.forName("UTF-8")
/**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
}
}
+ /**
+ * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
+ * key and value class.
+ * A key and/or value converter class can optionally be passed in
+ * (see [[org.apache.spark.api.python.Converter]])
+ */
+ def sequenceFile[K, V](
+ sc: JavaSparkContext,
+ path: String,
+ keyClassMaybeNull: String,
+ valueClassMaybeNull: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ minSplits: Int) = {
+ val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+ val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+ implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+ implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+ val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+ val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+
+ val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+ val keyConverter = Converter.getInstance(Option(keyConverterClass))
+ val valueConverter = Converter.getInstance(Option(valueConverterClass))
+ val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+ JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ }
+
+ /**
+ * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+ * key and value class.
+ * A key and/or value converter class can optionally be passed in
+ * (see [[org.apache.spark.api.python.Converter]])
+ */
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+ sc: JavaSparkContext,
+ path: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val baseConf = sc.hadoopConfiguration()
+ val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ val rdd =
+ newAPIHadoopRDDFromClassNames[K, V, F](sc,
+ Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+ val keyConverter = Converter.getInstance(Option(keyConverterClass))
+ val valueConverter = Converter.getInstance(Option(valueConverterClass))
+ val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+ JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ }
+
+ /**
+ * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+ * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+ * key and value class.
+ * A key and/or value converter class can optionally be passed in
+ * (see [[org.apache.spark.api.python.Converter]])
+ */
+ def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ sc: JavaSparkContext,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val rdd =
+ newAPIHadoopRDDFromClassNames[K, V, F](sc,
+ None, inputFormatClass, keyClass, valueClass, conf)
+ val keyConverter = Converter.getInstance(Option(keyConverterClass))
+ val valueConverter = Converter.getInstance(Option(valueConverterClass))
+ val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+ JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ }
+
+ private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+ sc: JavaSparkContext,
+ path: Option[String] = None,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ conf: Configuration) = {
+ implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+ implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+ implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+ val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+ val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+ val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+ val rdd = if (path.isDefined) {
+ sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+ } else {
+ sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+ }
+ rdd
+ }
+
+ /**
+ * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+ * key and value class.
+ * A key and/or value converter class can optionally be passed in
+ * (see [[org.apache.spark.api.python.Converter]])
+ */
+ def hadoopFile[K, V, F <: InputFormat[K, V]](
+ sc: JavaSparkContext,
+ path: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val baseConf = sc.hadoopConfiguration()
+ val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+ val rdd =
+ hadoopRDDFromClassNames[K, V, F](sc,
+ Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+ val keyConverter = Converter.getInstance(Option(keyConverterClass))
+ val valueConverter = Converter.getInstance(Option(valueConverterClass))
+ val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+ JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ }
+
+ /**
+ * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+ * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+ * key and value class
+ * A key and/or value converter class can optionally be passed in
+ * (see [[org.apache.spark.api.python.Converter]])
+ */
+ def hadoopRDD[K, V, F <: InputFormat[K, V]](
+ sc: JavaSparkContext,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverterClass: String,
+ valueConverterClass: String,
+ confAsMap: java.util.HashMap[String, String]) = {
+ val conf = PythonHadoopUtil.mapToConf(confAsMap)
+ val rdd =
+ hadoopRDDFromClassNames[K, V, F](sc,
+ None, inputFormatClass, keyClass, valueClass, conf)
+ val keyConverter = Converter.getInstance(Option(keyConverterClass))
+ val valueConverter = Converter.getInstance(Option(valueConverterClass))
+ val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+ JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+ }
+
+ private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+ sc: JavaSparkContext,
+ path: Option[String] = None,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ conf: Configuration) = {
+ implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+ implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+ implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+ val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+ val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+ val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+ val rdd = if (path.isDefined) {
+ sc.sc.hadoopFile(path.get, fc, kc, vc)
+ } else {
+ sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+ }
+ rdd
+ }
+
def writeUTF(str: String, dataOut: DataOutputStream) {
val bytes = str.getBytes(UTF8)
dataOut.writeInt(bytes.length)
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
new file mode 100644
index 0000000000..9a012e7254
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import scala.util.Try
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import scala.util.Success
+import scala.util.Failure
+import net.razorvine.pickle.Pickler
+
+
+/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
+private[python] object SerDeUtil extends Logging {
+
+ private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
+ val pickle = new Pickler
+ val kt = Try {
+ pickle.dumps(t._1)
+ }
+ val vt = Try {
+ pickle.dumps(t._2)
+ }
+ (kt, vt) match {
+ case (Failure(kf), Failure(vf)) =>
+ logWarning(s"""
+ |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back
+ |to 'toString'. Error: ${kf.getMessage}""".stripMargin)
+ logWarning(s"""
+ |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back
+ |to 'toString'. Error: ${vf.getMessage}""".stripMargin)
+ (true, true)
+ case (Failure(kf), _) =>
+ logWarning(s"""
+ |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back
+ |to 'toString'. Error: ${kf.getMessage}""".stripMargin)
+ (true, false)
+ case (_, Failure(vf)) =>
+ logWarning(s"""
+ |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back
+ |to 'toString'. Error: ${vf.getMessage}""".stripMargin)
+ (false, true)
+ case _ =>
+ (false, false)
+ }
+ }
+
+ /**
+ * Convert an RDD of key-value pairs to an RDD of serialized Python objects, that is usable
+ * by PySpark. By default, if serialization fails, toString is called and the string
+ * representation is serialized
+ */
+ def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+ val (keyFailed, valueFailed) = checkPickle(rdd.first())
+ rdd.mapPartitions { iter =>
+ val pickle = new Pickler
+ iter.map { case (k, v) =>
+ if (keyFailed && valueFailed) {
+ pickle.dumps(Array(k.toString, v.toString))
+ } else if (keyFailed) {
+ pickle.dumps(Array(k.toString, v))
+ } else if (!keyFailed && valueFailed) {
+ pickle.dumps(Array(k, v.toString))
+ } else {
+ pickle.dumps(Array(k, v))
+ }
+ }
+ }
+ }
+
+}
+
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
new file mode 100644
index 0000000000..f0e3fb9aff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.SparkContext
+import org.apache.hadoop.io._
+import scala.Array
+import java.io.{DataOutput, DataInput}
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A class to test Pyrolite serialization on the Scala side, that will be deserialized
+ * in Python
+ * @param str
+ * @param int
+ * @param double
+ */
+case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
+ def this() = this("", 0, 0.0)
+
+ def getStr = str
+ def setStr(str: String) { this.str = str }
+ def getInt = int
+ def setInt(int: Int) { this.int = int }
+ def getDouble = double
+ def setDouble(double: Double) { this.double = double }
+
+ def write(out: DataOutput) = {
+ out.writeUTF(str)
+ out.writeInt(int)
+ out.writeDouble(double)
+ }
+
+ def readFields(in: DataInput) = {
+ str = in.readUTF()
+ int = in.readInt()
+ double = in.readDouble()
+ }
+}
+
+class TestConverter extends Converter[Any, Any] {
+ import collection.JavaConversions._
+ override def convert(obj: Any) = {
+ val m = obj.asInstanceOf[MapWritable]
+ seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq)
+ }
+}
+
+/**
+ * This object contains method to generate SequenceFile test data and write it to a
+ * given directory (probably a temp directory)
+ */
+object WriteInputFormatTestDataGenerator {
+ import SparkContext._
+
+ def main(args: Array[String]) {
+ val path = args(0)
+ val sc = new JavaSparkContext("local[4]", "test-writables")
+ generateData(path, sc)
+ }
+
+ def generateData(path: String, jsc: JavaSparkContext) {
+ val sc = jsc.sc
+
+ val basePath = s"$path/sftestdata/"
+ val textPath = s"$basePath/sftext/"
+ val intPath = s"$basePath/sfint/"
+ val doublePath = s"$basePath/sfdouble/"
+ val arrPath = s"$basePath/sfarray/"
+ val mapPath = s"$basePath/sfmap/"
+ val classPath = s"$basePath/sfclass/"
+ val bytesPath = s"$basePath/sfbytes/"
+ val boolPath = s"$basePath/sfbool/"
+ val nullPath = s"$basePath/sfnull/"
+
+ /*
+ * Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
+ * BooleanWritable and NullWritable
+ */
+ val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
+ sc.parallelize(intKeys).saveAsSequenceFile(intPath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+ val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
+ sc.parallelize(bools).saveAsSequenceFile(boolPath)
+ sc.parallelize(intKeys).map{ case (k, v) =>
+ (new IntWritable(k), NullWritable.get())
+ }.saveAsSequenceFile(nullPath)
+
+ // Create test data for ArrayWritable
+ val data = Seq(
+ (1, Array(1.0, 2.0, 3.0)),
+ (2, Array(3.0, 4.0, 5.0)),
+ (3, Array(4.0, 5.0, 6.0))
+ )
+ sc.parallelize(data, numSlices = 2)
+ .map{ case (k, v) =>
+ (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
+ }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+
+ // Create test data for MapWritable, with keys DoubleWritable and values Text
+ val mapData = Seq(
+ (1, Map(2.0 -> "aa")),
+ (2, Map(3.0 -> "bb")),
+ (2, Map(1.0 -> "cc")),
+ (3, Map(2.0 -> "dd")),
+ (2, Map(1.0 -> "aa")),
+ (1, Map(3.0 -> "bb"))
+ )
+ sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
+ val mw = new MapWritable()
+ val k = m.keys.head
+ val v = m.values.head
+ mw.put(new DoubleWritable(k), new Text(v))
+ (new IntWritable(i), mw)
+ }.saveAsSequenceFile(mapPath)
+
+ // Create test data for arbitrary custom writable TestWritable
+ val testClass = Seq(
+ ("1", TestWritable("test1", 123, 54.0)),
+ ("2", TestWritable("test2", 456, 8762.3)),
+ ("1", TestWritable("test3", 123, 423.1)),
+ ("3", TestWritable("test56", 456, 423.5)),
+ ("2", TestWritable("test2", 123, 5435.2))
+ )
+ val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+ rdd.saveAsNewAPIHadoopFile(classPath,
+ classOf[Text], classOf[TestWritable],
+ classOf[SequenceFileOutputFormat[Text, TestWritable]])
+ }
+
+
+}