author     uncleGen <hustyugm@gmail.com>      2017-03-12 08:29:37 +0000
committer  Sean Owen <sowen@cloudera.com>     2017-03-12 08:29:37 +0000
commit     e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d (patch)
tree       f1512df30b47dc27f922d175a9d7dc344c6ac29f
parent     f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd (diff)
[DOCS][SS] fix structured streaming python example
## What changes were proposed in this pull request?

- SS python example: `TypeError: 'xxx' object is not callable`
- some other doc issues

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17257 from uncleGen/docs-ss-python.
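The `TypeError` arises because `readStream` and `writeStream` are properties on `SparkSession` and `DataFrame` in PySpark, not methods, so calling them with parentheses fails. A minimal sketch of the corrected usage (the socket host/port and sink settings here are illustrative, not part of the patch):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("StructuredStreamingFix").getOrCreate()

    # readStream is a property: accessed without parentheses
    socketDF = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # writeStream is likewise a property
    query = socketDF.writeStream \
        .format("console") \
        .start()

    query.awaitTermination()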
-rw-r--r--  docs/structured-streaming-programming-guide.md                                            | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala   |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala        |  2
3 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 995ac77a4f..7988472378 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...
# Read text from socket
socketDF = spark \
- .readStream() \
+ .readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
- .readStream() \
+ .readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
<br/><br/>
Update mode uses watermark to drop old aggregation state.
<br/><br/>
- Complete mode does drop not old aggregation state since by definition this mode
+ Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
- .writeStream() \
+ .writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
- .writeStream() \
+ .writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
aggDF \
- .writeStream() \
+ .writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu
<div data-lang="python" markdown="1">
{% highlight python %}
-query = df.writeStream().format("console").start() # get the query object
+query = df.writeStream.format("console").start() # get the query object
query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
@@ -1658,7 +1658,7 @@ aggDF
{% highlight python %}
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 411a15ffce..a9e64c6400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -97,7 +97,7 @@ class FileStreamSource(
}
seenFiles.purge()
- logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
+ logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
/**
* Returns the maximum offset that can be retrieved from the source.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index ed9305875c..905b1c52af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* - It must pass the user-provided file filter.
* - It must be newer than the ignore threshold. It is assumed that files older than the ignore
* threshold have already been considered or are existing files before start
- * (when newFileOnly = true).
+ * (when newFilesOnly = true).
* - It must not be present in the recently selected files that this class remembers.
* - It must not be newer than the time of the batch (i.e. `currentTime` for which this
* file is being tested. This can occur if the driver was recovered, and the missing batches