author     Yin Huai <huai@cse.ohio-state.edu>          2014-07-07 18:37:38 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-07 18:37:38 -0700
commit     4352a2fdaa64efee7158eabef65703460ff284ec (patch)
tree       2c4a946d15c8f3584d4cad6c14a9562187733256 /sql
parent     f0496ee10847db921a028a34f70385f9b740b3f3 (diff)
[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException
JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
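For context: javaToPython in SchemaRDD converts each Row to a java.util.Map and hands the batched maps to net.razorvine.pickle's Pickler. Before this patch, fields of ArrayType fell through to the catch-all case and were put into that map as raw Scala values (a Seq, possibly of nested Rows), which is what surfaced as the IllegalArgumentException in the JIRA title. Below is a minimal sketch of the conversion a flat array field needs, using a plain Scala Seq as a stand-in for the row data; the record shape is a hypothetical example, not taken from the JIRA.

    import java.util.{List => JList}
    import scala.collection.JavaConverters._

    // Hypothetical record: {"name": "a", "tags": ["x", "y"]}
    val tags: Seq[String] = Seq("x", "y")

    // Previously the raw Seq was put into the row's java.util.Map as-is and the
    // Pickler could not serialize it; the patched code converts it first.
    val pickleFriendly: JList[String] = tags.asJava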
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  45
1 file changed, 30 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8f9f54f610..8bcfc7c064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql
+import java.util.{Map => JMap, List => JList, Set => JSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
import net.razorvine.pickle.Pickler
import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
@@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
/**
* :: AlphaComponent ::
@@ -359,6 +363,28 @@ class SchemaRDD(
        case (obj, (name, dataType)) =>
          dataType match {
            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case array @ ArrayType(struct: StructType) =>
+              val arrayValues = obj match {
+                case seq: Seq[Any] =>
+                  seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
+                case list: JList[Any] =>
+                  list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case set: JSet[Any] =>
+                  set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case array if array != null && array.getClass.isArray =>
+                  array.asInstanceOf[Array[Any]].map {
+                    element => rowToMap(element.asInstanceOf[Row], struct)
+                  }
+                case other => other
+              }
+              map.put(name, arrayValues)
+            case array: ArrayType => {
+              val arrayValues = obj match {
+                case seq: Seq[Any] => seq.asJava
+                case other => other
+              }
+              map.put(name, arrayValues)
+            }
            case other => map.put(name, obj)
          }
      }
@@ -366,22 +392,11 @@ class SchemaRDD(
      map
    }

-    // TODO: Actually, the schema of a row should be represented by a StructType instead of
-    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-    // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
    this.mapPartitions { iter =>
      val pickle = new Pickler
      iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
      }.grouped(10).map(batched => pickle.dumps(batched.toArray))
    }
  }
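To make the new conversion concrete, here is a self-contained sketch of the rowToMap-style recursion the patch introduces. Row, StructType and ArrayType are replaced with simplified stand-ins, and only the Seq branch of the array handling is shown, so the snippet compiles without Spark; it illustrates the technique rather than reproducing the actual SchemaRDD code.

    import java.util.{Map => JMap}

    import scala.collection.JavaConverters._

    // Simplified stand-ins for Catalyst's Row, StructType and ArrayType so the
    // sketch compiles without a Spark dependency.
    object RowToMapSketch {
      type Row = Seq[Any]

      sealed trait DataType
      case class StructType(fields: Seq[(String, DataType)]) extends DataType
      case class ArrayType(elementType: DataType) extends DataType
      case object AtomicType extends DataType

      // Mirrors the idea of the patched rowToMap: structs become java.util.Maps,
      // arrays of structs become java.util.Lists of java.util.Maps, and arrays of
      // atomic values only need a Seq -> java.util.List conversion.
      def rowToMap(row: Row, schema: StructType): JMap[String, Any] = {
        val map = new java.util.HashMap[String, Any]()
        row.zip(schema.fields).foreach { case (obj, (name, dataType)) =>
          dataType match {
            case struct: StructType =>
              map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
            case ArrayType(struct: StructType) =>
              val converted = obj.asInstanceOf[Seq[Any]]
                .map(element => rowToMap(element.asInstanceOf[Row], struct))
                .asJava
              map.put(name, converted)
            case _: ArrayType =>
              map.put(name, obj.asInstanceOf[Seq[Any]].asJava)
            case _ =>
              map.put(name, obj)
          }
        }
        map
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical record: {"name": "a", "orders": [{"id": 1}, {"id": 2}]}
        val schema = StructType(Seq(
          "name" -> AtomicType,
          "orders" -> ArrayType(StructType(Seq("id" -> AtomicType)))))
        val row: Row = Seq("a", Seq(Seq(1), Seq(2)))
        // Prints something like {name=a, orders=[{id=1}, {id=2}]}, a structure
        // the Pickler can serialize for PySpark.
        println(rowToMap(row, schema))
      }
    }

The real patch additionally matches java.util.List, java.util.Set and plain Java arrays in the array-of-structs branch, since the values reaching rowToMap are not always Scala Seqs; the sketch omits those branches for brevity.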