author     Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
commit     d601894c0494d415e7f330e02168c43a2dacfb02 (patch)
tree       8c365e808a5f4c6836a20a5c510332e380a1ee2f /sql/core
parent     e4fa58c43ce2bf8d76bffb0d9dc1132f8d0eae6a (diff)
[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist
## What changes were proposed in this pull request?
In structured streaming, Spark does not report an error when the specified source directory does not exist, which is inconsistent with batch mode. This patch changes the behavior to fail if the directory does not exist (unless the path is a glob pattern).
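For illustration, a minimal user-facing sketch of the resulting behavior; the session setup, the `text` source, and the paths below are assumptions for the example, not part of the patch:

```scala
// Illustrative sketch only: names and paths are hypothetical.
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("stream-path-check")
  .getOrCreate()

// A literal path that does not exist now fails when the streaming DataFrame is defined.
try {
  spark.readStream.format("text").load("/data/does-not-exist")
} catch {
  case e: AnalysisException =>
    // Expected message per this patch: "Path does not exist: /data/does-not-exist"
    println(e.getMessage)
}

// A glob pattern is not checked up front, since it may only match files once an
// upstream producer starts writing them.
val globbed = spark.readStream.format("text").load("/data/incoming/*")
```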
## How was this patch tested?
Updated unit tests to reflect the new behavior.
Author: Reynold Xin <rxin@databricks.com>
Closes #14002 from rxin/SPARK-16335.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala   | 38
2 files changed, 27 insertions(+), 23 deletions(-)
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a4110d7b11..6dc27c1952 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,6 +203,18 @@ case class DataSource(
     val path = caseInsensitiveOptions.getOrElse("path", {
       throw new IllegalArgumentException("'path' is not specified")
     })
+
+    // Check whether the path exists if it is not a glob pattern.
+    // For glob pattern, we do not check it because the glob pattern might only make sense
+    // once the streaming job starts and some upstream source starts dropping data.
+    val hdfsPath = new Path(path)
+    if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+      val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      if (!fs.exists(hdfsPath)) {
+        throw new AnalysisException(s"Path does not exist: $path")
+      }
+    }
+
     val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
     val isTextSource = providingClass == classOf[text.TextFileFormat]
     // If the schema inference is disabled, only text sources require schema to be specified
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6c04846f00..8a34cf95f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist, no schema") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
+  test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None)
+      }
     }
-    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
   }
 
-  test("FileStreamSource schema: path doesn't exist, with schema") {
-    val userSchema = new StructType().add(new StructField("value", IntegerType))
-    val schema = createFileStreamSourceAndGetSchema(
-      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
-    assert(schema === userSchema)
+  test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema))
+      }
+    }
   }
 
@@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== Parquet file stream schema tests ================
 
-  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
-    withTempDir { src =>
-      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          createFileStreamSourceAndGetSchema(
-            format = Some("parquet"),
-            path = Some(new File(src, "1").getCanonicalPath),
-            schema = None)
-        }
-        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
-      }
-    }
-  }
-
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write
```