-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                   | 10
-rw-r--r--  python/pyspark/sql/streaming.py                                                     | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala  | 38
4 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bb1793d451..90c71cc6cf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -232,6 +232,10 @@ class SparkHadoopUtil extends Logging {
     recurse(baseStatus)
   }
 
+  def isGlobPath(pattern: Path): Boolean = {
+    pattern.toString.exists("{}[]*?\\".toSet.contains)
+  }
+
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
     Option(fs.globStatus(pattern)).map { statuses =>
@@ -240,11 +244,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
-      globPath(pattern)
-    } else {
-      Seq(pattern)
-    }
+    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
   /**
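
The change above only factors the existing glob-character test out of globPathIfNecessary into a public isGlobPath helper, so other callers (the DataSource change below) can ask whether a path is a glob pattern without touching the FileSystem. A minimal standalone sketch of the same check, with a hypothetical object name and example paths that are not part of the patch:

import org.apache.hadoop.fs.Path

object GlobCheckSketch {
  // A path counts as a glob if it contains any of Hadoop's glob metacharacters.
  private val globChars = "{}[]*?\\".toSet

  def isGlobPath(pattern: Path): Boolean = pattern.toString.exists(globChars.contains)

  def main(args: Array[String]): Unit = {
    println(isGlobPath(new Path("/data/in/*.json")))    // true: '*' is a glob metacharacter
    println(isGlobPath(new Path("/data/in/part-0000"))) // false: a literal path
  }
}
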
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bffe398247..8bac347e13 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils):
         >>> json_sdf = spark.readStream.format("json")\
             .schema(sdf_schema)\
-            .load(os.path.join(tempfile.mkdtemp(),'data'))
+            .load(tempfile.mkdtemp())
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils):
             it uses the value specified in
             ``spark.sql.columnNameOfCorruptRecord``.
 
-        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
-                                             schema = sdf_schema)
+        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils):
         .. note:: Experimental.
 
-        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
-                .parquet(os.path.join(tempfile.mkdtemp()))
+        >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
@@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils):
                * ``DROPMALFORMED`` : ignores the whole corrupted records.
                * ``FAILFAST`` : throws an exception when it meets corrupted records.
 
-        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
-                                           schema = sdf_schema)
+        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
         >>> csv_sdf.isStreaming
         True
         >>> csv_sdf.schema == sdf_schema
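
The doctest updates above follow from the path check added to DataSource below: the streaming readers now require a non-glob source path to exist when the stream is defined, so the examples read from the temporary directory returned by tempfile.mkdtemp() itself rather than from a 'data' subdirectory that is never created.
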
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a4110d7b11..6dc27c1952 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,6 +203,18 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+
+        // Check whether the path exists if it is not a glob pattern.
+        // For glob pattern, we do not check it because the glob pattern might only make sense
+        // once the streaming job starts and some upstream source starts dropping data.
+        val hdfsPath = new Path(path)
+        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          if (!fs.exists(hdfsPath)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+
         val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
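
The block added above makes stream definition fail fast: a literal source path that does not exist raises an AnalysisException as soon as the streaming DataFrame is created, while glob patterns are deliberately left unchecked because they may only match files that appear after the query starts. A rough sketch of what a caller now observes, assuming a SparkSession named spark and a user-supplied StructType named userSchema (both paths below are hypothetical):

import org.apache.spark.sql.AnalysisException

// A missing literal path is rejected when the stream is defined, not when it starts.
try {
  spark.readStream.schema(userSchema).json("/no/such/dir")
} catch {
  case e: AnalysisException => println(e.getMessage)  // "Path does not exist: /no/such/dir"
}

// A glob pattern is still accepted, since it may only match data that arrives later.
val streamed = spark.readStream.schema(userSchema).json("/landing/2016/*/*.json")
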
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6c04846f00..8a34cf95f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist, no schema") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
+  test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None)
+      }
     }
-    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
   }
 
-  test("FileStreamSource schema: path doesn't exist, with schema") {
-    val userSchema = new StructType().add(new StructField("value", IntegerType))
-    val schema = createFileStreamSourceAndGetSchema(
-      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
-    assert(schema === userSchema)
+  test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema))
+      }
+    }
   }
@@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== Parquet file stream schema tests ================
 
-  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
-    withTempDir { src =>
-      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          createFileStreamSourceAndGetSchema(
-            format = Some("parquet"),
-            path = Some(new File(src, "1").getCanonicalPath),
-            schema = None)
-        }
-        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
-      }
-    }
-  }
-
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write