author     Davies Liu <davies@databricks.com>        2015-10-21 13:38:30 -0700
committer  Michael Armbrust <michael@databricks.com> 2015-10-21 13:38:30 -0700
commit     f8c6bec65784de89b47e96a367d3f9790c1b3115 (patch)
tree       29ebe356ff5dddfabf67bc0bca0b9fc7120e6d0b /sql/core
parent     7c74ebca05f40a2d8fe8f10f24a10486ce4f76c0 (diff)
[SPARK-11197][SQL] run SQL on files directly
This PR introduces a new feature for running SQL directly on files, without first creating a table. For example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>

Closes #9173 from davies/source.
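For context, a minimal sketch of the new syntax from Scala, modeled on the tests in this diff (the dataset path is hypothetical):

```scala
// Hypothetical path; assumes JSON files already written to /tmp/events.
// No CREATE TABLE is needed; the short provider name and the fully
// qualified data source name are interchangeable.
val byShortName = sqlContext.sql("select id from json.`/tmp/events`")
val byFullName = sqlContext.sql(
  "select id from `org.apache.spark.sql.json`.`/tmp/events`")
```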
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 28
4 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b08cc8e830..6f2892085a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
defaultValue = Some(true), doc = "<TODO>")
+ val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+ defaultValue = Some(true),
+ isPublic = false,
+ doc = "When true, we could use `datasource`.`path` as table in SQL query"
+ )
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
+ private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
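The flag above is internal (isPublic = false) but can be set like any other conf key; a quick sketch of disabling the feature, assuming an existing SQLContext. Note that the diff wires ResolveDataSource into the analyzer when the rule list is built, so the flag should be set before running queries:

```scala
// Disable `provider`.`path` table references; set before the first query,
// since extendedResolutionRules reads conf.runSQLOnFile when the analyzer
// is constructed.
sqlContext.setConf("spark.sql.runSQLOnFiles", "false")
```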
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e83657a605..a107639947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -193,7 +193,7 @@ class SQLContext private[sql](
override val extendedResolutionRules =
ExtractPythonUDFs ::
PreInsertCastAndRename ::
- Nil
+ (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
override val extendedCheckRules = Seq(
datasources.PreWriteCheck(catalog)
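The conditional tail above is a common Scala idiom for appending an optional list element; a self-contained illustration with made-up values standing in for the rule objects:

```scala
// Made-up flag and strings, standing in for the rule objects above.
val runSQLOnFile = true
val rules: List[String] =
  "ExtractPythonUDFs" ::
  "PreInsertCastAndRename" ::
  (if (runSQLOnFile) "ResolveDataSource" :: Nil else Nil)
// List(ExtractPythonUDFs, PreInsertCastAndRename, ResolveDataSource)
```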
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b00e5680fe..abc016bf02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,13 +17,37 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replace [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+ try {
+ val resolved = ResolvedDataSource(
+ sqlContext,
+ userSpecifiedSchema = None,
+ partitionColumns = Array(),
+ provider = u.tableIdentifier.database.get,
+ options = Map("path" -> u.tableIdentifier.table))
+ val plan = LogicalRelation(resolved.relation)
+ u.alias.map(a => Subquery(a, plan)).getOrElse(plan)
+ } catch {
+ case e: ClassNotFoundException => u
+ case e: Exception =>
+ // the provider is valid, but it failed to produce a logical plan
+ u.failAnalysis(e.getMessage)
+ }
+ }
+}
/**
* A rule to do pre-insert data type casting and field renaming. Before we insert into
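Illustrative only: how the query from the commit message maps onto the ResolveDataSource match above; the values are reconstructed from the code, not produced by running it:

```scala
// For: select id from json.`path/to/json/files` as j
val provider = "json"                // u.tableIdentifier.database.get
val path     = "path/to/json/files"  // u.tableIdentifier.table
val options  = Map("path" -> path)   // handed to ResolvedDataSource
val alias    = Some("j")             // wrapped in a Subquery when present
// An unknown provider throws ClassNotFoundException, so the rule leaves
// the relation unresolved and the catalog later reports "Table not found";
// any other failure surfaces as an AnalysisException via failAnalysis.
println(s"resolving $provider at ${options("path")}, alias = $alias")
```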
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a35a7f41dd..298c322906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("run sql directly on files") {
+ val df = sqlContext.range(100)
+ withTempPath(f => {
+ df.write.json(f.getCanonicalPath)
+ checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+
+ val e1 = intercept[AnalysisException] {
+ sql("select * from in_valid_table")
+ }
+ assert(e1.message.contains("Table not found"))
+
+ val e2 = intercept[AnalysisException] {
+ sql("select * from no_db.no_table")
+ }
+ assert(e2.message.contains("Table not found"))
+
+ val e3 = intercept[AnalysisException] {
+ sql("select * from json.invalid_file")
+ }
+ assert(e3.message.contains("No input paths specified"))
+ }
+
test("SortMergeJoin returns wrong results when using UnsafeRows") {
// This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
// This bug will be triggered when Tungsten is enabled and there are multiple