author    Reynold Xin <rxin@databricks.com>  2016-07-01 15:16:04 -0700
committer Reynold Xin <rxin@databricks.com>  2016-07-01 15:16:04 -0700
commit    d601894c0494d415e7f330e02168c43a2dacfb02
tree      8c365e808a5f4c6836a20a5c510332e380a1ee2f
parent    e4fa58c43ce2bf8d76bffb0d9dc1132f8d0eae6a
[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist
## What changes were proposed in this pull request?

In structured streaming, Spark does not report an error when the specified source directory does not exist; this differs from batch mode, which fails fast. This patch changes the behavior to fail if the directory does not exist (when the path is not a glob pattern).

## How was this patch tested?

Updated unit tests to reflect the new behavior.

Author: Reynold Xin <rxin@databricks.com>

Closes #14002 from rxin/SPARK-16335.
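For illustration, a minimal sketch of how the new check surfaces to a user; the session setup and paths below are hypothetical, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
val schema = new StructType().add("value", StringType)

// With this patch, a missing, non-glob source directory fails up front with
// an AnalysisException ("Path does not exist: /no/such/dir") instead of
// silently producing an empty stream.
val df = spark.readStream.schema(schema).text("/no/such/dir")

// A glob pattern is still accepted without an existence check, since it may
// only start matching once an upstream job begins dropping data there.
val globbed = spark.readStream.schema(schema).text("/landing/*/events")
```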
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala   | 38
2 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a4110d7b11..6dc27c1952 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,6 +203,18 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+
+        // Check whether the path exists if it is not a glob pattern.
+        // For glob pattern, we do not check it because the glob pattern might only make sense
+        // once the streaming job starts and some upstream source starts dropping data.
+        val hdfsPath = new Path(path)
+        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          if (!fs.exists(hdfsPath)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+
         val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
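The glob carve-out above hinges on SparkHadoopUtil.isGlobPath. As a minimal sketch, assuming that check simply looks for Hadoop's glob metacharacters in the path string:

```scala
import org.apache.hadoop.fs.Path

// Sketch (assumption about SparkHadoopUtil internals): a path counts as a
// glob if it contains any Hadoop glob metacharacter; only non-glob paths
// get the fail-fast existence check added in this patch.
def isGlobPath(pattern: Path): Boolean =
  pattern.toString.exists("{}[]*?\\".toSeq.contains(_))

isGlobPath(new Path("/landing/*/events"))  // true  -> skip the check
isGlobPath(new Path("/no/such/dir"))       // false -> fs.exists must hold
```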
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6c04846f00..8a34cf95f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist, no schema") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
+  test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None)
+      }
     }
-    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
   }
 
-  test("FileStreamSource schema: path doesn't exist, with schema") {
-    val userSchema = new StructType().add(new StructField("value", IntegerType))
-    val schema = createFileStreamSourceAndGetSchema(
-      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
-    assert(schema === userSchema)
+  test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema))
+      }
+    }
   }
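A usage note on the rewritten tests: a child of a freshly created temp directory is guaranteed absent, unlike the old hard-coded "/a/b/c", which could in principle exist on a developer's machine. A standalone sketch of that setup (hypothetical names; the real suite relies on the FileStreamSourceTest helpers):

```scala
import java.io.File
import java.nio.file.Files

// A child of a brand-new temp directory cannot exist yet, so the source path
// is reliably missing regardless of the local filesystem's contents.
val dir: File = Files.createTempDirectory("file-stream-test").toFile
val missingPath = new File(dir, "1").getAbsolutePath
assert(!new File(missingPath).exists())

// The suite then expects createFileStreamSourceAndGetSchema(path = Some(missingPath), ...)
// to throw AnalysisException, with or without a user-supplied schema.
```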
@@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   // =============== Parquet file stream schema tests ================
 
-  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
-    withTempDir { src =>
-      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          createFileStreamSourceAndGetSchema(
-            format = Some("parquet"),
-            path = Some(new File(src, "1").getCanonicalPath),
-            schema = None)
-        }
-        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
-      }
-    }
-  }
-
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write