Diffstat (limited to 'core')
 core/pom.xml                                                     |  5
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 32
2 files changed, 37 insertions, 0 deletions
diff --git a/core/pom.xml b/core/pom.xml
index a1bdd8ec68..d87e2bca03 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -266,6 +266,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>pyrolite</artifactId>
+      <version>2.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 32f1100406..f9d86fed34 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
+import net.razorvine.pickle.{Pickler, Unpickler}
+
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -284,6 +286,36 @@ private[spark] object PythonRDD {
     file.close()
   }
 
+  /**
+   * Convert an RDD of serialized Python dictionaries to Scala Maps.
+   * TODO: Support more Python types.
+   */
+  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      // TODO: Figure out why flatMap is necessary for pyspark
+      iter.flatMap { row =>
+        unpickle.loads(row) match {
+          case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
+          // In case the partition doesn't have a collection
+          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+        }
+      }
+    }
+  }
+
+  /**
+   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
+   * PySpark.
+   */
+  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+    jRDD.rdd.mapPartitions { iter =>
+      val pickle = new Pickler
+      iter.map { row =>
+        pickle.dumps(row)
+      }
+    }
+  }
 }
 
 private
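For context, the two new helpers rely on Pyrolite's Pickler and Unpickler to move data across the Python/JVM boundary in Python's pickle wire format. The stand-alone Scala sketch below (illustrative only, not part of this commit; the PickleRoundTrip object name is made up) shows the same dumps/loads round trip on a plain java.util.HashMap:

// Illustrative sketch only (not part of the commit): a minimal round trip
// through Pyrolite's Pickler/Unpickler, the same calls used by javaToPython
// and pythonToJavaMap above. The object name PickleRoundTrip is hypothetical.
import java.util.{HashMap => JHashMap}

import net.razorvine.pickle.{Pickler, Unpickler}

object PickleRoundTrip {
  def main(args: Array[String]): Unit = {
    val row = new JHashMap[String, Any]()
    row.put("name", "spark")
    row.put("count", 42)

    // dumps() produces Python pickle bytes, the format PySpark workers consume.
    val bytes: Array[Byte] = new Pickler().dumps(row)

    // loads() turns pickle bytes back into JVM objects; a pickled dict comes
    // back as a java.util.Map, which is why pythonToJavaMap matches on JMap.
    val restored = new Unpickler().loads(bytes)
    println(restored)
  }
}

Because pickled Python dicts deserialize to java.util.Map instances, pythonToJavaMap pattern-matches on JMap (or an ArrayList of them) and converts each to an immutable Scala Map with .toMap.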