-rw-r--r--  R/pkg/inst/tests/test_sparkSQL.R | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 28
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 13
10 files changed, 91 insertions, 9 deletions
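
Taken together, the changes below let a SQL query read a data source file directly through a `provider`.`path` identifier, without registering a temporary table first. A minimal sketch of the behavior the new tests exercise, assuming an existing sqlContext and a hypothetical writable path (neither is part of this diff):

    // Sketch only: sqlContext and path are assumptions, not part of this patch.
    val df = sqlContext.range(100)
    df.write.json(path)

    // The "database" part names the data source provider; the "table" part is the path.
    val byShortName = sqlContext.sql(s"select id from json.`$path`")
    val byFullName  = sqlContext.sql(s"select id from `org.apache.spark.sql.json`.`$path`")
    assert(byShortName.count() == 100 && byFullName.count() == 100)
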
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index e1b42b0804..67d8b23cd7 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -1428,7 +1428,7 @@ test_that("sampleBy() on a DataFrame", {
test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
- expect_equal(grepl("Table Not Found: blah", retError), TRUE)
+ expect_equal(grepl("Table not found: blah", retError), TRUE)
})
test_that("Method as.data.frame as a synonym for collect()", {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 016dc293f4..beabacfc88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -256,7 +256,7 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
- u.failAnalysis(s"Table Not Found: ${u.tableName}")
+ u.failAnalysis(s"Table not found: ${u.tableName}")
}
}
@@ -264,7 +264,13 @@ class Analyzer(
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
- getTable(u)
+ try {
+ getTable(u)
+ } catch {
+ case _: AnalysisException if u.tableIdentifier.database.isDefined =>
+ // delay the exception until CheckAnalysis, so the relation can still be resolved as a data source.
+ u
+ }
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7701fd0451..ab215407f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -49,6 +49,9 @@ trait CheckAnalysis {
plan.foreachUp {
case p if p.analyzed => // Skip already analyzed sub-plans
+ case u: UnresolvedRelation =>
+ u.failAnalysis(s"Table not found: ${u.tableIdentifier}")
+
case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
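
The Analyzer and CheckAnalysis changes work together: when the identifier has a database part, the Analyzer keeps the UnresolvedRelation instead of failing immediately, so the data source rule added later in this patch gets a chance at it, and CheckAnalysis raises the error only if nothing resolved the relation. A rough sketch of the resulting behavior, assuming a sqlContext with no such table or provider registered:

    import scala.util.Try

    // Bare names still fail analysis, now with the lower-case message.
    val bare = Try(sqlContext.sql("select * from blah").collect())
    assert(bare.failed.get.getMessage.contains("Table not found: blah"))

    // A qualified name such as no_db.no_table is rejected only in CheckAnalysis,
    // after ResolveDataSource has had its chance to treat it as `provider`.`path`.
    val qualified = Try(sqlContext.sql("select * from no_db.no_table").collect())
    assert(qualified.failed.get.getMessage.contains("Table not found"))
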
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 24af8483a7..0a1fa74bed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -78,7 +78,7 @@ class AnalysisSuite extends AnalysisTest {
test("resolve relations") {
assertAnalysisError(
- UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b08cc8e830..6f2892085a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
defaultValue = Some(true), doc = "<TODO>")
+ val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+ defaultValue = Some(true),
+ isPublic = false,
+ doc = "When true, we can use `datasource`.`path` as a table in SQL queries."
+ )
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
+ private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
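
The new, non-public spark.sql.runSQLOnFiles flag defaults to true and gates the extra resolution rule wired into SQLContext and HiveContext below. A sketch of turning the feature off for a session, assuming an existing sqlContext:

    // With the flag off, `provider`.`path` is no longer tried as a data source
    // and falls back to the usual "Table not found" analysis error.
    sqlContext.setConf("spark.sql.runSQLOnFiles", "false")
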
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e83657a605..a107639947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -193,7 +193,7 @@ class SQLContext private[sql](
override val extendedResolutionRules =
ExtractPythonUDFs ::
PreInsertCastAndRename ::
- Nil
+ (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
override val extendedCheckRules = Seq(
datasources.PreWriteCheck(catalog)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b00e5680fe..abc016bf02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,13 +17,37 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replace [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+ try {
+ val resolved = ResolvedDataSource(
+ sqlContext,
+ userSpecifiedSchema = None,
+ partitionColumns = Array(),
+ provider = u.tableIdentifier.database.get,
+ options = Map("path" -> u.tableIdentifier.table))
+ val plan = LogicalRelation(resolved.relation)
+ u.alias.map(a => Subquery(a, plan)).getOrElse(plan)
+ } catch {
+ case e: ClassNotFoundException => u
+ case e: Exception =>
+ // the provider is valid, but failed to create a logical plan
+ u.failAnalysis(e.getMessage)
+ }
+ }
+}
/**
* A rule to do pre-insert data type casting and field renaming. Before we insert into
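
The ResolveDataSource rule above interprets the two-part identifier so that the database part names the data source provider and the table part carries the path, building roughly the same plan as the public reader API. A sketch of that equivalence, with sqlContext and path as stand-ins:

    // Roughly what json.`/some/path` resolves to under the new rule:
    val viaSql    = sqlContext.sql(s"select * from json.`$path`")
    val viaReader = sqlContext.read.format("json").load(path)
    // Both end up as a LogicalRelation over the same files; the SQL form simply
    // skips registerTempTable. An unknown provider raises ClassNotFoundException,
    // which the rule swallows so CheckAnalysis can report "Table not found" instead.
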
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a35a7f41dd..298c322906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("run sql directly on files") {
+ val df = sqlContext.range(100)
+ withTempPath(f => {
+ df.write.json(f.getCanonicalPath)
+ checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+
+ val e1 = intercept[AnalysisException] {
+ sql("select * from in_valid_table")
+ }
+ assert(e1.message.contains("Table not found"))
+
+ val e2 = intercept[AnalysisException] {
+ sql("select * from no_db.no_table")
+ }
+ assert(e2.message.contains("Table not found"))
+
+ val e3 = intercept[AnalysisException] {
+ sql("select * from json.invalid_file")
+ }
+ assert(e3.message.contains("No input paths specified"))
+ }
+
test("SortMergeJoin returns wrong results when using UnsafeRows") {
// This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
// This bug will be triggered when Tungsten is enabled and there are multiple
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 38c195bc7d..61f611638f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.datasources.{ResolveDataSource, DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
import org.apache.spark.sql.hive.client._
@@ -473,7 +473,7 @@ class HiveContext private[hive](
ExtractPythonUDFs ::
ResolveHiveWindowFunction ::
PreInsertCastAndRename ::
- Nil
+ (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
override val extendedCheckRules = Seq(
PreWriteCheck(catalog)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c929ba5068..396150be76 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1281,6 +1281,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("run sql directly on files") {
+ val df = sqlContext.range(100)
+ withTempPath(f => {
+ df.write.parquet(f.getCanonicalPath)
+ checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select a.id from parquet.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+ }
+
test("correctly parse CREATE VIEW statement") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt") {