author     Yin Huai <huai@cse.ohio-state.edu>    2014-06-17 19:14:59 -0700
committer  Reynold Xin <rxin@apache.org>         2014-06-17 19:14:59 -0700
commit     d2f4f30b12f99358953e2781957468e2cfe3c916 (patch)
tree       405b949a2968dba2c73874bd2fefc9d10206e731 /sql/catalyst
parent     b2ebf429e24566c29850c570f8d76943151ad78c (diff)
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inference, update the schema of the JsonTable after the first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or an RDD[String] with one JSON object per string) and return a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
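For context, the user-facing API this patch introduces (per the programming guide and Scala doc linked above) loads a JSON dataset — a text file with one JSON object per line, or an RDD[String] — as a SchemaRDD with an inferred schema. A minimal sketch, assuming the jsonFile and registerAsTable method names from the linked docs; the file path, app name, and field names are invented:

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext("local", "JsonExample") // hypothetical local setup
  val sqlContext = new SQLContext(sc)

  // One JSON object per line; the schema is inferred from the entire dataset
  // (commit 66f9e76 above).
  val people = sqlContext.jsonFile("examples/src/main/resources/people.json")

  people.printSchema()              // tree-format schema, added in this patch
  people.registerAsTable("people")
  sqlContext.sql("SELECT name FROM people WHERE age > 13")
    .collect().foreach(println)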
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/pom.xml | 28
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 51
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala) | 9
8 files changed, 108 insertions, 19 deletions
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 6c78c34486..01d7b56908 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -66,6 +66,34 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
+
+ <!--
+ This plugin forces the generation of jar containing catalyst test classes,
+ so that the tests classes of external modules can use them. The two execution profiles
+ are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+ 'mvn compile' should not compile test classes and therefore should not need this.
+ However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+ causes the compilation to fail if catalyst test-jar is not generated. Hence, the
+ second execution profile for 'mvn compile'.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>test-jar-on-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d291814c8a..66bff660ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
+object HiveTypeCoercion {
+ // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+ // The conversion for integral and floating point types have a linear widening hierarchy:
+ val numericPrecedence =
+ Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+ // Boolean is only wider than Void
+ val booleanPrecedence = Seq(NullType, BooleanType)
+ val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+}
+
/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
+ *
+ * Widening types might result in loss of precision in the following cases:
+ * - IntegerType to FloatType
+ * - LongType to FloatType
+ * - LongType to DoubleType
*/
object WidenTypes extends Rule[LogicalPlan] {
- // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
- // The conversion for integral and floating point types have a linear widening hierarchy:
- val numericPrecedence =
- Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
- // Boolean is only wider than Void
- val booleanPrecedence = Seq(NullType, BooleanType)
- val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
// Try and find a promotion rule that contains both types in question.
- val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+ val applicableConversion =
+ HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
// If found return the widest common type, otherwise None
applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
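To illustrate the lookup above, here is a hedged REPL sketch against this commit; it assumes the HiveTypeCoercion trait can be instantiated anonymously, which the patch does not show:

  import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
  import org.apache.spark.sql.catalyst.types._

  val coercion = new HiveTypeCoercion {} // assumed: no abstract members to fill in

  // Both types sit in numericPrecedence; the widest of the two wins.
  coercion.WidenTypes.findTightestCommonType(IntegerType, LongType)    // Some(LongType)
  // Widening can lose precision, as the updated doc comment warns.
  coercion.WidenTypes.findTightestCommonType(LongType, FloatType)      // Some(FloatType)
  // No single precedence list contains both types, so there is no promotion.
  coercion.WidenTypes.findTightestCommonType(BooleanType, IntegerType) // None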
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8199a80f5d..00e2d3bc24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
case other => Nil
}.toSeq
}
+
+ protected def generateSchemaString(schema: Seq[Attribute]): String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ schema.foreach { attribute =>
+ val name = attribute.name
+ val dataType = attribute.dataType
+ dataType match {
+ case fields: StructType =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(fields: StructType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(elementType: DataType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case _ => builder.append(s"$prefix-- $name: $dataType\n")
+ }
+ }
+
+ builder.toString()
+ }
+
+ protected def generateSchemaString(
+ schema: StructType,
+ prefix: String,
+ builder: StringBuilder): StringBuilder = {
+ schema.fields.foreach {
+ case StructField(name, fields: StructType, _) =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(fields: StructType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(elementType: DataType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case StructField(name, fieldType: DataType, _) =>
+ builder.append(s"$prefix-- $name: $fieldType\n")
+ }
+
+ builder
+ }
+
+ /** Returns the output schema in the tree format. */
+ def schemaString: String = generateSchemaString(output)
+
+ /** Prints out the schema in the tree format */
+ def printSchema(): Unit = println(schemaString)
}
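Given generateSchemaString above, printSchema on a plan whose output contains a nested struct and an array column would emit output like this (field names invented for illustration):

  root
   |-- name: StringType
   |-- address: StructType
   | |-- city: StringType
   | |-- zipCode: IntegerType
   |-- phoneNumbers: ArrayType[StringType]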
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 714f01843c..4896f1b955 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class CombiningLimitsSuite extends OptimizerTest {
+class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 6efc0e211e..cea97c584f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class ConstantFoldingSuite extends OptimizerTest {
+class ConstantFoldingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1f67c80e54..ebb123c1f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class FilterPushdownSuite extends OptimizerTest {
+class FilterPushdownSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
index df1409fe7b..22992fb6f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
+class SimplifyCaseConversionExpressionsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
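After this rename, an optimizer suite extends the PlanTest base class shown in the next diff instead of OptimizerTest. A hedged sketch of such a suite, modeled on the suites above; it assumes PlanTest exposes the comparePlans helper carried over from the renamed file, and that a RuleExecutor can be applied directly to a plan as in this era of catalyst:

  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
  import org.apache.spark.sql.catalyst.plans.PlanTest
  import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
  import org.apache.spark.sql.catalyst.rules.RuleExecutor
  import org.apache.spark.sql.catalyst.dsl.plans._
  import org.apache.spark.sql.catalyst.dsl.expressions._

  class ConstantFoldingExampleSuite extends PlanTest { // hypothetical suite
    object Optimize extends RuleExecutor[LogicalPlan] {
      val batches = Batch("ConstantFolding", Once, ConstantFolding) :: Nil
    }

    val testRelation = LocalRelation('a.int)

    test("1 + 2 folds to 3") {
      val optimized = Optimize(testRelation.select(Literal(1) + Literal(2) as 'sum).analyze)
      val expected  = testRelation.select(Literal(3) as 'sum).analyze
      comparePlans(optimized, expected) // normalizes ExprIds before comparing
    }
  }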
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 89982d5cd8..7e9f47ef21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.optimizer
+package org.apache.spark.sql.catalyst.plans
import org.scalatest.FunSuite
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
/**
- * Provides helper methods for comparing plans produced by optimization rules with the expected
- * result
+ * Provides helper methods for comparing plans.
*/
-class OptimizerTest extends FunSuite {
+class PlanTest extends FunSuite {
/**
* Since attribute references are given globally unique ids during analysis,