path: root/core
author     Davies Liu <davies.liu@gmail.com>    2014-08-01 18:47:41 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-08-01 18:47:41 -0700
commit     880eabec37c69ce4e9594d7babfac291b0f93f50 (patch)
tree       aeebac2510f7c3b303a5aa0b944a9af84e9d6698 /core
parent     7058a5393bccc2f917189fa9b4cf7f314410b0de (diff)
[SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD
Convert each Row in a JavaSchemaRDD into an Array[Any] and unpickle it as a tuple in Python, then convert it into a namedtuple, so users can access fields just like attributes. This lets nested structures be accessed as objects, and it also reduces the size of the serialized data and improves performance. For example, given a SchemaRDD with the following schema:

root
 |-- field1: integer (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field4: integer (nullable = true)
 |    |-- field5: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field7: string (nullable = true)

we can access the nested fields by row.field3.field5[0] or row.field6[5].field7.

It also infers the schema in Python: Row/dict/namedtuple/objects are converted into tuples before serialization, then applySchema is called in the JVM. During inferSchema(), the top level of a dict in a row becomes a StructType, but any nested dictionary becomes a MapType.

You can use pyspark.sql.Row to convert an unnamed structure into a Row object, so that the schema of the RDD can be inferred. For example:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1])))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply a schema to an RDD of tuples/lists and turn it into a SchemaRDD. The `schema` should be a StructType; see the API docs for details.

schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple with inferSchema, you should make the namedtuple picklable.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1598 from davies/nested and squashes the following commits:

f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use array to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd
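For illustration only, a minimal end-to-end sketch of the workflow described above (not part of the patch; it assumes an existing SparkContext `sc`, and the sample data and field names are made up):

from pyspark.sql import SQLContext, Row, StructType, StructField, StringType, IntegerType

ctx = SQLContext(sc)
rdd = sc.parallelize([("Alice", 1), ("Bob", 2)])

# Option 1: wrap each record in a Row so the schema can be inferred;
# fields of the resulting SchemaRDD are then accessible as attributes.
people = ctx.inferSchema(rdd.map(lambda x: Row(name=x[0], age=x[1])))
print(people.first().name)   # Alice

# Option 2: apply an explicit StructType schema to an RDD of tuples/lists.
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
people2 = ctx.applySchema(rdd, schema)
print(people2.first().age)   # 1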
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  69
1 file changed, 49 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 94d666aa92..fe9a9e50ef 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.language.existentials
import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Try, Success, Failure}
import net.razorvine.pickle.{Pickler, Unpickler}
@@ -536,25 +536,6 @@ private[spark] object PythonRDD extends Logging {
file.close()
}
- /**
- * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
- * It is only used by pyspark.sql.
- * TODO: Support more Python types.
- */
- def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
- pyRDD.rdd.mapPartitions { iter =>
- val unpickle = new Unpickler
- iter.flatMap { row =>
- unpickle.loads(row) match {
- // in case of objects are pickled in batch mode
- case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
- // not in batch mode
- case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
- }
- }
- }
- }
-
private def getMergedConf(confAsMap: java.util.HashMap[String, String],
baseConf: Configuration): Configuration = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
@@ -701,6 +682,54 @@ private[spark] object PythonRDD extends Logging {
}
}
+
+ /**
+ * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
+ * This function is outdated; PySpark does not use it anymore.
+ */
+ @deprecated
+ def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
+ pyRDD.rdd.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ unpickle.loads(row) match {
+ // in case of objects are pickled in batch mode
+ case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
+ // not in batch mode
+ case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert an RDD of serialized Python tuples to Arrays (no recursive conversions).
+ * It is only used by pyspark.sql.
+ */
+ def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
+
+ def toArray(obj: Any): Array[_] = {
+ obj match {
+ case objs: JArrayList[_] =>
+ objs.toArray
+ case obj if obj.getClass.isArray =>
+ obj.asInstanceOf[Array[_]].toArray
+ }
+ }
+
+ pyRDD.rdd.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ val obj = unpickle.loads(row)
+ if (batched) {
+ obj.asInstanceOf[JArrayList[_]].map(toArray)
+ } else {
+ Seq(toArray(obj))
+ }
+ }
+ }.toJavaRDD()
+ }
+
/**
* Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.