diff options
author:    Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
committer: Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
commit:    d601894c0494d415e7f330e02168c43a2dacfb02 (patch)
tree:      8c365e808a5f4c6836a20a5c510332e380a1ee2f /python/pyspark/sql
parent:    e4fa58c43ce2bf8d76bffb0d9dc1132f8d0eae6a (diff)
download:  spark-d601894c0494d415e7f330e02168c43a2dacfb02.tar.gz
           spark-d601894c0494d415e7f330e02168c43a2dacfb02.tar.bz2
           spark-d601894c0494d415e7f330e02168c43a2dacfb02.zip
[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist
## What changes were proposed in this pull request?
In structured streaming, Spark does not report errors when the specified directory does not exist. This is a behavior different from the batch mode. This patch changes the behavior to fail if the directory does not exist (when the path is not a glob pattern).
## How was this patch tested?
Updated unit tests to reflect the new behavior.
Author: Reynold Xin <rxin@databricks.com>
Closes #14002 from rxin/SPARK-16335.
Diffstat (limited to 'python/pyspark/sql')
 python/pyspark/sql/streaming.py | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bffe398247..8bac347e13 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils):
         >>> json_sdf = spark.readStream.format("json")\
             .schema(sdf_schema)\
-            .load(os.path.join(tempfile.mkdtemp(),'data'))
+            .load(tempfile.mkdtemp())
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils):
             it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``.
-        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
-                schema = sdf_schema)
+        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils):
         .. note:: Experimental.
-        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
-                .parquet(os.path.join(tempfile.mkdtemp()))
+        >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
@@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils):
         * ``DROPMALFORMED`` : ignores the whole corrupted records.
         * ``FAILFAST`` : throws an exception when it meets corrupted records.
-        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
-                schema = sdf_schema)
+        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
         >>> csv_sdf.isStreaming
         True
         >>> csv_sdf.schema == sdf_schema