author     Yin Huai <huai@cse.ohio-state.edu>    2014-06-17 19:14:59 -0700
committer  Reynold Xin <rxin@apache.org>         2014-06-17 19:14:59 -0700
commit     d2f4f30b12f99358953e2781957468e2cfe3c916 (patch)
tree       405b949a2968dba2c73874bd2fefc9d10206e731 /sql/catalyst
parent     b2ebf429e24566c29850c570f8d76943151ad78c (diff)
download   spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.gz
           spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.bz2
           spark-d2f4f30b12f99358953e2781957468e2cfe3c916.zip
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #999 from yhuai/newJson and squashes the following commits:
227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or an RDD[String] with one JSON object per string) and return a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
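
For context, here is a minimal sketch of how the JSON API added by this patch is used, based on the linked programming guide. The input path and query are illustrative only, and `registerAsTable` was renamed in later Spark releases:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object JsonQuerySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "JsonQuerySketch")
    val sqlContext = new SQLContext(sc)

    // Load a text file with one JSON object per line; the schema is
    // inferred from the dataset and the result is a SchemaRDD.
    val people = sqlContext.jsonFile("examples/src/main/resources/people.json")

    // New in this patch: print the inferred schema in the tree format.
    people.printSchema()

    // Register the JSON-backed SchemaRDD and query it with SQL.
    people.registerAsTable("people")
    val teenagers = sqlContext.sql(
      "SELECT name FROM people WHERE age >= 13 AND age <= 19")
    teenagers.collect().foreach(println)
  }
}
```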
Diffstat (limited to 'sql/catalyst')

-rw-r--r--  sql/catalyst/pom.xml | 28
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 51
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala) | 9

8 files changed, 108 insertions, 19 deletions
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 6c78c34486..01d7b56908 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -66,6 +66,34 @@
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest-maven-plugin</artifactId>
     </plugin>
+
+      <!--
+       This plugin forces the generation of jar containing catalyst test classes,
+       so that the tests classes of external modules can use them. The two execution profiles
+       are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+       'mvn compile' should not compile test classes and therefore should not need this.
+       However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+       causes the compilation to fail if catalyst test-jar is not generated. Hence, the
+       second execution profile for 'mvn compile'.
+      -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>test-jar-on-compile</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d291814c8a..66bff660ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types._
 
+object HiveTypeCoercion {
+  // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+  // The conversion for integral and floating point types have a linear widening hierarchy:
+  val numericPrecedence =
+    Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+  // Boolean is only wider than Void
+  val booleanPrecedence = Seq(NullType, BooleanType)
+  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+}
+
 /**
  * A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
  * participate in operations into compatible ones. Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
    *
    * Additionally, all types when UNION-ed with strings will be promoted to strings.
    * Other string conversions are handled by PromoteStrings.
+   *
+   * Widening types might result in loss of precision in the following cases:
+   * - IntegerType to FloatType
+   * - LongType to FloatType
+   * - LongType to DoubleType
    */
   object WidenTypes extends Rule[LogicalPlan] {
-    // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
-    // The conversion for integral and floating point types have a linear widening hierarchy:
-    val numericPrecedence =
-      Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
-    // Boolean is only wider than Void
-    val booleanPrecedence = Seq(NullType, BooleanType)
-    val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
 
     def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
       // Try and find a promotion rule that contains both types in question.
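
To make the widening rule above concrete, here is a standalone sketch of the `findTightestCommonType` lookup. It uses string stand-ins for the Catalyst types so it runs without Spark on the classpath; the logic mirrors the patched code:

```scala
object WideningSketch {
  // Same precedence lists as HiveTypeCoercion above, as plain strings.
  val numericPrecedence = Seq("NullType", "ByteType", "ShortType",
    "IntegerType", "LongType", "FloatType", "DoubleType", "DecimalType")
  val booleanPrecedence = Seq("NullType", "BooleanType")
  val allPromotions = Seq(numericPrecedence, booleanPrecedence)

  // Find the first precedence list containing both types, then take the
  // widest (last-listed) of the two.
  def findTightestCommonType(t1: String, t2: String): Option[String] = {
    val applicable = allPromotions.find(p => p.contains(t1) && p.contains(t2))
    applicable.map(_.filter(t => t == t1 || t == t2).last)
  }

  def main(args: Array[String]): Unit = {
    println(findTightestCommonType("IntegerType", "DoubleType")) // Some(DoubleType)
    println(findTightestCommonType("NullType", "BooleanType"))   // Some(BooleanType)
    println(findTightestCommonType("BooleanType", "IntegerType")) // None: no shared list
  }
}
```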
-      val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+      val applicableConversion =
+        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
 
       // If found return the widest common type, otherwise None
       applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8199a80f5d..00e2d3bc24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
 
 abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
   self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
       case other => Nil
     }.toSeq
   }
+
+  protected def generateSchemaString(schema: Seq[Attribute]): String = {
+    val builder = new StringBuilder
+    builder.append("root\n")
+    val prefix = " |"
+    schema.foreach { attribute =>
+      val name = attribute.name
+      val dataType = attribute.dataType
+      dataType match {
+        case fields: StructType =>
+          builder.append(s"$prefix-- $name: $StructType\n")
+          generateSchemaString(fields, s"$prefix    |", builder)
+        case ArrayType(fields: StructType) =>
+          builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+          generateSchemaString(fields, s"$prefix    |", builder)
+        case ArrayType(elementType: DataType) =>
+          builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+        case _ => builder.append(s"$prefix-- $name: $dataType\n")
+      }
+    }
+
+    builder.toString()
+  }
+
+  protected def generateSchemaString(
+      schema: StructType,
+      prefix: String,
+      builder: StringBuilder): StringBuilder = {
+    schema.fields.foreach {
+      case StructField(name, fields: StructType, _) =>
+        builder.append(s"$prefix-- $name: $StructType\n")
+        generateSchemaString(fields, s"$prefix    |", builder)
+      case StructField(name, ArrayType(fields: StructType), _) =>
+        builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+        generateSchemaString(fields, s"$prefix    |", builder)
+      case StructField(name, ArrayType(elementType: DataType), _) =>
+        builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+      case StructField(name, fieldType: DataType, _) =>
+        builder.append(s"$prefix-- $name: $fieldType\n")
+    }
+
+    builder
+  }
+
+  /** Returns the output schema in the tree format. */
+  def schemaString: String = generateSchemaString(output)
+
+  /** Prints out the schema in the tree format */
+  def printSchema(): Unit = println(schemaString)
 }
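
The tree format produced by `schemaString`/`printSchema` looks like the output below. The schema and the toy field types are invented for the example so the sketch runs without Spark:

```scala
sealed trait FieldType
case class Leaf(tpe: String) extends FieldType
case class Struct(fields: Seq[(String, FieldType)]) extends FieldType
case class Arr(element: FieldType) extends FieldType

object SchemaStringSketch {
  // Mirror of generateSchemaString above: one "|--" line per field,
  // recursing into structs with a longer prefix.
  def render(fields: Seq[(String, FieldType)], prefix: String,
      sb: StringBuilder): StringBuilder = {
    fields.foreach {
      case (name, Struct(fs)) =>
        sb.append(s"$prefix-- $name: StructType\n")
        render(fs, s"$prefix    |", sb)
      case (name, Arr(Struct(fs))) =>
        sb.append(s"$prefix-- $name: ArrayType[StructType]\n")
        render(fs, s"$prefix    |", sb)
      case (name, Arr(Leaf(t))) =>
        sb.append(s"$prefix-- $name: ArrayType[$t]\n")
      case (name, Arr(other)) =>
        sb.append(s"$prefix-- $name: ArrayType[$other]\n")
      case (name, Leaf(t)) =>
        sb.append(s"$prefix-- $name: $t\n")
    }
    sb
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq(
      "name" -> Leaf("StringType"),
      "phones" -> Arr(Leaf("StringType")),
      "address" -> Struct(Seq(
        "city" -> Leaf("StringType"),
        "zip" -> Leaf("IntegerType"))))
    print(render(schema, " |", new StringBuilder("root\n")).toString)
    // root
    //  |-- name: StringType
    //  |-- phones: ArrayType[StringType]
    //  |-- address: StructType
    //  |    |-- city: StringType
    //  |    |-- zip: IntegerType
  }
}
```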
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 714f01843c..4896f1b955 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class CombiningLimitsSuite extends OptimizerTest {
+class CombiningLimitsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 6efc0e211e..cea97c584f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
 
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
 /* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class ConstantFoldingSuite extends OptimizerTest {
+class ConstantFoldingSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1f67c80e54..ebb123c1f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class FilterPushdownSuite extends OptimizerTest {
+class FilterPushdownSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
index df1409fe7b..22992fb6f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules._
 
 /* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 
-class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
+class SimplifyCaseConversionExpressionsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 89982d5cd8..7e9f47ef21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.optimizer
+package org.apache.spark.sql.catalyst.plans
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 
 /**
- * Provides helper methods for comparing plans produced by optimization rules with the expected
- * result
+ * Provides helper methods for comparing plans.
  */
-class OptimizerTest extends FunSuite {
+class PlanTest extends FunSuite {
 
   /**
    * Since attribute references are given globally unique ids during analysis,
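
The hunk ends here, but the narrowed import (`{ExprId, AttributeReference}`) and the truncated comment suggest PlanTest normalizes the globally unique expression ids before comparing two plans. A standalone sketch of that idea, with toy types invented for the example (this is not the actual PlanTest code):

```scala
case class Ref(name: String, exprId: Long)
case class Plan(op: String, refs: Seq[Ref])

object PlanCompareSketch {
  // Rewrite every expression id to 0 so that plans differing only in the
  // unique ids assigned during analysis still compare equal.
  def normalize(p: Plan): Plan = p.copy(refs = p.refs.map(_.copy(exprId = 0L)))

  def comparePlans(a: Plan, b: Plan): Boolean = normalize(a) == normalize(b)

  def main(args: Array[String]): Unit = {
    val p1 = Plan("Project", Seq(Ref("a", 7)))
    val p2 = Plan("Project", Seq(Ref("a", 42)))
    println(comparePlans(p1, p2)) // true: ids differ, structure matches
  }
}
```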