diff options
author | uncleGen <hustyugm@gmail.com> | 2017-03-12 08:29:37 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-03-12 08:29:37 +0000 |
commit | e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d (patch) | |
tree | f1512df30b47dc27f922d175a9d7dc344c6ac29f | |
parent | f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd (diff) | |
download | spark-e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d.tar.gz spark-e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d.tar.bz2 spark-e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d.zip |
[DOCS][SS] fix structured streaming python example
## What changes were proposed in this pull request?
- SS python example: `TypeError: 'xxx' object is not callable`
- some other doc issue.
## How was this patch tested?
Jenkins.
Author: uncleGen <hustyugm@gmail.com>
Closes #17257 from uncleGen/docs-ss-python.
3 files changed, 11 insertions, 11 deletions
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 995ac77a4f..7988472378 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -539,7 +539,7 @@ spark = SparkSession. ... # Read text from socket socketDF = spark \ - .readStream() \ + .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ @@ -552,7 +552,7 @@ socketDF.printSchema() # Read all the csv files written atomically in a directory userSchema = StructType().add("name", "string").add("age", "integer") csvDF = spark \ - .readStream() \ + .readStream \ .option("sep", ";") \ .schema(userSchema) \ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory") @@ -971,7 +971,7 @@ Here is the compatibility matrix. <br/><br/> Update mode uses watermark to drop old aggregation state. <br/><br/> - Complete mode does drop not old aggregation state since by definition this mode + Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. </td> </tr> @@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10") # Print new data to console noAggDF \ - .writeStream() \ + .writeStream \ .format("console") \ .start() # Write new data to Parquet files noAggDF \ - .writeStream() \ + .writeStream \ .format("parquet") \ .option("checkpointLocation", "path/to/checkpoint/dir") \ .option("path", "path/to/destination/dir") \ @@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count() # Print updated aggregations to console aggDF \ - .writeStream() \ + .writeStream \ .outputMode("complete") \ .format("console") \ .start() # Have all the aggregates in an in memory table. The query name will be the table name aggDF \ - .writeStream() \ + .writeStream \ .queryName("aggregates") \ .outputMode("complete") \ .format("memory") \ @@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu <div data-lang="python" markdown="1"> {% highlight python %} -query = df.writeStream().format("console").start() # get the query object +query = df.writeStream.format("console").start() # get the query object query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data @@ -1658,7 +1658,7 @@ aggDF {% highlight python %} aggDF \ - .writeStream() \ + .writeStream \ .outputMode("complete") \ .option("checkpointLocation", "path/to/HDFS/dir") \ .format("memory") \ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 411a15ffce..a9e64c6400 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -97,7 +97,7 @@ class FileStreamSource( } seenFiles.purge() - logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs") + logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs") /** * Returns the maximum offset that can be retrieved from the source. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index ed9305875c..905b1c52af 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * - It must pass the user-provided file filter. * - It must be newer than the ignore threshold. It is assumed that files older than the ignore * threshold have already been considered or are existing files before start - * (when newFileOnly = true). + * (when newFilesOnly = true). * - It must not be present in the recently selected files that this class remembers. * - It must not be newer than the time of the batch (i.e. `currentTime` for which this * file is being tested. This can occur if the driver was recovered, and the missing batches |