author     Yin Huai <huai@cse.ohio-state.edu>    2014-06-17 19:14:59 -0700
committer  Reynold Xin <rxin@apache.org>         2014-06-17 19:14:59 -0700
commit     d2f4f30b12f99358953e2781957468e2cfe3c916 (patch)
tree       405b949a2968dba2c73874bd2fefc9d10206e731 /sql
parent     b2ebf429e24566c29850c570f8d76943151ad78c (diff)
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
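
For orientation, a minimal usage sketch of the Scala API this patch adds (the file path and field names are hypothetical; `sc` is an existing SparkContext):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "json-example")   // hypothetical local context
val sqlContext = new SQLContext(sc)

// One JSON object per line; the schema is inferred by scanning the dataset once.
val people = sqlContext.jsonFile("path/to/people.json")   // hypothetical path
people.printSchema()                  // added by this patch: prints the inferred schema as a tree
people.registerAsTable("people")
sqlContext.sql("SELECT * FROM people").collect().foreach(println)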
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/pom.xml | 28
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 51
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala) | 9
-rw-r--r--  sql/core/pom.xml | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 397
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala | 45
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 519
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala | 84
18 files changed, 1262 insertions(+), 35 deletions(-)
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 6c78c34486..01d7b56908 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -66,6 +66,34 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
+
+ <!--
+ This plugin forces the generation of jar containing catalyst test classes,
+ so that the tests classes of external modules can use them. The two execution profiles
+ are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+ 'mvn compile' should not compile test classes and therefore should not need this.
+ However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+ causes the compilation to fail if catalyst test-jar is not generated. Hence, the
+ second execution profile for 'mvn compile'.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>test-jar-on-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d291814c8a..66bff660ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
+object HiveTypeCoercion {
+ // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+ // The conversion for integral and floating point types have a linear widening hierarchy:
+ val numericPrecedence =
+ Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+ // Boolean is only wider than Void
+ val booleanPrecedence = Seq(NullType, BooleanType)
+ val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+}
+
/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
+ *
+ * Widening types might result in loss of precision in the following cases:
+ * - IntegerType to FloatType
+ * - LongType to FloatType
+ * - LongType to DoubleType
*/
object WidenTypes extends Rule[LogicalPlan] {
- // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
- // The conversion for integral and floating point types have a linear widening hierarchy:
- val numericPrecedence =
- Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
- // Boolean is only wider than Void
- val booleanPrecedence = Seq(NullType, BooleanType)
- val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
// Try and find a promotion rule that contains both types in question.
- val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+ val applicableConversion =
+ HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
// If found return the widest common type, otherwise None
applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
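
To illustrate the lookup that `findTightestCommonType` performs over these chains, here is a standalone sketch; plain strings stand in for the catalyst DataType objects, and it mirrors the logic above rather than calling it:

val numericPrecedence =
  Seq("NullType", "ByteType", "ShortType", "IntegerType", "LongType",
      "FloatType", "DoubleType", "DecimalType")
val booleanPrecedence = Seq("NullType", "BooleanType")
val allPromotions = numericPrecedence :: booleanPrecedence :: Nil

def findTightestCommonType(t1: String, t2: String): Option[String] =
  // Find a precedence chain containing both types, then take the wider of the two.
  allPromotions.find(p => p.contains(t1) && p.contains(t2))
    .map(_.filter(t => t == t1 || t == t2).last)

findTightestCommonType("IntegerType", "LongType")   // Some(LongType): same numeric chain
findTightestCommonType("NullType", "BooleanType")   // Some(BooleanType): boolean chain
findTightestCommonType("BooleanType", "LongType")   // None: no single chain contains both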
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8199a80f5d..00e2d3bc24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
case other => Nil
}.toSeq
}
+
+ protected def generateSchemaString(schema: Seq[Attribute]): String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ schema.foreach { attribute =>
+ val name = attribute.name
+ val dataType = attribute.dataType
+ dataType match {
+ case fields: StructType =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(fields: StructType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(elementType: DataType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case _ => builder.append(s"$prefix-- $name: $dataType\n")
+ }
+ }
+
+ builder.toString()
+ }
+
+ protected def generateSchemaString(
+ schema: StructType,
+ prefix: String,
+ builder: StringBuilder): StringBuilder = {
+ schema.fields.foreach {
+ case StructField(name, fields: StructType, _) =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(fields: StructType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(elementType: DataType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case StructField(name, fieldType: DataType, _) =>
+ builder.append(s"$prefix-- $name: $fieldType\n")
+ }
+
+ builder
+ }
+
+ /** Returns the output schema in the tree format. */
+ def schemaString: String = generateSchemaString(output)
+
+ /** Prints out the schema in the tree format */
+ def printSchema(): Unit = println(schemaString)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 714f01843c..4896f1b955 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class CombiningLimitsSuite extends OptimizerTest {
+class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 6efc0e211e..cea97c584f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class ConstantFoldingSuite extends OptimizerTest {
+class ConstantFoldingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1f67c80e54..ebb123c1f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class FilterPushdownSuite extends OptimizerTest {
+class FilterPushdownSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
index df1409fe7b..22992fb6f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
+class SimplifyCaseConversionExpressionsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 89982d5cd8..7e9f47ef21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.optimizer
+package org.apache.spark.sql.catalyst.plans
import org.scalatest.FunSuite
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
/**
- * Provides helper methods for comparing plans produced by optimization rules with the expected
- * result
+ * Provides helper methods for comparing plans.
*/
-class OptimizerTest extends FunSuite {
+class PlanTest extends FunSuite {
/**
* Since attribute references are given globally unique ids during analysis,
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e65ca6be48..8210fd1f21 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -44,6 +44,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
@@ -54,6 +61,11 @@
<version>${parquet.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
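
The new jackson-databind dependency is what the JSON support uses to parse each record into a `java.util.Map` (see `JsonRDD.parseJson` further down). A minimal parsing sketch with a hypothetical record; the duplicate-key behavior shown is the one noted in that method's comments:

import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()
// For a duplicate key, the last value wins: {"key": 1, "key": 2} parses to key -> 2.
val parsed = mapper.readValue("""{"key": 1, "key": 2}""", classOf[java.util.Map[String, Any]])
println(parsed)   // {key=2}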
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 131c130bbb..f7e03323be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -22,24 +22,22 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
-
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
+import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
import org.apache.spark.sql.columnar.InMemoryRelation
-
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
-
+import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.SparkContext
/**
* :: AlphaComponent ::
@@ -53,7 +51,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
class SQLContext(@transient val sparkContext: SparkContext)
extends Logging
with SQLConf
- with dsl.ExpressionConversions
+ with ExpressionConversions
with Serializable {
self =>
@@ -99,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, parquet.ParquetRelation(path))
/**
+ * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+
+ /**
+ * :: Experimental ::
+ */
+ @Experimental
+ def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+ val json = sparkContext.textFile(path)
+ jsonRDD(json, samplingRatio)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[SchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+
+ /**
+ * :: Experimental ::
+ */
+ @Experimental
+ def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
+ new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+
+ /**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
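
A short sketch of the experimental sampling overload declared above (records and ratio are hypothetical; `sc` and `sqlContext` are assumed as in the earlier sketch):

val records = sc.parallelize(
  """{"name": "a", "age": 1}""" ::
  """{"name": "b", "age": 2, "active": true}""" :: Nil)

// Infer the schema from a sample of the records rather than the full dataset.
val sampled = sqlContext.jsonRDD(records, samplingRatio = 0.5)
sampled.printSchema()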
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 89eaba2d19..7c0efb4566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
import java.util.{Map => JMap}
@@ -41,8 +41,10 @@ import java.util.{Map => JMap}
* whose elements are scala case classes into a SchemaRDD. This conversion can also be done
* explicitly using the `createSchemaRDD` function on a [[SQLContext]].
*
- * A `SchemaRDD` can also be created by loading data in from external sources, for example,
- * by using the `parquetFile` method on [[SQLContext]].
+ * A `SchemaRDD` can also be created by loading data in from external sources.
+ * Examples are loading data from Parquet files by using the `parquetFile` method
+ * on [[SQLContext]], and loading JSON datasets by using the `jsonFile` and
+ * `jsonRDD` methods on [[SQLContext]].
*
* == SQL Queries ==
* A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it. Once
@@ -341,14 +343,38 @@ class SchemaRDD(
*/
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+ /**
+ * Converts a JavaRDD to a PythonRDD. It is used by pyspark.
+ */
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
- val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
+ def rowToMap(row: Row, structType: StructType): JMap[String, Any] = {
+ val fields = structType.fields.map(field => (field.name, field.dataType))
+ val map: JMap[String, Any] = new java.util.HashMap
+ row.zip(fields).foreach {
+ case (obj, (name, dataType)) =>
+ dataType match {
+ case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+ case other => map.put(name, obj)
+ }
+ }
+
+ map
+ }
+
+ // TODO: Actually, the schema of a row should be represented by a StructType instead of
+ // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
+ // construct the Map for python.
+ val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
+ field => (field.name, field.dataType))
this.mapPartitions { iter =>
val pickle = new Pickler
iter.map { row =>
val map: JMap[String, Any] = new java.util.HashMap
- row.zip(fieldNames).foreach { case (obj, name) =>
- map.put(name, obj)
+ row.zip(fields).foreach { case (obj, (name, dataType)) =>
+ dataType match {
+ case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+ case other => map.put(name, obj)
+ }
}
map
}.grouped(10).map(batched => pickle.dumps(batched.toArray))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 656be965a8..fe81721943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -122,4 +122,10 @@ private[sql] trait SchemaRDDLike {
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+
+ /** Returns the output schema in the tree format. */
+ def schemaString: String = queryExecution.analyzed.schemaString
+
+ /** Prints out the schema in the tree format. */
+ def printSchema(): Unit = println(schemaString)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 352260fa15..ff9842267f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.catalyst.types._
@@ -101,6 +102,25 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
/**
+ * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonFile(path: String): JavaSchemaRDD =
+ jsonRDD(sqlContext.sparkContext.textFile(path))
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[JavaSchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
+ new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+
+ /**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
new file mode 100644
index 0000000000..edf8677557
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import scala.collection.JavaConversions._
+import scala.math.BigDecimal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.Logging
+
+private[sql] object JsonRDD extends Logging {
+
+ private[sql] def inferSchema(
+ json: RDD[String],
+ samplingRatio: Double = 1.0): LogicalPlan = {
+ require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
+ val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
+ val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+ val baseSchema = createSchema(allKeys)
+
+ createLogicalPlan(json, baseSchema)
+ }
+
+ private def createLogicalPlan(
+ json: RDD[String],
+ baseSchema: StructType): LogicalPlan = {
+ val schema = nullTypeToStringType(baseSchema)
+
+ SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
+ }
+
+ private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
+ // Resolve type conflicts
+ val resolved = allKeys.groupBy {
+ case (key, dataType) => key
+ }.map {
+ // Now, keys and types are organized in the format of
+ // key -> Set(type1, type2, ...).
+ case (key, typeSet) => {
+ val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
+ val dataType = typeSet.map {
+ case (_, dataType) => dataType
+ }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+ (fieldName, dataType)
+ }
+ }
+
+ def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
+ val (topLevel, structLike) = values.partition(_.size == 1)
+ val topLevelFields = topLevel.filter {
+ name => resolved.get(prefix ++ name).get match {
+ case ArrayType(StructType(Nil)) => false
+ case ArrayType(_) => true
+ case struct: StructType => false
+ case _ => true
+ }
+ }.map {
+ a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
+ }
+
+ val structFields: Seq[StructField] = structLike.groupBy(_(0)).map {
+ case (name, fields) => {
+ val nestedFields = fields.map(_.tail)
+ val structType = makeStruct(nestedFields, prefix :+ name)
+ val dataType = resolved.get(prefix :+ name).get
+ dataType match {
+ case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
+ case struct: StructType => Some(StructField(name, structType, nullable = true))
+ // A StringType dataType means that we have resolved type conflicts involving
+ // primitive types and complex types, so the type of this field has been relaxed to
+ // StringType. Also, this field should have already been put in topLevelFields.
+ case StringType => None
+ }
+ }
+ }.flatMap(field => field).toSeq
+
+ StructType(
+ (topLevelFields ++ structFields).sortBy {
+ case StructField(name, _, _) => name
+ })
+ }
+
+ makeStruct(resolved.keySet.toSeq, Nil)
+ }
+
+ /**
+ * Returns the most general data type for two given data types.
+ */
+ private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+ // Try and find a promotion rule that contains both types in question.
+ val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p
+ .contains(t2))
+
+ // If found return the widest common type, otherwise None
+ val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+
+ if (returnType.isDefined) {
+ returnType.get
+ } else {
+ // t1 or t2 is a StructType, ArrayType, or an unexpected type.
+ (t1, t2) match {
+ case (other: DataType, NullType) => other
+ case (NullType, other: DataType) => other
+ case (StructType(fields1), StructType(fields2)) => {
+ val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
+ case (name, fieldTypes) => {
+ val dataType = fieldTypes.map(field => field.dataType).reduce(
+ (type1: DataType, type2: DataType) => compatibleType(type1, type2))
+ StructField(name, dataType, true)
+ }
+ }
+ StructType(newFields.toSeq.sortBy {
+ case StructField(name, _, _) => name
+ })
+ }
+ case (ArrayType(elementType1), ArrayType(elementType2)) =>
+ ArrayType(compatibleType(elementType1, elementType2))
+ // TODO: We should use JsonObjectStringType to mark that values of this field will be
+ // strings and every string is a JSON object.
+ case (_, _) => StringType
+ }
+ }
+ }
+
+ private def typeOfPrimitiveValue(value: Any): DataType = {
+ value match {
+ case value: java.lang.String => StringType
+ case value: java.lang.Integer => IntegerType
+ case value: java.lang.Long => LongType
+ // Since we do not have a data type backed by BigInteger,
+ // when we see a Java BigInteger, we use DecimalType.
+ case value: java.math.BigInteger => DecimalType
+ case value: java.lang.Double => DoubleType
+ case value: java.math.BigDecimal => DecimalType
+ case value: java.lang.Boolean => BooleanType
+ case null => NullType
+ // Unexpected data type.
+ case _ => StringType
+ }
+ }
+
+ /**
+ * Returns the element type of a JSON array. We go through all elements of this array
+ * to detect any possible type conflict. We use [[compatibleType]] to resolve
+ * type conflicts. Right now, when the element of an array is another array, we
+ * treat the element as String.
+ */
+ private def typeOfArray(l: Seq[Any]): ArrayType = {
+ val elements = l.flatMap(v => Option(v))
+ if (elements.isEmpty) {
+ // If this JSON array is empty, we use NullType as a placeholder.
+ // If this array is not empty in other JSON objects, we can resolve
+ // the type after we have passed through all JSON objects.
+ ArrayType(NullType)
+ } else {
+ val elementType = elements.map {
+ e => e match {
+ case map: Map[_, _] => StructType(Nil)
+ // We have an array of arrays. If those element arrays do not have the same
+ // element types, we will return ArrayType[StringType].
+ case seq: Seq[_] => typeOfArray(seq)
+ case value => typeOfPrimitiveValue(value)
+ }
+ }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+ ArrayType(elementType)
+ }
+ }
+
+ /**
+ * Figures out all key names and data types of values from a parsed JSON object
+ * (in the format of Map[String, Any]). When the value of a key is a JSON object, we
+ * only use a placeholder (StructType(Nil)) to mark that it should be a struct
+ * instead of getting all fields of this struct, because a field that does not appear
+ * in this JSON object can appear in other JSON objects.
+ */
+ private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = {
+ m.map{
+ // Quote the key with backticks to handle cases which have dots
+ // in the field name.
+ case (key, dataType) => (s"`$key`", dataType)
+ }.flatMap {
+ case (key: String, struct: Map[String, Any]) => {
+ // The value associated with the key is a JSON object.
+ allKeysWithValueTypes(struct).map {
+ case (k, dataType) => (s"$key.$k", dataType)
+ } ++ Set((key, StructType(Nil)))
+ }
+ case (key: String, array: List[Any]) => {
+ // The value associated with the key is an array.
+ typeOfArray(array) match {
+ case ArrayType(StructType(Nil)) => {
+ // The elements of this array are structs.
+ array.asInstanceOf[List[Map[String, Any]]].flatMap {
+ element => allKeysWithValueTypes(element)
+ }.map {
+ case (k, dataType) => (s"$key.$k", dataType)
+ } :+ (key, ArrayType(StructType(Nil)))
+ }
+ case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
+ }
+ }
+ case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
+ }.toSet
+ }
+
+ /**
+ * Converts a Java Map/List to a Scala Map/List.
+ * We do not use Jackson's scala module at here because
+ * DefaultScalaModule in jackson-module-scala will make
+ * the parsing very slow.
+ */
+ private def scalafy(obj: Any): Any = obj match {
+ case map: java.util.Map[String, Object] =>
+ // .map(identity) is used as a workaround of non-serializable Map
+ // generated by .mapValues.
+ // This issue is documented at https://issues.scala-lang.org/browse/SI-7005
+ map.toMap.mapValues(scalafy).map(identity)
+ case list: java.util.List[Object] =>
+ list.toList.map(scalafy)
+ case atom => atom
+ }
+
+ private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+ // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
+ // ObjectMapper will not return BigDecimal when
+ // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
+ // (see NumberDeserializer.deserialize for the logic).
+ // But, we do not want to enable this feature because it will use BigDecimal
+ // for every float number, which will be slow.
+ // So, right now, we will have Infinity for those BigDecimal numbers.
+ // TODO: Support BigDecimal.
+ json.mapPartitions(iter => {
+ // When there is a key appearing multiple times (a duplicate key),
+ // the ObjectMapper will take the last value associated with this duplicate key.
+ // For example: for {"key": 1, "key":2}, we will get "key"->2.
+ val mapper = new ObjectMapper()
+ iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
+ }).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
+ }
+
+ private def toLong(value: Any): Long = {
+ value match {
+ case value: java.lang.Integer => value.asInstanceOf[Int].toLong
+ case value: java.lang.Long => value.asInstanceOf[Long]
+ }
+ }
+
+ private def toDouble(value: Any): Double = {
+ value match {
+ case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
+ case value: java.lang.Long => value.asInstanceOf[Long].toDouble
+ case value: java.lang.Double => value.asInstanceOf[Double]
+ }
+ }
+
+ private def toDecimal(value: Any): BigDecimal = {
+ value match {
+ case value: java.lang.Integer => BigDecimal(value)
+ case value: java.lang.Long => BigDecimal(value)
+ case value: java.math.BigInteger => BigDecimal(value)
+ case value: java.lang.Double => BigDecimal(value)
+ case value: java.math.BigDecimal => BigDecimal(value)
+ }
+ }
+
+ private def toJsonArrayString(seq: Seq[Any]): String = {
+ val builder = new StringBuilder
+ builder.append("[")
+ var count = 0
+ seq.foreach {
+ element =>
+ if (count > 0) builder.append(",")
+ count += 1
+ builder.append(toString(element))
+ }
+ builder.append("]")
+
+ builder.toString()
+ }
+
+ private def toJsonObjectString(map: Map[String, Any]): String = {
+ val builder = new StringBuilder
+ builder.append("{")
+ var count = 0
+ map.foreach {
+ case (key, value) =>
+ if (count > 0) builder.append(",")
+ count += 1
+ builder.append(s"""\"${key}\":${toString(value)}""")
+ }
+ builder.append("}")
+
+ builder.toString()
+ }
+
+ private def toString(value: Any): String = {
+ value match {
+ case value: Map[String, Any] => toJsonObjectString(value)
+ case value: Seq[Any] => toJsonArrayString(value)
+ case value => Option(value).map(_.toString).orNull
+ }
+ }
+
+ private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any = {
+ if (value == null) {
+ null
+ } else {
+ desiredType match {
+ case ArrayType(elementType) =>
+ value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+ case StringType => toString(value)
+ case IntegerType => value.asInstanceOf[IntegerType.JvmType]
+ case LongType => toLong(value)
+ case DoubleType => toDouble(value)
+ case DecimalType => toDecimal(value)
+ case BooleanType => value.asInstanceOf[BooleanType.JvmType]
+ case NullType => null
+ }
+ }
+ }
+
+ private def asRow(json: Map[String,Any], schema: StructType): Row = {
+ val row = new GenericMutableRow(schema.fields.length)
+ schema.fields.zipWithIndex.foreach {
+ // StructType
+ case (StructField(name, fields: StructType, _), i) =>
+ row.update(i, json.get(name).flatMap(v => Option(v)).map(
+ v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
+
+ // ArrayType(StructType)
+ case (StructField(name, ArrayType(structType: StructType), _), i) =>
+ row.update(i,
+ json.get(name).flatMap(v => Option(v)).map(
+ v => v.asInstanceOf[Seq[Any]].map(
+ e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
+
+ // Other cases
+ case (StructField(name, dataType, _), i) =>
+ row.update(i, json.get(name).flatMap(v => Option(v)).map(
+ enforceCorrectType(_, dataType)).getOrElse(null))
+ }
+
+ row
+ }
+
+ private def nullTypeToStringType(struct: StructType): StructType = {
+ val fields = struct.fields.map {
+ case StructField(fieldName, dataType, nullable) => {
+ val newType = dataType match {
+ case NullType => StringType
+ case ArrayType(NullType) => ArrayType(StringType)
+ case struct: StructType => nullTypeToStringType(struct)
+ case other: DataType => other
+ }
+ StructField(fieldName, newType, nullable)
+ }
+ }
+
+ StructType(fields)
+ }
+
+ private def asAttributes(struct: StructType): Seq[AttributeReference] = {
+ struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
+ }
+
+ private def asStruct(attributes: Seq[AttributeReference]): StructType = {
+ val fields = attributes.map {
+ case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
+ }
+
+ StructType(fields)
+ }
+}
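
To make the type-conflict handling in `compatibleType` concrete, a small hedged example (record contents are hypothetical): an Int and a Long for the same key widen to LongType, while a number/boolean mix shares no promotion chain and falls back to StringType, matching the expectations in JsonSuite below.

val conflicting = sc.parallelize(
  """{"a": 1, "b": true}""" ::
  """{"a": 21474836470, "b": 2}""" :: Nil)

val inferred = sqlContext.jsonRDD(conflicting)
inferred.printSchema()
// Roughly:
// root
//  |-- a: LongType
//  |-- b: StringType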
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d7f6abaf5d..ef84ead2e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,12 +17,10 @@
package org.apache.spark.sql
-import org.scalatest.FunSuite
-
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
index 9fff7222fe..020baf0c7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
@@ -22,6 +22,7 @@ import scala.beans.BeanProperty
import org.scalatest.FunSuite
import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.TestSQLContext
// Implicits
@@ -111,4 +112,48 @@ class JavaSQLSuite extends FunSuite {
""".stripMargin).collect.head.row ===
Seq.fill(8)(null))
}
+
+ test("loads JSON datasets") {
+ val jsonString =
+ """{"string":"this is a simple string.",
+ "integer":10,
+ "long":21474836470,
+ "bigInteger":92233720368547758070,
+ "double":1.7976931348623157E308,
+ "boolean":true,
+ "null":null
+ }""".replaceAll("\n", " ")
+ val rdd = javaCtx.parallelize(jsonString :: Nil)
+
+ var schemaRDD = javaSqlCtx.jsonRDD(rdd)
+
+ schemaRDD.registerAsTable("jsonTable1")
+
+ assert(
+ javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
+ Seq(BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string."))
+
+ val file = getTempFilePath("json")
+ val path = file.toString
+ rdd.saveAsTextFile(path)
+ schemaRDD = javaSqlCtx.jsonFile(path)
+
+ schemaRDD.registerAsTable("jsonTable2")
+
+ assert(
+ javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
+ Seq(BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string."))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
new file mode 100644
index 0000000000..10bd9f08f0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.TestSQLContext._
+
+protected case class Schema(output: Seq[Attribute]) extends LeafNode
+
+class JsonSuite extends QueryTest {
+ import TestJsonData._
+ TestJsonData
+
+ test("Type promotion") {
+ def checkTypePromotion(expected: Any, actual: Any) {
+ assert(expected.getClass == actual.getClass,
+ s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
+ assert(expected == actual,
+ s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
+ s"${expected}(${expected.getClass}).")
+ }
+
+ val intNumber: Int = 2147483647
+ checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
+ checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType))
+ checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType))
+ checkTypePromotion(BigDecimal(intNumber), enforceCorrectType(intNumber, DecimalType))
+
+ val longNumber: Long = 9223372036854775807L
+ checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
+ checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType))
+ checkTypePromotion(BigDecimal(longNumber), enforceCorrectType(longNumber, DecimalType))
+
+ val doubleNumber: Double = 1.7976931348623157E308d
+ checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
+ checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType))
+ }
+
+ test("Get compatible type") {
+ def checkDataType(t1: DataType, t2: DataType, expected: DataType) {
+ var actual = compatibleType(t1, t2)
+ assert(actual == expected,
+ s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
+ actual = compatibleType(t2, t1)
+ assert(actual == expected,
+ s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
+ }
+
+ // NullType
+ checkDataType(NullType, BooleanType, BooleanType)
+ checkDataType(NullType, IntegerType, IntegerType)
+ checkDataType(NullType, LongType, LongType)
+ checkDataType(NullType, DoubleType, DoubleType)
+ checkDataType(NullType, DecimalType, DecimalType)
+ checkDataType(NullType, StringType, StringType)
+ checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType))
+ checkDataType(NullType, StructType(Nil), StructType(Nil))
+ checkDataType(NullType, NullType, NullType)
+
+ // BooleanType
+ checkDataType(BooleanType, BooleanType, BooleanType)
+ checkDataType(BooleanType, IntegerType, StringType)
+ checkDataType(BooleanType, LongType, StringType)
+ checkDataType(BooleanType, DoubleType, StringType)
+ checkDataType(BooleanType, DecimalType, StringType)
+ checkDataType(BooleanType, StringType, StringType)
+ checkDataType(BooleanType, ArrayType(IntegerType), StringType)
+ checkDataType(BooleanType, StructType(Nil), StringType)
+
+ // IntegerType
+ checkDataType(IntegerType, IntegerType, IntegerType)
+ checkDataType(IntegerType, LongType, LongType)
+ checkDataType(IntegerType, DoubleType, DoubleType)
+ checkDataType(IntegerType, DecimalType, DecimalType)
+ checkDataType(IntegerType, StringType, StringType)
+ checkDataType(IntegerType, ArrayType(IntegerType), StringType)
+ checkDataType(IntegerType, StructType(Nil), StringType)
+
+ // LongType
+ checkDataType(LongType, LongType, LongType)
+ checkDataType(LongType, DoubleType, DoubleType)
+ checkDataType(LongType, DecimalType, DecimalType)
+ checkDataType(LongType, StringType, StringType)
+ checkDataType(LongType, ArrayType(IntegerType), StringType)
+ checkDataType(LongType, StructType(Nil), StringType)
+
+ // DoubleType
+ checkDataType(DoubleType, DoubleType, DoubleType)
+ checkDataType(DoubleType, DecimalType, DecimalType)
+ checkDataType(DoubleType, StringType, StringType)
+ checkDataType(DoubleType, ArrayType(IntegerType), StringType)
+ checkDataType(DoubleType, StructType(Nil), StringType)
+
+ // DecimalType
+ checkDataType(DecimalType, DecimalType, DecimalType)
+ checkDataType(DecimalType, StringType, StringType)
+ checkDataType(DecimalType, ArrayType(IntegerType), StringType)
+ checkDataType(DecimalType, StructType(Nil), StringType)
+
+ // StringType
+ checkDataType(StringType, StringType, StringType)
+ checkDataType(StringType, ArrayType(IntegerType), StringType)
+ checkDataType(StringType, StructType(Nil), StringType)
+
+ // ArrayType
+ checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType))
+ checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType))
+ checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType))
+ checkDataType(ArrayType(IntegerType), StructType(Nil), StringType)
+
+ // StructType
+ checkDataType(StructType(Nil), StructType(Nil), StructType(Nil))
+ checkDataType(
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) ::
+ StructField("f2", IntegerType, true) :: Nil),
+ StructType(StructField("f1", LongType, true) :: Nil) ,
+ StructType(
+ StructField("f1", LongType, true) ::
+ StructField("f2", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) :: Nil),
+ StructType(
+ StructField("f2", IntegerType, true) :: Nil),
+ StructType(
+ StructField("f1", IntegerType, true) ::
+ StructField("f2", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) :: Nil),
+ DecimalType,
+ StringType)
+ }
+
+ test("Primitive field and type inferring") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldAndType)
+
+ val expectedSchema =
+ AttributeReference("bigInteger", DecimalType, true)() ::
+ AttributeReference("boolean", BooleanType, true)() ::
+ AttributeReference("double", DoubleType, true)() ::
+ AttributeReference("integer", IntegerType, true)() ::
+ AttributeReference("long", LongType, true)() ::
+ AttributeReference("null", StringType, true)() ::
+ AttributeReference("string", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string.") :: Nil
+ )
+ }
+
+ test("Complex field and type inferring") {
+ val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+
+ val expectedSchema =
+ AttributeReference("arrayOfArray1", ArrayType(ArrayType(StringType)), true)() ::
+ AttributeReference("arrayOfArray2", ArrayType(ArrayType(DoubleType)), true)() ::
+ AttributeReference("arrayOfBigInteger", ArrayType(DecimalType), true)() ::
+ AttributeReference("arrayOfBoolean", ArrayType(BooleanType), true)() ::
+ AttributeReference("arrayOfDouble", ArrayType(DoubleType), true)() ::
+ AttributeReference("arrayOfInteger", ArrayType(IntegerType), true)() ::
+ AttributeReference("arrayOfLong", ArrayType(LongType), true)() ::
+ AttributeReference("arrayOfNull", ArrayType(StringType), true)() ::
+ AttributeReference("arrayOfString", ArrayType(StringType), true)() ::
+ AttributeReference("arrayOfStruct", ArrayType(
+ StructType(StructField("field1", BooleanType, true) ::
+ StructField("field2", StringType, true) :: Nil)), true)() ::
+ AttributeReference("struct", StructType(
+ StructField("field1", BooleanType, true) ::
+ StructField("field2", DecimalType, true) :: Nil), true)() ::
+ AttributeReference("structWithArrayFields", StructType(
+ StructField("field1", ArrayType(IntegerType), true) ::
+ StructField("field2", ArrayType(StringType), true) :: Nil), true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Access elements of a primitive array.
+ checkAnswer(
+ sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
+ ("str1", "str2", null) :: Nil
+ )
+
+ // Access an array of null values.
+ checkAnswer(
+ sql("select arrayOfNull from jsonTable"),
+ Seq(Seq(null, null, null, null)) :: Nil
+ )
+
+ // Access elements of a BigInteger array (we use DecimalType internally).
+ checkAnswer(
+ sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
+ (BigDecimal("922337203685477580700"), BigDecimal("-922337203685477580800"), null) :: Nil
+ )
+
+ // Access elements of an array of arrays.
+ checkAnswer(
+ sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
+ (Seq("1", "2", "3"), Seq("str1", "str2")) :: Nil
+ )
+
+ // Access elements of an array of arrays.
+ checkAnswer(
+ sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
+ (Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) :: Nil
+ )
+
+ // Access elements of an array inside a field with the type ArrayType(ArrayType).
+ checkAnswer(
+ sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
+ ("str2", 2.1) :: Nil
+ )
+
+ // Access elements of an array of structs.
+ checkAnswer(
+ sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2] from jsonTable"),
+ (true :: "str1" :: Nil, false :: null :: Nil, null) :: Nil
+ )
+
+ // Access a struct and fields inside of it.
+ checkAnswer(
+ sql("select struct, struct.field1, struct.field2 from jsonTable"),
+ (
+ Seq(true, BigDecimal("92233720368547758070")),
+ true,
+ BigDecimal("92233720368547758070")) :: Nil
+ )
+
+ // Access an array field of a struct.
+ checkAnswer(
+ sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
+ (Seq(4, 5, 6), Seq("str1", "str2")) :: Nil
+ )
+
+ // Access elements of an array field of a struct.
+ checkAnswer(
+ sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
+ (5, null) :: Nil
+ )
+ }
+
+ ignore("Complex field and type inferring (Ignored)") {
+ val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Right now, "field1" and "field2" are treated as aliases. We should fix it.
+ checkAnswer(
+ sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
+ (true, "str1") :: Nil
+ )
+
+ // Right now, the analyzer cannot resolve arrayOfStruct.field1 and arrayOfStruct.field2.
+ // Getting all values of a specific field from an array of structs.
+ checkAnswer(
+ sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
+ (Seq(true, false), Seq("str1", null)) :: Nil
+ )
+ }
+
+ test("Type conflict in primitive field values") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("num_bool", StringType, true)() ::
+ AttributeReference("num_num_1", LongType, true)() ::
+ AttributeReference("num_num_2", DecimalType, true)() ::
+ AttributeReference("num_num_3", DoubleType, true)() ::
+ AttributeReference("num_str", StringType, true)() ::
+ AttributeReference("str_bool", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ ("true", 11L, null, 1.1, "13.1", "str1") ::
+ ("12", null, BigDecimal("21474836470.9"), null, null, "true") ::
+ ("false", 21474836470L, BigDecimal("92233720368547758070"), 100, "str1", "false") ::
+ (null, 21474836570L, BigDecimal(1.1), 21474836470L, "92233720368547758070", null) :: Nil
+ )
+
+ // Number and Boolean conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_bool - 10 from jsonTable where num_bool > 11"),
+ 2
+ )
+
+ // Widening to LongType
+ checkAnswer(
+ sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"),
+ Seq(21474836370L) :: Seq(21474836470L) :: Nil
+ )
+
+ checkAnswer(
+ sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"),
+ Seq(-89) :: Seq(21474836370L) :: Seq(21474836470L) :: Nil
+ )
+
+ // Widening to DecimalType
+ checkAnswer(
+ sql("select num_num_2 + 1.2 from jsonTable where num_num_2 > 1.1"),
+ Seq(BigDecimal("21474836472.1")) :: Seq(BigDecimal("92233720368547758071.2")) :: Nil
+ )
+
+ // Widening to DoubleType
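+ // num_num_3 mixes integral and fractional values that all fit in a Double, so the
+ // inferred type is DoubleType.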
+ checkAnswer(
+ sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"),
+ Seq(101.2) :: Seq(21474836471.2) :: Nil
+ )
+
+ // Number and String conflict: resolve the type as number in this query.
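+ // num_str is inferred as StringType; only the value 92233720368547758070 satisfies
+ // num_str > 14, and adding 1.2 yields 92233720368547758071.2.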
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 14"),
+ 92233720368547758071.2
+ )
+
+ // String and Boolean conflict: resolve the type as string.
+ checkAnswer(
+ sql("select * from jsonTable where str_bool = 'str1'"),
+ ("true", 11L, null, 1.1, "13.1", "str1") :: Nil
+ )
+ }
+
+ ignore("Type conflict in primitive field values (Ignored)") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Right now, the analyzer does not promote strings in a boolean expression.
+ // Number and Boolean conflict: resolve the type as boolean in this query.
+ checkAnswer(
+ sql("select num_bool from jsonTable where NOT num_bool"),
+ false
+ )
+
+ checkAnswer(
+ sql("select str_bool from jsonTable where NOT str_bool"),
+ false
+ )
+
+ // Right now, the analyzer does not know that num_bool should be treated as a boolean.
+ // Number and Boolean conflict: resolve the type as boolean in this query.
+ checkAnswer(
+ sql("select num_bool from jsonTable where num_bool"),
+ true
+ )
+
+ checkAnswer(
+ sql("select str_bool from jsonTable where str_bool"),
+ false
+ )
+
+ // Right now, we have a parsing error.
+ // Number and String conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 92233720368547758060"),
+ BigDecimal("92233720368547758061.2")
+ )
+
+ // The plan of the following DSL is
+ // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78]
+ // Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060)
+ // ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66]
+ // We should cast num_str directly to DecimalType and also do the right type promotion
+ // in the Project.
+ checkAnswer(
+ jsonSchemaRDD.
+ where('num_str > BigDecimal("92233720368547758060")).
+ select('num_str + 1.2 as Symbol("num")),
+ BigDecimal("92233720368547758061.2")
+ )
+
+ // The following test will fail. The type of num_str is StringType.
+ // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type.
+ // In our test data, one value of num_str is 13.1.
+ // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999,
+ // which is not 14.3.
+ // Number and String conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 13"),
+ Seq(14.3) :: Seq(92233720368547758071.2) :: Nil
+ )
+ }
+
+ test("Type conflict in complex field values") {
+ val jsonSchemaRDD = jsonRDD(complexFieldValueTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("array", ArrayType(IntegerType), true)() ::
+ AttributeReference("num_struct", StringType, true)() ::
+ AttributeReference("str_array", StringType, true)() ::
+ AttributeReference("struct", StructType(
+ StructField("field", StringType, true) :: Nil), true)() ::
+ AttributeReference("struct_array", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (Seq(), "11", "[1,2,3]", Seq(null), "[]") ::
+ (null, """{"field":false}""", null, null, "{}") ::
+ (Seq(4, 5, 6), null, "str", Seq(null), "[7,8,9]") ::
+ (Seq(7), "{}","[str1,str2,33]", Seq("str"), """{"field":true}""") :: Nil
+ )
+ }
+
+ test("Type conflict in array elements") {
+ val jsonSchemaRDD = jsonRDD(arrayElementTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("array", ArrayType(StringType), true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ Seq(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]",
+ """{"field":str}""")) :: Nil
+ )
+
+ // Treat an element as a number.
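+ // The array column is ArrayType(StringType); adding 1 to array[0] ("1") promotes it
+ // to a number, giving 2.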
+ checkAnswer(
+ sql("select array[0] + 1 from jsonTable"),
+ 2
+ )
+ }
+
+ test("Handling missing fields") {
+ val jsonSchemaRDD = jsonRDD(missingFields)
+
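+ // Each record defines exactly one of the fields a through e, so the inferred schema
+ // is the union of all five fields, each of them nullable.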
+ val expectedSchema =
+ AttributeReference("a", BooleanType, true)() ::
+ AttributeReference("b", LongType, true)() ::
+ AttributeReference("c", ArrayType(IntegerType), true)() ::
+ AttributeReference("d", StructType(
+ StructField("field", BooleanType, true) :: Nil), true)() ::
+ AttributeReference("e", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+ }
+
+ test("Loading a JSON dataset from a text file") {
+ val file = getTempFilePath("json")
+ val path = file.toString
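+ // jsonFile expects one JSON object per line, so collapse the newlines inside each
+ // record before writing the text file.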
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ val jsonSchemaRDD = jsonFile(path)
+
+ val expectedSchema =
+ AttributeReference("bigInteger", DecimalType, true)() ::
+ AttributeReference("boolean", BooleanType, true)() ::
+ AttributeReference("double", DoubleType, true)() ::
+ AttributeReference("integer", IntegerType, true)() ::
+ AttributeReference("long", LongType, true)() ::
+ AttributeReference("null", StringType, true)() ::
+ AttributeReference("string", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string.") :: Nil
+ )
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
new file mode 100644
index 0000000000..065e04046e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import org.apache.spark.sql.test.TestSQLContext
+
+object TestJsonData {
+
+ val primitiveFieldAndType =
+ TestSQLContext.sparkContext.parallelize(
+ """{"string":"this is a simple string.",
+ "integer":10,
+ "long":21474836470,
+ "bigInteger":92233720368547758070,
+ "double":1.7976931348623157E308,
+ "boolean":true,
+ "null":null
+ }""" :: Nil)
+
+ val complexFieldAndType =
+ TestSQLContext.sparkContext.parallelize(
+ """{"struct":{"field1": true, "field2": 92233720368547758070},
+ "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
+ "arrayOfString":["str1", "str2"],
+ "arrayOfInteger":[1, 2147483647, -2147483648],
+ "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
+ "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
+ "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
+ "arrayOfBoolean":[true, false, true],
+ "arrayOfNull":[null, null, null, null],
+ "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}],
+ "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
+ "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
+ }""" :: Nil)
+
+ val primitiveFieldValueTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
+ "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
+ """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
+ "num_bool":12, "num_str":null, "str_bool":true}""" ::
+ """{"num_num_1":21474836470, "num_num_2":92233720368547758070, "num_num_3": 100,
+ "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
+ """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
+ "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+
+ val complexFieldValueTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"num_struct":11, "str_array":[1, 2, 3],
+ "array":[], "struct_array":[], "struct": {}}""" ::
+ """{"num_struct":{"field":false}, "str_array":null,
+ "array":null, "struct_array":{}, "struct": null}""" ::
+ """{"num_struct":null, "str_array":"str",
+ "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
+ """{"num_struct":{}, "str_array":["str1", "str2", 33],
+ "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+
+ val arrayElementTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"array": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}]}""" :: Nil)
+
+ val missingFields =
+ TestSQLContext.sparkContext.parallelize(
+ """{"a":true}""" ::
+ """{"b":21474836470}""" ::
+ """{"c":[33, 44]}""" ::
+ """{"d":{"field":true}}""" ::
+ """{"e":"str"}""" :: Nil)
+}