aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml5
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala32
2 files changed, 37 insertions, 0 deletions
diff --git a/core/pom.xml b/core/pom.xml
index a1bdd8ec68..d87e2bca03 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -266,6 +266,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>pyrolite</artifactId>
+ <version>2.0</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 32f1100406..f9d86fed34 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import net.razorvine.pickle.{Pickler, Unpickler}
+
import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -284,6 +286,36 @@ private[spark] object PythonRDD {
file.close()
}
+ /**
+ * Convert an RDD of serialized Python dictionaries to Scala Maps
+ * TODO: Support more Python types.
+ */
+ def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
+ pyRDD.rdd.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ // TODO: Figure out why flatMap is necessary for pyspark
+ iter.flatMap { row =>
+ unpickle.loads(row) match {
+ case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
+ // In case the partition doesn't have a collection
+ case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by
+ * PySpark.
+ */
+ def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+ jRDD.rdd.mapPartitions { iter =>
+ val pickle = new Pickler
+ iter.map { row =>
+ pickle.dumps(row)
+ }
+ }
+ }
}
private