author | Davies Liu <davies@databricks.com> | 2015-10-21 13:38:30 -0700
---|---|---
committer | Michael Armbrust <michael@databricks.com> | 2015-10-21 13:38:30 -0700
commit | f8c6bec65784de89b47e96a367d3f9790c1b3115 |
tree | 29ebe356ff5dddfabf67bc0bca0b9fc7120e6d0b /sql/core |
parent | 7c74ebca05f40a2d8fe8f10f24a10486ce4f76c0 |
[SPARK-11197][SQL] run SQL on files directly
This PR introduces a new feature: running SQL directly on files without first creating a table, for example:
```sql
select id from json.`path/to/json/files` as j
```
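
For context, a minimal end-to-end sketch of what this enables, assuming a Spark 1.5-era `SQLContext` named `sqlContext`; the path and the `range(100)` input are illustrative, mirroring the new test below:

```scala
// Write a few JSON files, then query them by path alone: the data source
// name ("json") plays the role of the database and the path the table.
val df = sqlContext.range(100)
df.write.json("/tmp/example-json")  // illustrative path

// No registerTempTable / CREATE TABLE step is needed.
val ids = sqlContext.sql("select id from json.`/tmp/example-json`")
ids.show()
```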
Author: Davies Liu <davies@databricks.com>
Closes #9173 from davies/source.
Diffstat (limited to 'sql/core')
4 files changed, 63 insertions, 3 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b08cc8e830..6f2892085a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
   val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
     defaultValue = Some(true), doc = "<TODO>")
 
+  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "When true, we could use `datasource`.`path` as table in SQL query"
+  )
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def dataFrameRetainGroupColumns: Boolean =
     getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e83657a605..a107639947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -193,7 +193,7 @@ class SQLContext private[sql](
       override val extendedResolutionRules =
         ExtractPythonUDFs ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(
         datasources.PreWriteCheck(catalog)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b00e5680fe..abc016bf02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,13 +17,37 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replace [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+      try {
+        val resolved = ResolvedDataSource(
+          sqlContext,
+          userSpecifiedSchema = None,
+          partitionColumns = Array(),
+          provider = u.tableIdentifier.database.get,
+          options = Map("path" -> u.tableIdentifier.table))
+        val plan = LogicalRelation(resolved.relation)
+        u.alias.map(a => Subquery(u.alias.get, plan)).getOrElse(plan)
+      } catch {
+        case e: ClassNotFoundException => u
+        case e: Exception =>
+          // the provider is valid, but failed to create a logical plan
+          u.failAnalysis(e.getMessage)
+      }
+  }
+}
 
 /**
  * A rule to do pre-insert data type casting and field renaming. Before we insert into
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a35a7f41dd..298c322906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+
+    val e1 = intercept[AnalysisException] {
+      sql("select * from in_valid_table")
+    }
+    assert(e1.message.contains("Table not found"))
+
+    val e2 = intercept[AnalysisException] {
+      sql("select * from no_db.no_table")
+    }
+    assert(e2.message.contains("Table not found"))
+
+    val e3 = intercept[AnalysisException] {
+      sql("select * from json.invalid_file")
+    }
+    assert(e3.message.contains("No input paths specified"))
+  }
+
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
     // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
     // This bug will be triggered when Tungsten is enabled and there are multiple
```
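
The feature is gated by the new internal flag added to SQLConf above. A hedged sketch of opting out; the conf key comes straight from the diff, the rest is illustrative:

```scala
// With the flag off, ResolveDataSource is not added to the analyzer's
// extendedResolutionRules, so `json`.`/some/path` falls through to the
// regular catalog lookup and fails with the usual "Table not found" error.
sqlContext.setConf("spark.sql.runSQLOnFiles", "false")
```

Note how the rule degrades in the tests: an unknown provider (`no_db`) throws `ClassNotFoundException`, which the rule swallows so the relation stays unresolved and surfaces as "Table not found", while a known provider with a bad path (`json.invalid_file`) fails analysis with the data source's own message.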