author     Reynold Xin <rxin@databricks.com>  2016-04-23 12:49:36 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-04-23 12:49:36 -0700
commit     890abd1279014d692548c9f3b557483644a0ee32 (patch)
tree       128e27e4b2b2a2ba63c70124b94605038a8860f5
parent     5c8a0ec99bded2271481f8d6cf5443fea5da4bbd (diff)
[SPARK-14869][SQL] Don't mask exceptions in ResolveRelations
## What changes were proposed in this pull request?

In order to support running SQL directly on files, we added some code in
ResolveRelations to catch the exception thrown by catalog.lookupRelation and
ignore it. This unfortunately masks all exceptions raised during relation
lookup, not just the expected table-not-found case. This patch changes the
logic to simply test for the table's existence instead.

## How was this patch tested?

I manually hacked some bugs into Spark and made sure the exceptions were being
propagated up.

Author: Reynold Xin <rxin@databricks.com>

Closes #12634 from rxin/SPARK-14869.
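For context, the feature this resolution path supports is querying a file
directly by naming a data source in place of a database. A minimal sketch of
that usage, assuming a Spark-1.6/2.0-era SQLContext named sqlContext and a
hypothetical Parquet file path:

    // "parquet" here is a data source name, not a database; ResolveRelations
    // now leaves this UnresolvedRelation alone so ResolveDataSource can claim it.
    val df = sqlContext.sql("select * from parquet.`/tmp/example/data.parquet`")
    df.show()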
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R                                            |  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala         |  8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala    | 18
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala |  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala             |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                     |  4
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala             |  2
10 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9bd3975405..9244c5621b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1858,7 +1858,7 @@ test_that("approxQuantile() on a DataFrame", {
test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
- expect_equal(grepl("Table or View not found", retError), TRUE)
+ expect_equal(grepl("Table or view not found", retError), TRUE)
expect_equal(grepl("blah", retError), TRUE)
})
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 6e798a53ad..179dab11a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -32,6 +32,8 @@ trait CatalystConf {
def optimizerInSetConversionThreshold: Int
def maxCaseBranchesForCodegen: Int
+ def runSQLonFile: Boolean
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -49,6 +51,6 @@ case class SimpleCatalystConf(
groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
- maxCaseBranchesForCodegen: Int = 20)
- extends CatalystConf {
-}
+ maxCaseBranchesForCodegen: Int = 20,
+ runSQLonFile: Boolean = true)
+ extends CatalystConf
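With this change every CatalystConf implementation must expose the flag, and
SimpleCatalystConf defaults it to true. A hedged construction sketch
(caseSensitiveAnalysis is a parameter of the surrounding case class, assumed
here since it falls outside this hunk):

    // Analysis-only tests can opt out of file resolution via the new flag.
    val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, runSQLonFile = false)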
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 24558d5b8c..50957e8661 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -412,7 +412,7 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
- u.failAnalysis(s"Table or View not found: ${u.tableName}")
+ u.failAnalysis(s"Table or view not found: ${u.tableName}")
}
}
@@ -420,12 +420,18 @@ class Analyzer(
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
- try {
+ val table = u.tableIdentifier
+ if (table.database.isDefined && conf.runSQLonFile &&
+ (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
+ // If the table does not exist, and the database part is specified, and we support
+ // running SQL directly on files, then let's just return the original UnresolvedRelation.
+ // It is possible we are matching a query like "select * from parquet.`/path/to/query`".
+ // The plan will get resolved later.
+ // Note that we are testing (!db_exists || !table_exists) because the catalog throws
+ // an exception from tableExists if the database does not exist.
+ u
+ } else {
lookupTableFromCatalog(u)
- } catch {
- case _: AnalysisException if u.tableIdentifier.database.isDefined =>
- // delay the exception into CheckAnalysis, then it could be resolved as data source.
- u
}
}
}
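The order of the two checks in the new condition matters: as the inline
comment notes, the catalog's tableExists throws when the database itself is
missing, so databaseExists must short-circuit the ||. A self-contained sketch
of that behavior with a stub (StubCatalog and its exception type are
illustrative, not the real SessionCatalog API):

    // Stub mirroring the catalog behavior the inline comment describes.
    class StubCatalog(dbs: Set[String], tables: Set[(String, String)]) {
      def databaseExists(db: String): Boolean = dbs.contains(db)
      def tableExists(db: String, table: String): Boolean = {
        // Like the real catalog: probing a table in a missing database throws.
        if (!dbs.contains(db)) throw new NoSuchElementException(s"no database $db")
        tables.contains((db, table))
      }
    }

    val catalog = new StubCatalog(Set("default"), Set(("default", "t")))
    // Safe: databaseExists short-circuits before tableExists can throw.
    val looksLikeFileQuery =
      !catalog.databaseExists("parquet") ||
        !catalog.tableExists("parquet", "/path/to/file") // never evaluated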
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a50b9a1e1a..6b737d6b78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper {
case p if p.analyzed => // Skip already analyzed sub-plans
case u: UnresolvedRelation =>
- u.failAnalysis(s"Table or View not found: ${u.tableIdentifier}")
+ u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
case operator: LogicalPlan =>
operator transformExpressionsUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 36f4f29068..b8f0e458fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -62,7 +62,7 @@ class InMemoryCatalog extends ExternalCatalog {
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new AnalysisException(
- s"Table or View not found: '$table' does not exist in database '$db'")
+ s"Table or view not found: '$table' does not exist in database '$db'")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c1bd51632d..acd85db7c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,7 +642,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
- def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+ override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
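This override connects the Catalyst-level flag to the existing SQL config
entry. A hedged usage sketch, assuming RUN_SQL_ON_FILES is backed by the
spark.sql.runSQLOnFiles key and a SQLContext named sqlContext:

    // With the feature off, "select * from parquet.`...`" now fails analysis
    // with the same "Table or view not found" error as any missing table.
    sqlContext.setConf("spark.sql.runSQLOnFiles", "false")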
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c423b84957..04ad920749 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -98,7 +98,7 @@ private[sql] class SessionState(ctx: SQLContext) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
DataSourceAnalysis ::
- (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+ (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
}
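The extendedResolutionRules value is an ordinary Scala List, so
ResolveDataSource is appended only when the flag is on, via the conditional
cons at the end. The same idiom in isolation (rule names stand in for the
real rule objects):

    // A rule list that conditionally includes its last element.
    def rules(runSQLonFile: Boolean): List[String] =
      "PreInsertCastAndRename" ::
        "DataSourceAnalysis" ::
        (if (runSQLonFile) "ResolveDataSource" :: Nil else Nil)

    assert(rules(runSQLonFile = false) == List("PreInsertCastAndRename", "DataSourceAnalysis"))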
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 84f0c0083b..29521afdd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1825,12 +1825,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
val e1 = intercept[AnalysisException] {
sql("select * from in_valid_table")
}
- assert(e1.message.contains("Table or View not found"))
+ assert(e1.message.contains("Table or view not found"))
val e2 = intercept[AnalysisException] {
sql("select * from no_db.no_table").show()
}
- assert(e2.message.contains("Table or View not found"))
+ assert(e2.message.contains("Table or view not found"))
val e3 = intercept[AnalysisException] {
sql("select * from json.invalid_file")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index cc8b41542e..3fa2f884e2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -230,7 +230,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
runCliWithin(timeout = 2.minute,
errorResponses = Seq("AnalysisException"))(
"select * from nonexistent_table;"
- -> "Error in query: Table or View not found: nonexistent_table;"
+ -> "Error in query: Table or view not found: nonexistent_table;"
)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b0877823c0..a22e19207e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -91,7 +91,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
catalog.PreInsertionCasts ::
PreInsertCastAndRename ::
DataSourceAnalysis ::
- (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+ (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}