author     gatorsmile <gatorsmile@gmail.com>     2016-06-02 13:22:43 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2016-06-02 13:22:43 -0700
commit     9aff6f3b1915523432b1921fdd30fa015ed5d670 (patch)
tree       10295f61fdeee607d38c5ba71126c358c5da44f1 /sql
parent     8900c8d8ff1614b5ec5a2ce213832fa13462b4d4 (diff)
[SPARK-15515][SQL] Error Handling in Running SQL Directly On Files
#### What changes were proposed in this pull request?

This PR addresses the following issues:

- **ISSUE 1:** For the ORC source format, we report a confusing error message when Hive support is not enabled:

  ```SQL
  SQL Example:
    select id from `org.apache.spark.sql.hive.orc`.`file_path`
  Error Message:
    Table or view not found: `org.apache.spark.sql.hive.orc`.`file_path`
  ```

  Instead, we should issue an error message like:

  ```
  Expected Error Message:
    The ORC data source must be used with Hive support enabled
  ```

- **ISSUE 2:** For the Avro format, we also report a confusing error message:

  ```SQL
  SQL Example:
    select id from `avro`.`file_path`
    select id from `com.databricks.spark.avro`.`file_path`
  Error Message:
    Table or view not found: `com.databricks.spark.avro`.`file_path`
  ```

  The desired message should be like:

  ```
  Expected Error Message:
    Failed to find data source: avro. Please use Spark package
    http://spark-packages.org/package/databricks/spark-avro
  ```

- ~~**ISSUE 3:** Unable to detect incompatible libraries for Spark 2.0 in Data Source Resolution. We report a confusing error message:~~

**Update**: The latest code changes contain:

- For the JDBC format, an extra check was added in the rule `ResolveRelations` of the `Analyzer`. Without this PR, Spark returns an error message like `Option 'url' not specified`. Now, we report `Unsupported data source type for direct query on files: jdbc`.
- Data source format names are made case insensitive so that error handling behaves consistently with the normal cases.
- Test cases were added for all the supported formats.

#### How was this patch tested?

Added test cases to cover all the above issues.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13283 from gatorsmile/runSQLAgainstFile.
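As a quick illustration of the intended behavior (a minimal, hypothetical sketch, not part of this patch: the local session setup, the `/tmp/...` paths, and the `expectAnalysisError` helper are placeholders), running SQL directly on files from a SparkSession built without Hive support should now surface the messages above:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object RunSqlOnFilesErrorsDemo {

  // Runs a query that is expected to fail analysis and returns the error message.
  private def expectAnalysisError(spark: SparkSession, query: String): String =
    try {
      spark.sql(query)
      sys.error(s"expected an AnalysisException for: $query")
    } catch {
      case e: AnalysisException => e.getMessage
    }

  def main(args: Array[String]): Unit = {
    // A local session built WITHOUT Hive support; the paths below are placeholders.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("run-sql-on-files-errors")
      .getOrCreate()

    // ISSUE 1: ORC without Hive support fails with a clear message
    // instead of "Table or view not found".
    assert(expectAnalysisError(spark, "select id from orc.`/tmp/data.orc`")
      .contains("The ORC data source must be used with Hive support enabled"))

    // ISSUE 2: Avro points the user at the spark-avro package.
    assert(expectAnalysisError(spark, "select id from avro.`/tmp/data.avro`")
      .contains("Failed to find data source: avro"))

    // JDBC resolves to a data source that is not a FileFormat, so direct
    // query on files is rejected by the new check in ResolveDataSource.
    assert(expectAnalysisError(spark, "select id from jdbc.`/tmp/data`")
      .contains("Unsupported data source type for direct query on files: jdbc"))

    spark.stop()
  }
}
```

Per the new tests, provider names are treated case-insensitively, so `ORC.`, `Parquet.`, or `jsoN.` in the queries above behave the same as their lowercase forms.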
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  53
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala  14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  48
6 files changed, 134 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 93f1ad01bf..5f17fdf946 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,28 +132,20 @@ case class DataSource(
// Found the data source using fully qualified path
dataSource
case Failure(error) =>
- if (error.isInstanceOf[ClassNotFoundException]) {
- val className = error.getMessage
- if (spark2RemovedClasses.contains(className)) {
- throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " +
- "Please check if your library is compatible with Spark 2.0")
- }
- }
- if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
- throw new ClassNotFoundException(
- "The ORC data source must be used with Hive support enabled.", error)
+ if (provider.toLowerCase == "orc" ||
+ provider.startsWith("org.apache.spark.sql.hive.orc")) {
+ throw new AnalysisException(
+ "The ORC data source must be used with Hive support enabled")
+ } else if (provider.toLowerCase == "avro" ||
+ provider == "com.databricks.spark.avro") {
+ throw new AnalysisException(
+ s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
+ "package http://spark-packages.org/package/databricks/spark-avro")
} else {
- if (provider == "avro" || provider == "com.databricks.spark.avro") {
- throw new ClassNotFoundException(
- s"Failed to find data source: $provider. Please use Spark package " +
- "http://spark-packages.org/package/databricks/spark-avro",
- error)
- } else {
- throw new ClassNotFoundException(
- s"Failed to find data source: $provider. Please find packages at " +
- "http://spark-packages.org",
- error)
- }
+ throw new ClassNotFoundException(
+ s"Failed to find data source: $provider. Please find packages at " +
+ "http://spark-packages.org",
+ error)
}
}
} catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b622f85941..9afd715016 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import scala.util.control.NonFatal
+
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
/**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
*/
private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
sparkSession,
paths = u.tableIdentifier.table :: Nil,
className = u.tableIdentifier.database.get)
+
+ val notSupportDirectQuery = try {
+ !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+ } catch {
+ case NonFatal(e) => false
+ }
+ if (notSupportDirectQuery) {
+ throw new AnalysisException("Unsupported data source type for direct query on files: " +
+ s"${u.tableIdentifier.database.get}")
+ }
val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
} catch {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1a7f6ebbb2..4fcd6bc0d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
df)
})
- val e1 = intercept[AnalysisException] {
+ var e = intercept[AnalysisException] {
sql("select * from in_valid_table")
}
- assert(e1.message.contains("Table or view not found"))
+ assert(e.message.contains("Table or view not found"))
- val e2 = intercept[AnalysisException] {
+ e = intercept[AnalysisException] {
sql("select * from no_db.no_table").show()
}
- assert(e2.message.contains("Table or view not found"))
+ assert(e.message.contains("Table or view not found"))
- val e3 = intercept[AnalysisException] {
+ e = intercept[AnalysisException] {
sql("select * from json.invalid_file")
}
- assert(e3.message.contains("Path does not exist"))
+ assert(e.message.contains("Path does not exist"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
+ }
+ assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from `com.databricks.spark.avro`.`file_path`")
+ }
+ assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " +
+ "Please use Spark package http://spark-packages.org/package/databricks/spark-avro"))
+
+ // data source type is case insensitive
+ e = intercept[AnalysisException] {
+ sql(s"select id from Avro.`file_path`")
+ }
+ assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+ "http://spark-packages.org/package/databricks/spark-avro"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from avro.`file_path`")
+ }
+ assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+ "http://spark-packages.org/package/databricks/spark-avro"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
+ }
+ assert(e.message.contains("Table or view not found: " +
+ "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from `Jdbc`.`file_path`")
+ }
+ assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc"))
+
+ e = intercept[AnalysisException] {
+ sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`")
+ }
+ assert(e.message.contains("Unsupported data source type for direct query on files: " +
+ "org.apache.spark.sql.execution.datasources.jdbc"))
}
test("SortMergeJoin returns wrong results when using UnsafeRows") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index f07c33042a..85ba33e58a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.sources
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
}
test("should fail to load ORC without Hive Support") {
- intercept[ClassNotFoundException] {
+ val e = intercept[AnalysisException] {
spark.read.format("orc").load()
}
+ assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 320aaea1e4..5ea1f32433 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.datasources.DataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
@@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
}
+ test("csv") {
+ assert(
+ getProvidingClass("csv") ===
+ classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+ assert(
+ getProvidingClass("com.databricks.spark.csv") ===
+ classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+ }
+
test("error message for unknown data sources") {
- val error1 = intercept[ClassNotFoundException] {
+ val error1 = intercept[AnalysisException] {
getProvidingClass("avro")
}
assert(error1.getMessage.contains("spark-packages"))
- val error2 = intercept[ClassNotFoundException] {
+ val error2 = intercept[AnalysisException] {
getProvidingClass("com.databricks.spark.avro")
}
assert(error2.getMessage.contains("spark-packages"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b5691450ca..24de223cf8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("run sql directly on files") {
+ test("run sql directly on files - parquet") {
val df = spark.range(100).toDF()
withTempPath(f => {
df.write.parquet(f.getCanonicalPath)
- checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+ // data source type is case insensitive
+ checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"),
df)
checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
df)
@@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
})
}
+ test("run sql directly on files - orc") {
+ val df = spark.range(100).toDF()
+ withTempPath(f => {
+ df.write.orc(f.getCanonicalPath)
+ // data source type is case insensitive
+ checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+ }
+
+ test("run sql directly on files - csv") {
+ val df = spark.range(100).toDF()
+ withTempPath(f => {
+ df.write.csv(f.getCanonicalPath)
+ // data source type is case insensitive
+ checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(
+ sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+ }
+
+ test("run sql directly on files - json") {
+ val df = spark.range(100).toDF()
+ withTempPath(f => {
+ df.write.json(f.getCanonicalPath)
+ // data source type is case insensitive
+ checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+ df)
+ checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+ df)
+ })
+ }
+
test("SPARK-8976 Wrong Result for Rollup #1") {
checkAnswer(sql(
"SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),