diff options
author | Michael Armbrust <michael@databricks.com> | 2014-04-04 17:23:17 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-04-04 17:23:17 -0700 |
commit | d956cc251676d67d87bd6dbfa82be864933d8136 (patch) | |
tree | dd213f0ff11a6dcd119037665be63d5234f6a447 /sql/core | |
parent | 198892fe8d39a2fad585fa2a7579d8b478456c33 (diff) | |
download | spark-d956cc251676d67d87bd6dbfa82be864933d8136.tar.gz spark-d956cc251676d67d87bd6dbfa82be864933d8136.tar.bz2 spark-d956cc251676d67d87bd6dbfa82be864933d8136.zip |
[SQL] Minor fixes.
Author: Michael Armbrust <michael@databricks.com>
Closes #315 from marmbrus/minorFixes and squashes the following commits:
b23a15d [Michael Armbrust] fix scaladoc
11062ac [Michael Armbrust] Fix registering "SELECT *" queries as tables and caching them. As some tests for this and self-joins.
3997dc9 [Michael Armbrust] Move Row extractor to catalyst.
208bf5e [Michael Armbrust] More idiomatic naming of DSL functions. * subquery => as * for join condition => on, i.e., `r.join(s, condition = 'a == 'b)` =>`r.join(s, on = 'a == 'b)`
87211ce [Michael Armbrust] Correctly handle self joins of in-memory cached tables.
69e195e [Michael Armbrust] Change != to !== in the DSL since != will always translate to != on Any.
01f2dd5 [Michael Armbrust] Correctly assign aliases to tables in SqlParser.
Diffstat (limited to 'sql/core')
5 files changed, 34 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index a62cb8aa13..fc95781448 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -148,17 +148,17 @@ class SchemaRDD( * * @param otherPlan the [[SchemaRDD]] that should be joined with this one. * @param joinType One of `Inner`, `LeftOuter`, `RightOuter`, or `FullOuter`. Defaults to `Inner.` - * @param condition An optional condition for the join operation. This is equivilent to the `ON` - * clause in standard SQL. In the case of `Inner` joins, specifying a - * `condition` is equivilent to adding `where` clauses after the `join`. + * @param on An optional condition for the join operation. This is equivilent to the `ON` + * clause in standard SQL. In the case of `Inner` joins, specifying a + * `condition` is equivilent to adding `where` clauses after the `join`. * * @group Query */ def join( otherPlan: SchemaRDD, joinType: JoinType = Inner, - condition: Option[Expression] = None): SchemaRDD = - new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, condition)) + on: Option[Expression] = None): SchemaRDD = + new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, on)) /** * Sorts the results by the given expressions. @@ -195,14 +195,14 @@ class SchemaRDD( * with the same name, for example, when peforming self-joins. * * {{{ - * val x = schemaRDD.where('a === 1).subquery('x) - * val y = schemaRDD.where('a === 2).subquery('y) + * val x = schemaRDD.where('a === 1).as('x) + * val y = schemaRDD.where('a === 2).as('y) * x.join(y).where("x.a".attr === "y.a".attr), * }}} * * @group Query */ - def subquery(alias: Symbol) = + def as(alias: Symbol) = new SchemaRDD(sqlContext, Subquery(alias.name, logicalPlan)) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index acb1ee83a7..daa423cb8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical} import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { self: Product => @@ -69,6 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) SparkLogicalPlan( alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) + case InMemoryColumnarTableScan(output, child) => + InMemoryColumnarTableScan(output.map(_.newInstance), child) case _ => sys.error("Multiple instance of the same relation detected.") }).asInstanceOf[this.type] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e5902c3cae..7c6a642278 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -58,4 +58,17 @@ class CachedTableSuite extends QueryTest { TestSQLContext.uncacheTable("testData") } } + + test("SELECT Star Cached Table") { + TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar") + TestSQLContext.cacheTable("selectStar") + TestSQLContext.sql("SELECT * FROM selectStar") + TestSQLContext.uncacheTable("selectStar") + } + + test("Self-join cached") { + TestSQLContext.cacheTable("testData") + TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key") + TestSQLContext.uncacheTable("testData") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 2524a37cba..be0f4a4c73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -119,8 +119,8 @@ class DslQuerySuite extends QueryTest { } test("inner join, where, multiple matches") { - val x = testData2.where('a === 1).subquery('x) - val y = testData2.where('a === 1).subquery('y) + val x = testData2.where('a === 1).as('x) + val y = testData2.where('a === 1).as('y) checkAnswer( x.join(y).where("x.a".attr === "y.a".attr), (1,1,1,1) :: @@ -131,8 +131,8 @@ class DslQuerySuite extends QueryTest { } test("inner join, no matches") { - val x = testData2.where('a === 1).subquery('x) - val y = testData2.where('a === 2).subquery('y) + val x = testData2.where('a === 1).as('x) + val y = testData2.where('a === 2).as('y) checkAnswer( x.join(y).where("x.a".attr === "y.a".attr), Nil) @@ -140,8 +140,8 @@ class DslQuerySuite extends QueryTest { test("big inner join, 4 matches per row") { val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData) - val bigDataX = bigData.subquery('x) - val bigDataY = bigData.subquery('y) + val bigDataX = bigData.as('x) + val bigDataY = bigData.as('y) checkAnswer( bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr), @@ -181,8 +181,8 @@ class DslQuerySuite extends QueryTest { } test("full outer join") { - val left = upperCaseData.where('N <= 4).subquery('left) - val right = upperCaseData.where('N >= 3).subquery('right) + val left = upperCaseData.where('N <= 4).as('left) + val right = upperCaseData.where('N >= 3).as('right) checkAnswer( left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index a62a3c4d02..fc68d6c562 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -56,8 +56,8 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { } test("self-join parquet files") { - val x = ParquetTestData.testData.subquery('x) - val y = ParquetTestData.testData.subquery('y) + val x = ParquetTestData.testData.as('x) + val y = ParquetTestData.testData.as('y) val query = x.join(y).where("x.myint".attr === "y.myint".attr) // Check to make sure that the attributes from either side of the join have unique expression |