4 files changed, 78 insertions, 48 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e2d5f42f9c..e2a14edc54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -205,7 +205,16 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
+
+      if (globPath.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $qualified")
+      }
+      // Sufficient to check head of the globPath seq for non-glob scenario
+      if (!fs.exists(globPath.head)) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      }
+      globPath
     }.toArray
 
   // If they gave a schema, then we try and figure out the types of the partition columns
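The DataSource change fails fast: an input path that globs to nothing, or a literal path that does not exist, now raises AnalysisException("Path does not exist: ...") up front instead of surfacing later as a confusing schema-inference failure. A minimal standalone sketch of the same check against the raw Hadoop FileSystem API follows; checkAndGlobPath is a hypothetical helper, and fs.globStatus stands in for the Spark-internal SparkHadoopUtil.globPathIfNecessary:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the patched logic: qualify a user-supplied
// path, expand globs, and fail fast when nothing matches.
def checkAndGlobPath(path: String, conf: Configuration): Seq[Path] = {
  val hdfsPath = new Path(path)
  val fs = hdfsPath.getFileSystem(conf)
  val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
  // globStatus returns null (no glob, path missing) or an empty array
  // (glob with no matches); both mean the path does not exist.
  val matched = Option(fs.globStatus(qualified)).map(_.toSeq).getOrElse(Seq.empty)
  if (matched.isEmpty) {
    // The patch throws AnalysisException here; this sketch has no Spark dependency.
    throw new IllegalArgumentException(s"Path does not exist: $qualified")
  }
  matched.map(_.getPath)
}

Note that the patched code still needs the separate fs.exists check on globPath.head because globPathIfNecessary returns a wildcard-free path unchanged, without touching the filesystem.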
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d03597ee5d..f60c5ea759 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1397,4 +1397,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(e.getStackTrace.head.getClassName != classOf[QueryExecution].getName)
   }
+
+  test("SPARK-13774: Check error message for non existent path without globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("csv").
+      load("/xyz/file2", "/xyz/file21", "/abc/files555", "a")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+  }
+
+  test("SPARK-13774: Check error message for not existent globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("text").
+      load( "/xyz/*")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+
+    val e1 = intercept[AnalysisException] (sqlContext.read.json("/mnt/*/*-xyz.json").rdd).
+      getMessage()
+    assert(e1.startsWith("Path does not exist"))
+  }
 
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9f2233d5d8..2733ae7d98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1744,7 +1744,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Unable to infer schema"))
+    assert(e3.message.contains("Path does not exist"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
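All of the regression tests follow the same ScalaTest shape: run the failing read, catch the AnalysisException with intercept, and pin down the message prefix. A self-contained sketch of that pattern, with a made-up suite and resolver standing in for the Spark session (FunSuite matches the ScalaTest base class of that era; all names here are illustrative only):

import org.scalatest.FunSuite

class PathErrorMessageSuite extends FunSuite {

  // Stand-in for the patched path resolution; throws like DataSource does.
  private def resolve(path: String): Seq[String] =
    if (path.startsWith("/data/")) Seq(path)
    else throw new IllegalArgumentException(s"Path does not exist: $path")

  test("non-existent path fails fast with a clear message") {
    // intercept returns the thrown exception so its message can be inspected.
    val e = intercept[IllegalArgumentException] { resolve("/xyz/file2") }
    assert(e.getMessage.startsWith("Path does not exist"))
  }
}

Asserting on a stable prefix rather than the full message keeps the tests robust to the qualified-path suffix, which varies by filesystem.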
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index a80c35cd81..3f3d0692b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -693,23 +693,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("SPARK-6024 wide schema support") {
     withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
-        // We will need 80 splits for this schema if the threshold is 4000.
-        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
-
-        // Manually create a metastore data source table.
-        sessionState.catalog.createDataSourceTable(
-          tableIdent = TableIdentifier("wide_schema"),
-          userSpecifiedSchema = Some(schema),
-          partitionColumns = Array.empty[String],
-          bucketSpec = None,
-          provider = "json",
-          options = Map("path" -> "just a dummy path"),
-          isExternal = false)
-
-        invalidateTable("wide_schema")
-
-        val actualSchema = table("wide_schema").schema
-        assert(schema === actualSchema)
+        withTempDir( tempDir => {
+          // We will need 80 splits for this schema if the threshold is 4000.
+          val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
+
+          // Manually create a metastore data source table.
+          sessionState.catalog.createDataSourceTable(
+            tableIdent = TableIdentifier("wide_schema"),
+            userSpecifiedSchema = Some(schema),
+            partitionColumns = Array.empty[String],
+            bucketSpec = None,
+            provider = "json",
+            options = Map("path" -> tempDir.getCanonicalPath),
+            isExternal = false)
+
+          invalidateTable("wide_schema")
+
+          val actualSchema = table("wide_schema").schema
+          assert(schema === actualSchema)
+        })
       }
     }
   }
 
@@ -899,35 +901,38 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     sqlContext.sql("""drop database if exists testdb8156 CASCADE""")
   }
 
+
   test("skip hive metadata on table creation") {
-    val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("not_skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "false"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
-    // each column of the table is of native type StringType.
-    assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "true"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
-    // the table has a column type as array of StringType.
-    assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
+    withTempDir(tempPath => {
+      val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("not_skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in Hive compatible format,
+      // we verify that each column of the table is of native type StringType.
+      assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
+      // the table has a column type as array of StringType.
+      assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
+    })
   }
 }
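Because paths are now validated eagerly, the Hive suites can no longer pass the literal "just a dummy path"; they wrap each table in withTempDir so the "path" option points at a directory that actually exists. The diff uses the suites' own withTempDir helper; the following is a standalone sketch of that loan-pattern idiom, an assumption for readers outside Spark's test harness:

import java.io.File
import java.nio.file.Files

// Loan pattern: create a temp directory, lend it to the body, always clean up,
// so the test leaves no data behind even when an assertion fails.
def withTempDir(f: File => Unit): Unit = {
  val dir = Files.createTempDirectory("spark-test").toFile
  try f(dir)
  finally {
    // Delete recursively; the directory may contain table data after the test.
    def deleteRecursively(file: File): Unit = {
      Option(file.listFiles).foreach(_.foreach(deleteRecursively))
      file.delete()
    }
    deleteRecursively(dir)
  }
}

// Usage: the body receives a real path that passes the new existence check.
withTempDir { dir =>
  println(s"path option for the test table: ${dir.getCanonicalPath}")
}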