aboutsummaryrefslogtreecommitdiff
path: root/docs/structured-streaming-programming-guide.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/structured-streaming-programming-guide.md')
-rw-r--r--docs/structured-streaming-programming-guide.md18
1 file changed, 9 insertions, 9 deletions
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 995ac77a4f..7988472378 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...
# Read text from socket
socketDF = spark \
- .readStream() \
+ .readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
- .readStream() \
+ .readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
<br/><br/>
Update mode uses watermark to drop old aggregation state.
<br/><br/>
- Complete mode does drop not old aggregation state since by definition this mode
+ Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
- .writeStream() \
+ .writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
- .writeStream() \
+ .writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
aggDF \
- .writeStream() \
+ .writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu
<div data-lang="python" markdown="1">
{% highlight python %}
-query = df.writeStream().format("console").start() # get the query object
+query = df.writeStream.format("console").start() # get the query object
query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
@@ -1658,7 +1658,7 @@ aggDF
{% highlight python %}
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \