author | Reynold Xin <rxin@databricks.com> | 2016-07-01 15:16:04 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-01 15:16:04 -0700 |
commit | d601894c0494d415e7f330e02168c43a2dacfb02 (patch) | |
tree | 8c365e808a5f4c6836a20a5c510332e380a1ee2f /core | |
parent | e4fa58c43ce2bf8d76bffb0d9dc1132f8d0eae6a (diff) | |
[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist
## What changes were proposed in this pull request?
In structured streaming, Spark does not report an error when the specified source directory does not exist, which differs from batch mode. This patch makes structured streaming fail if the directory does not exist, unless the path is a glob pattern.
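The intended behavior can be sketched roughly as follows. This is a minimal illustration, not the code in this patch: it checks the local file system with `java.nio.file` instead of a Hadoop `FileSystem`, and the object name `SourcePathCheck` and the helper `validate` are hypothetical. Only the glob character set is taken from the diff.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical sketch of "fail if the directory does not exist, unless the path is a glob".
object SourcePathCheck {
  // Character set copied from the diff; any of these marks the path as a glob pattern.
  private val globChars: Set[Char] = "{}[]*?\\".toSet

  def isGlob(path: String): Boolean = path.exists(globChars.contains)

  // Returns Right(path) when the path is accepted, Left(message) when it is rejected.
  def validate(path: String): Either[String, String] =
    if (isGlob(path)) Right(path) // glob patterns skip the up-front existence check
    else if (Files.exists(Paths.get(path))) Right(path)
    else Left(s"Path does not exist: $path")
}
```

With this sketch, a plain path that is missing is rejected immediately, while a pattern such as `/data/*.json` is accepted even if nothing matches it yet.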
## How was this patch tested?
Updated unit tests to reflect the new behavior.
Author: Reynold Xin <rxin@databricks.com>
Closes #14002 from rxin/SPARK-16335.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bb1793d451..90c71cc6cf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -232,6 +232,10 @@ class SparkHadoopUtil extends Logging {
     recurse(baseStatus)
   }
 
+  def isGlobPath(pattern: Path): Boolean = {
+    pattern.toString.exists("{}[]*?\\".toSet.contains)
+  }
+
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
     Option(fs.globStatus(pattern)).map { statuses =>
@@ -240,11 +244,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
-      globPath(pattern)
-    } else {
-      Seq(pattern)
-    }
+    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
   /**
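To see what the extracted `isGlobPath` predicate classifies as a glob, here is a small standalone sketch. It works on plain strings rather than Hadoop `Path` objects so it can run without Spark or Hadoop on the classpath; `GlobCheckSketch` and `isGlobString` are hypothetical names, and the sample paths are made up for illustration.

```scala
object GlobCheckSketch {
  // String-based stand-in for isGlobPath(pattern: Path); the character set is copied from the diff.
  def isGlobString(s: String): Boolean = s.exists("{}[]*?\\".toSet.contains)

  def main(args: Array[String]): Unit = {
    // Illustrative paths only: the first has no glob characters, the rest each contain one.
    val samples = Seq("/data/events", "/data/events/*.json", "/data/{a,b}", "/data/part-0000?")
    samples.foreach(p => println(s"$p -> ${isGlobString(p)}"))
  }
}
```

Note that the check is purely syntactic: a backslash anywhere in the string also counts as a glob character, so the predicate says nothing about whether the pattern matches any files.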