diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 12 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 38 |
2 files changed, 27 insertions, 23 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a4110d7b11..6dc27c1952 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -203,6 +203,18 @@ case class DataSource( val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") }) + + // Check whether the path exists if it is not a glob pattern. + // For glob pattern, we do not check it because the glob pattern might only make sense + // once the streaming job starts and some upstream source starts dropping data. + val hdfsPath = new Path(path) + if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) { + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + if (!fs.exists(hdfsPath)) { + throw new AnalysisException(s"Path does not exist: $path") + } + } + val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE) val isTextSource = providingClass == classOf[text.TextFileFormat] // If the schema inference is disabled, only text sources require schema to be specified diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6c04846f00..8a34cf95f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() } } - test("FileStreamSource schema: path doesn't exist, no schema") { - val e = intercept[IllegalArgumentException] { - createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None) + test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") { + withTempDir { dir => + intercept[AnalysisException] { + val userSchema = new StructType().add(new StructField("value", IntegerType)) + val schema = createFileStreamSourceAndGetSchema( + format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None) + } } - assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path } - test("FileStreamSource schema: path doesn't exist, with schema") { - val userSchema = new StructType().add(new StructField("value", IntegerType)) - val schema = createFileStreamSourceAndGetSchema( - format = None, path = Some("/a/b/c"), schema = Some(userSchema)) - assert(schema === userSchema) + test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") { + withTempDir { dir => + intercept[AnalysisException] { + val userSchema = new StructType().add(new StructField("value", IntegerType)) + val schema = createFileStreamSourceAndGetSchema( + format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema)) + } + } } @@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // =============== Parquet file stream schema tests ================ - ignore("FileStreamSource schema: parquet, no existing files, no schema") { - withTempDir { src => - withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { - val e = intercept[AnalysisException] { - createFileStreamSourceAndGetSchema( - format = Some("parquet"), - path = Some(new File(src, "1").getCanonicalPath), - schema = None) - } - assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) - } - } - } - test("FileStreamSource schema: parquet, existing files, no schema") { withTempDir { src => Seq("a", "b", "c").toDS().as("userColumn").toDF().write |