aboutsummaryrefslogtreecommitdiff
path: root/docs/structured-streaming-programming-guide.md
diff options
context:
space:
mode:
authorDmitriy Sokolov <silentsokolov@gmail.com>2016-08-30 11:23:37 +0100
committerSean Owen <sowen@cloudera.com>2016-08-30 11:23:37 +0100
commitd4eee9932edf1a489d7fe9120a0f003150834df6 (patch)
tree2b05ac9cfaf1e76ca0b44e579d54ec8b3b7494f2 /docs/structured-streaming-programming-guide.md
parentbefab9c1c6b59ad90f63a7d10e12b186be897f15 (diff)
downloadspark-d4eee9932edf1a489d7fe9120a0f003150834df6.tar.gz
spark-d4eee9932edf1a489d7fe9120a0f003150834df6.tar.bz2
spark-d4eee9932edf1a489d7fe9120a0f003150834df6.zip
[MINOR][DOCS] Fix minor typos in python example code
## What changes were proposed in this pull request? Fix minor typos python example code in streaming programming guide ## How was this patch tested? N/A Author: Dmitriy Sokolov <silentsokolov@gmail.com> Closes #14805 from silentsokolov/fix-typos.
Diffstat (limited to 'docs/structured-streaming-programming-guide.md')
-rw-r--r--docs/structured-streaming-programming-guide.md79
1 files changed, 39 insertions, 40 deletions
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8a88e06ebd..cdc3975d7c 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -59,9 +59,9 @@ from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
-spark = SparkSession\
- .builder()\
- .appName("StructuredNetworkWordCount")\
+spark = SparkSession \
+ .builder() \
+ .appName("StructuredNetworkWordCount") \
.getOrCreate()
{% endhighlight %}
@@ -124,22 +124,22 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
-lines = spark\
- .readStream\
- .format('socket')\
- .option('host', 'localhost')\
- .option('port', 9999)\
+lines = spark \
+ .readStream \
+ .format("socket") \
+ .option("host", "localhost") \
+ .option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
- split(lines.value, ' ')
- ).alias('word')
+ split(lines.value, " ")
+ ).alias("word")
)
# Generate running word count
-wordCounts = words.groupBy('word').count()
+wordCounts = words.groupBy("word").count()
{% endhighlight %}
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
@@ -180,10 +180,10 @@ query.awaitTermination();
{% highlight python %}
# Start running the query that prints the running counts to the console
-query = wordCounts\
- .writeStream\
- .outputMode('complete')\
- .format('console')\
+query = wordCounts \
+ .writeStream \
+ .outputMode("complete") \
+ .format("console") \
.start()
query.awaitTermination()
@@ -488,7 +488,7 @@ spark = SparkSession. ...
# Read text from socket
socketDF = spark \
- .readStream() \
+ .readStream() \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -504,7 +504,7 @@ csvDF = spark \
.readStream() \
.option("sep", ";") \
.schema(userSchema) \
- .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
+ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -596,8 +596,7 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
<div data-lang="python" markdown="1">
{% highlight python %}
-
-df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
+df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
@@ -653,11 +652,11 @@ Dataset<Row> windowedCounts = words.groupBy(
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
-words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
- window(words.timestamp, '10 minutes', '5 minutes'),
+ window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
{% endhighlight %}
@@ -704,7 +703,7 @@ streamingDf.join(staticDf, "type", "right_join"); // right outer join with a st
{% highlight python %}
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
-streamingDf.join(staticDf, "type") # inner equi-join with a static DF
+streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
{% endhighlight %}
@@ -907,25 +906,25 @@ spark.sql("select * from aggregates").show(); // interactively query in-memory
noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
-noAggDF\
- .writeStream()\
- .format("console")\
+noAggDF \
+ .writeStream() \
+ .format("console") \
.start()
# Write new data to Parquet files
-noAggDF\
- .writeStream()\
- .parquet("path/to/destination/directory")\
+noAggDF \
+ .writeStream() \
+ .parquet("path/to/destination/directory") \
.start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
# Print updated aggregations to console
-aggDF\
- .writeStream()\
- .outputMode("complete")\
- .format("console")\
+aggDF \
+ .writeStream() \
+ .outputMode("complete") \
+ .format("console") \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
@@ -1072,11 +1071,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat
{% highlight python %}
spark = ... # spark session
-spark.streams().active # get the list of currently active streaming queries
+spark.streams().active # get the list of currently active streaming queries
-spark.streams().get(id) # get a query object by its unique id
+spark.streams().get(id) # get a query object by its unique id
-spark.streams().awaitAnyTermination() # block until any one of them terminates
+spark.streams().awaitAnyTermination() # block until any one of them terminates
{% endhighlight %}
</div>
@@ -1116,11 +1115,11 @@ aggDF
<div data-lang="python" markdown="1">
{% highlight python %}
-aggDF\
- .writeStream()\
- .outputMode("complete")\
- .option("checkpointLocation", "path/to/HDFS/dir")\
- .format("memory")\
+aggDF \
+ .writeStream() \
+ .outputMode("complete") \
+ .option("checkpointLocation", "path/to/HDFS/dir") \
+ .format("memory") \
.start()
{% endhighlight %}