author     Yin Huai <huai@cse.ohio-state.edu>          2014-07-07 18:37:38 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-07 18:37:38 -0700
commit     4352a2fdaa64efee7158eabef65703460ff284ec
tree       2c4a946d15c8f3584d4cad6c14a9562187733256 /sql
parent     f0496ee10847db921a028a34f70385f9b740b3f3
[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException
JIRA: https://issues.apache.org/jira/browse/SPARK-2376
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:
0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
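For context, the failing scenario can be sketched as follows. A JSON field holding an array of objects is inferred as ArrayType(StructType(...)); before this patch, the javaToPython conversion turned nested structs into Java maps but passed array values through untouched, so the pickler received raw Row objects (or Scala Seqs) it could not serialize. A hypothetical reproduction, assuming the Spark 1.0-era shell API (sc, jsonRDD, registerAsTable); the table and field names are invented for illustration:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // "orders" is an array of JSON objects, inferred as ArrayType(StructType).
    val json = sc.parallelize(
      """{"name": "alice", "orders": [{"id": 1}, {"id": 2}]}""" :: Nil)
    val people = sqlContext.jsonRDD(json)
    people.registerAsTable("people")
    // Fine from Scala; before this patch, issuing the same query from PySpark
    // raised java.lang.IllegalArgumentException while pickling the result.
    sqlContext.sql("SELECT orders FROM people").collect()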
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  45
1 file changed, 30 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8f9f54f610..8bcfc7c064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.util.{Map => JMap, List => JList, Set => JSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
@@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
 
 /**
  * :: AlphaComponent ::
@@ -359,6 +363,28 @@ class SchemaRDD(
       case (obj, (name, dataType)) =>
         dataType match {
           case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+          case array @ ArrayType(struct: StructType) =>
+            val arrayValues = obj match {
+              case seq: Seq[Any] =>
+                seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
+              case list: JList[Any] =>
+                list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+              case set: JSet[Any] =>
+                set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+              case array if array != null && array.getClass.isArray =>
+                array.asInstanceOf[Array[Any]].map {
+                  element => rowToMap(element.asInstanceOf[Row], struct)
+                }
+              case other => other
+            }
+            map.put(name, arrayValues)
+          case array: ArrayType => {
+            val arrayValues = obj match {
+              case seq: Seq[Any] => seq.asJava
+              case other => other
+            }
+            map.put(name, arrayValues)
+          }
           case other => map.put(name, obj)
         }
     }
@@ -366,22 +392,11 @@ class SchemaRDD(
       map
     }
 
-    // TODO: Actually, the schema of a row should be represented by a StructType instead of
-    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-    // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
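To see the conversion logic in isolation, here is a minimal, self-contained sketch of the recursion the patch implements in rowToMap. It is not Spark code: Spark's Row is stood in for by Seq[Any], Catalyst's schema types by a small ad-hoc ADT, and only the Seq branch of the patch's obj match is reproduced; every name below is invented for illustration.

    import java.util.{HashMap => JHashMap, Map => JMap}
    import scala.collection.JavaConverters._

    object RowToMapSketch {
      // Ad-hoc stand-ins for Catalyst's DataType hierarchy (illustrative only).
      sealed trait DType
      case object Primitive extends DType
      final case class Struct(fields: Seq[(String, DType)]) extends DType
      final case class Arr(elementType: DType) extends DType

      // The core idea of the patch: recurse into nested structs and into arrays
      // of structs, and hand the pickler Java collections instead of Scala ones.
      def rowToMap(row: Seq[Any], schema: Struct): JMap[String, Any] = {
        val map = new JHashMap[String, Any]()
        row.zip(schema.fields).foreach {
          case (value, (name, struct: Struct)) =>
            // A nested struct becomes a map, so Python sees a dict, not a Row.
            map.put(name, rowToMap(value.asInstanceOf[Seq[Any]], struct))
          case (value, (name, Arr(struct: Struct))) =>
            // An array of structs: convert each element, then expose a Java list.
            val converted = value.asInstanceOf[Seq[Any]]
              .map(e => rowToMap(e.asInstanceOf[Seq[Any]], struct))
            map.put(name, converted.asJava)
          case (value, (name, Arr(_))) =>
            // An array of primitives: only the container needs converting.
            map.put(name, value.asInstanceOf[Seq[Any]].asJava)
          case (value, (name, _)) =>
            map.put(name, value)
        }
        map
      }

      def main(args: Array[String]): Unit = {
        // Mirrors {"name": "alice", "orders": [{"id": 1}, {"id": 2}]}.
        val schema = Struct(Seq(
          "name" -> Primitive,
          "orders" -> Arr(Struct(Seq("id" -> Primitive)))))
        val row = Seq("alice", Seq(Seq(1), Seq(2)))
        println(rowToMap(row, schema))  // e.g. {name=alice, orders=[{id=1}, {id=2}]}
      }
    }

The real patch additionally accepts java.util.List, java.util.Set, and raw Java arrays as runtime representations of an ArrayType value, since rows reaching javaToPython may carry any of these.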