author     Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-07-01 15:16:04 -0700
commit     d601894c0494d415e7f330e02168c43a2dacfb02 (patch)
tree       8c365e808a5f4c6836a20a5c510332e380a1ee2f /python
parent     e4fa58c43ce2bf8d76bffb0d9dc1132f8d0eae6a (diff)
[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist
## What changes were proposed in this pull request?

In structured streaming, Spark does not report an error when the specified source directory does not exist, which differs from the behavior in batch mode. This patch changes the behavior so that the read fails if the directory does not exist (when the path is not a glob pattern).

## How was this patch tested?

Updated unit tests to reflect the new behavior.

Author: Reynold Xin <rxin@databricks.com>

Closes #14002 from rxin/SPARK-16335.
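As a hedged illustration of the behavior this patch describes (not part of the commit; the app name and path below are made up), a streaming read against a missing, non-glob source directory now fails up front with an AnalysisException, the same way a batch read would:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("local[1]").appName("spark-16335-demo").getOrCreate()
schema = StructType([StructField("value", StringType())])

try:
    # After this patch, a non-existent source directory is rejected when the
    # stream is defined, matching the batch reader's behavior.
    spark.readStream.schema(schema).json("/path/that/does/not/exist")
except AnalysisException as err:
    print("stream source rejected:", err)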
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/streaming.py  11
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bffe398247..8bac347e13 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils):
>>> json_sdf = spark.readStream.format("json")\
.schema(sdf_schema)\
- .load(os.path.join(tempfile.mkdtemp(),'data'))
+ .load(tempfile.mkdtemp())
>>> json_sdf.isStreaming
True
>>> json_sdf.schema == sdf_schema
@@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils):
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
- >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
- schema = sdf_schema)
+ >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
True
>>> json_sdf.schema == sdf_schema
@@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils):
.. note:: Experimental.
- >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
- .parquet(os.path.join(tempfile.mkdtemp()))
+ >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
>>> parquet_sdf.isStreaming
True
>>> parquet_sdf.schema == sdf_schema
@@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils):
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
- >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
- schema = sdf_schema)
+ >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
True
>>> csv_sdf.schema == sdf_schema
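The doctest edits above follow directly from that check: tempfile.mkdtemp() returns a directory that actually exists on disk, while joining 'data' onto it yields a path that was never created and is now rejected. A minimal sketch of the distinction (illustrative only, not part of the patch):

import os
import tempfile

existing_dir = tempfile.mkdtemp()                  # created on disk, so it is a valid stream source
missing_dir = os.path.join(existing_dir, "data")   # never created; the patched reader now rejects it

print(os.path.isdir(existing_dir))  # True
print(os.path.isdir(missing_dir))   # False

This is why the examples now pass tempfile.mkdtemp() itself instead of a joined 'data' subpath.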