aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-18459][SPARK-18460][STRUCTUREDSTREAMING] Rename triggerId to batchId ↵Tathagata Das2016-11-161-3/+3
| | | | | | | | | | | | | | | | | and add triggerDetails to json in StreamingQueryStatus ## What changes were proposed in this pull request? SPARK-18459: triggerId seems like a number that should be increasing with each trigger, whether or not there is data in it. However, actually, triggerId increases only where there is a batch of data in a trigger. So its better to rename it to batchId. SPARK-18460: triggerDetails was missing from json representation. Fixed it. ## How was this patch tested? Updated existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15895 from tdas/SPARK-18459.
* [SPARK-17829][SQL] Stable format for offset logTyson Condie2016-11-091-6/+6
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we use java serialization for the WAL that stores the offsets contained in each batch. This has two main issues: It can break across spark releases (though this is not the only thing preventing us from upgrading a running query) It is unnecessarily opaque to the user. I'd propose we require offsets to provide a user readable serialization and use that instead. JSON is probably a good option. ## How was this patch tested? Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala) Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. zsxwing marmbrus Author: Tyson Condie <tcondie@gmail.com> Author: Tyson Condie <tcondie@clash.local> Closes #15626 from tcondie/spark-8360.
* [SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column ↵hyukjinkwon2016-11-011-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | to JSON string ## What changes were proposed in this pull request? This PR proposes to add `to_json` function in contrast with `from_json` in Scala, Java and Python. It'd be useful if we can convert a same column from/to json. Also, some datasources do not support nested types. If we are forced to save a dataframe into those data sources, we might be able to work around by this function. The usage is as below: ``` scala val df = Seq(Tuple1(Tuple1(1))).toDF("a") df.select(to_json($"a").as("json")).show() ``` ``` bash +--------+ | json| +--------+ |{"_1":1}| +--------+ ``` ## How was this patch tested? Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #15354 from HyukjinKwon/SPARK-17764.
* [SQL][DOC] updating doc for JSON source to link to jsonlines.orgFelix Cheung2016-10-261-1/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? API and programming guide doc changes for Scala, Python and R. ## How was this patch tested? manual test Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #15629 from felixcheung/jsondoc.
* [SPARK-17926][SQL][STREAMING] Added json for statusesTathagata Das2016-10-211-6/+5
| | | | | | | | | | | | | ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15476 from tdas/SPARK-17926.
* [SPARK-17731][SQL][STREAMING] Metrics for structured streamingTathagata Das2016-10-131-0/+301
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics. https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing Specifically, this PR adds the following public APIs changes. ### New APIs - `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later) - `StreamingQueryStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by all the sources - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources - ~~outputRate~~ - *Does not work with wholestage codegen* - latency - Current average latency between the data being available in source and the sink writing the corresponding output - sourceStatuses: Array[SourceStatus] - Current statuses of the sources - sinkStatus: SinkStatus - Current status of the sink - triggerStatus - Low-level detailed status of the last completed/currently active trigger - latencies - getOffset, getBatch, full trigger, wal writes - timestamps - trigger start, finish, after getOffset, after getBatch - numRows - input, output, state total/updated rows for aggregations - `SourceStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by the source - processingRate - Current rate (rows/sec) at which the query is processing data from the source - triggerStatus - Low-level detailed status of the last completed/currently active trigger - Python API for `StreamingQuery.status()` ### Breaking changes to existing APIs **Existing direct public facing APIs** - Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`. - Branch 2.0 should have it deprecated, master should have it removed. **Existing advanced listener APIs** - `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus` - Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status) - Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`. - Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`. - For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java. ## How was this patch tested? Old and new unit tests. - Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite. - New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite. - New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite. - Source-specific tests for making sure input rows are counted are is source-specific test suites. - Additional tests to test minor additions in LocalTableScanExec, StateStore, etc. Metrics also manually tested using Ganglia sink Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15307 from tdas/SPARK-17731.
* [MINOR][PYSPARK][DOCS] Fix examples in PySpark documentationhyukjinkwon2016-09-281-3/+3
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to fix wrongly indented examples in PySpark documentation ``` - >>> json_sdf = spark.readStream.format("json")\ - .schema(sdf_schema)\ - .load(tempfile.mkdtemp()) + >>> json_sdf = spark.readStream.format("json") \\ + ... .schema(sdf_schema) \\ + ... .load(tempfile.mkdtemp()) ``` ``` - people.filter(people.age > 30).join(department, people.deptId == department.id)\ + people.filter(people.age > 30).join(department, people.deptId == department.id) \\ ``` ``` - >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \ - LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), + ... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] ``` ``` - >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \ - LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), + ... LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] ``` ``` - ... for x in iterator: - ... print(x) + ... for x in iterator: + ... print(x) ``` ## How was this patch tested? Manually tested. **Before** ![2016-09-26 8 36 02](https://cloud.githubusercontent.com/assets/6477701/18834471/05c7a478-8431-11e6-94bb-09aa37b12ddb.png) ![2016-09-26 9 22 16](https://cloud.githubusercontent.com/assets/6477701/18834472/06c8735c-8431-11e6-8775-78631eab0411.png) <img width="601" alt="2016-09-27 2 29 27" src="https://cloud.githubusercontent.com/assets/6477701/18861294/29c0d5b4-84bf-11e6-99c5-3c9d913c125d.png"> <img width="1056" alt="2016-09-27 2 29 58" src="https://cloud.githubusercontent.com/assets/6477701/18861298/31694cd8-84bf-11e6-9e61-9888cb8c2089.png"> <img width="1079" alt="2016-09-27 2 30 05" src="https://cloud.githubusercontent.com/assets/6477701/18861301/359722da-84bf-11e6-97f9-5f5365582d14.png"> **After** ![2016-09-26 9 29 47](https://cloud.githubusercontent.com/assets/6477701/18834467/0367f9da-8431-11e6-86d9-a490d3297339.png) ![2016-09-26 9 30 24](https://cloud.githubusercontent.com/assets/6477701/18834463/f870fae0-8430-11e6-9482-01fc47898492.png) <img width="515" alt="2016-09-27 2 28 19" src="https://cloud.githubusercontent.com/assets/6477701/18861305/3ff88b88-84bf-11e6-902c-9f725e8a8b10.png"> <img width="652" alt="2016-09-27 3 50 59" src="https://cloud.githubusercontent.com/assets/6477701/18863053/592fbc74-84ca-11e6-8dbf-99cf57947de8.png"> <img width="709" alt="2016-09-27 3 51 03" src="https://cloud.githubusercontent.com/assets/6477701/18863060/601607be-84ca-11e6-80aa-a401df41c321.png"> Author: hyukjinkwon <gurwls223@gmail.com> Closes #15242 from HyukjinKwon/minor-example-pyspark.
* [SPARK-17583][SQL] Remove uesless rowSeparator variable and set ↵hyukjinkwon2016-09-211-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | auto-expanding buffer as default for maxCharsPerColumn option in CSV ## What changes were proposed in this pull request? This PR includes the changes below: 1. Upgrade Univocity library from 2.1.1 to 2.2.1 This includes some performance improvement and also enabling auto-extending buffer in `maxCharsPerColumn` option in CSV. Please refer the [release notes](https://github.com/uniVocity/univocity-parsers/releases). 2. Remove useless `rowSeparator` variable existing in `CSVOptions` We have this unused variable in [CSVOptions.scala#L127](https://github.com/apache/spark/blob/29952ed096fd2a0a19079933ff691671d6f00835/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala#L127) but it seems possibly causing confusion that it actually does not care of `\r\n`. For example, we have an issue open about this, [SPARK-17227](https://issues.apache.org/jira/browse/SPARK-17227), describing this variable. This variable is virtually not being used because we rely on `LineRecordReader` in Hadoop which deals with only both `\n` and `\r\n`. 3. Set the default value of `maxCharsPerColumn` to auto-expending. We are setting 1000000 for the length of each column. It'd be more sensible we allow auto-expending rather than fixed length by default. To make sure, using `-1` is being described in the release note, [2.2.0](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.2.0). ## How was this patch tested? N/A Author: hyukjinkwon <gurwls223@gmail.com> Closes #15138 from HyukjinKwon/SPARK-17583.
* [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV cast null values properlyLiwei Lin2016-09-181-1/+2
| | | | | | | | | | | | | | | | | | | | ## Problem CSV in Spark 2.0.0: - does not read null values back correctly for certain data types such as `Boolean`, `TimestampType`, `DateType` -- this is a regression comparing to 1.6; - does not read empty values (specified by `options.nullValue`) as `null`s for `StringType` -- this is compatible with 1.6 but leads to problems like SPARK-16903. ## What changes were proposed in this pull request? This patch makes changes to read all empty values back as `null`s. ## How was this patch tested? New test cases. Author: Liwei Lin <lwlin7@gmail.com> Closes #14118 from lw-lin/csv-cast-null.
* [SPARK-17264][SQL] DataStreamWriter should document that it only supports ↵Sean Owen2016-08-301-1/+1
| | | | | | | | | | | | | | | | Parquet for now ## What changes were proposed in this pull request? Clarify that only parquet files are supported by DataStreamWriter now ## How was this patch tested? (Doc build -- no functional changes to test) Author: Sean Owen <sowen@cloudera.com> Closes #14860 from srowen/SPARK-17264.
* [SPARK-17215][SQL] Method `SQLContext.parseDataType(dataTypeString: String)` ↵jiangxingbo2016-08-241-1/+3
| | | | | | | | | | | | | | | | | could be removed. ## What changes were proposed in this pull request? Method `SQLContext.parseDataType(dataTypeString: String)` could be removed, we should use `SparkSession.parseDataType(dataTypeString: String)` instead. This require updating PySpark. ## How was this patch tested? Existing test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #14790 from jiangxb1987/parseDataType.
* [SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and ↵hyukjinkwon2016-08-241-8/+22
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | dateFormat/timestampFormat option for CSV and JSON ## What changes were proposed in this pull request? ### Default - ISO 8601 Currently, CSV datasource is writing `Timestamp` and `Date` as numeric form and JSON datasource is writing both as below: - CSV ``` // TimestampType 1414459800000000 // DateType 16673 ``` - Json ``` // TimestampType 1970-01-01 11:46:40.0 // DateType 1970-01-01 ``` So, for CSV we can't read back what we write and for JSON it becomes ambiguous because the timezone is being missed. So, this PR make both **write** `Timestamp` and `Date` in ISO 8601 formatted string (please refer the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)). - For `Timestamp` it becomes as below: (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`) ``` 1970-01-01T02:00:01.000-01:00 ``` - For `Date` it becomes as below (`yyyy-MM-dd`) ``` 1970-01-01 ``` ### Custom date format option - `dateFormat` This PR also adds the support to write and read dates and timestamps in a formatted string as below: - **DateType** - With `dateFormat` option (e.g. `yyyy/MM/dd`) ``` +----------+ | date| +----------+ |2015/08/26| |2014/10/27| |2016/01/28| +----------+ ``` ### Custom date format option - `timestampFormat` - **TimestampType** - With `dateFormat` option (e.g. `dd/MM/yyyy HH:mm`) ``` +----------------+ | date| +----------------+ |2015/08/26 18:00| |2014/10/27 18:30| |2016/01/28 20:00| +----------------+ ``` ## How was this patch tested? Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
* [SPARK-16772] Correct API doc references to PySpark classes + formatting fixesNicholas Chammas2016-07-281-4/+4
| | | | | | | | | | | | | | | | | | ## What's Been Changed The PR corrects several broken or missing class references in the Python API docs. It also correct formatting problems. For example, you can see [here](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerFunction) how Sphinx is not picking up the reference to `DataType`. That's because the reference is relative to the current module, whereas `DataType` is in a different module. You can also see [here](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame) how the formatting for byte, tinyint, and so on is italic instead of monospace. That's because in ReST single backticks just make things italic, unlike in Markdown. ## Testing I tested this PR by [building the Python docs](https://github.com/apache/spark/tree/master/docs#generating-the-documentation-html) and reviewing the results locally in my browser. I confirmed that the broken or missing class references were resolved, and that the formatting was corrected. Author: Nicholas Chammas <nicholas.chammas@gmail.com> Closes #14393 from nchammas/python-docstring-fixes.
* [SPARK-16335][SQL] Structured streaming should fail if source directory does ↵Reynold Xin2016-07-011-7/+4
| | | | | | | | | | | | | | not exist ## What changes were proposed in this pull request? In structured streaming, Spark does not report errors when the specified directory does not exist. This is a behavior different from the batch mode. This patch changes the behavior to fail if the directory does not exist (when the path is not a glob pattern). ## How was this patch tested? Updated unit tests to reflect the new behavior. Author: Reynold Xin <rxin@databricks.com> Closes #14002 from rxin/SPARK-16335.
* [SPARK-16313][SQL] Spark should not silently drop exceptions in file listingReynold Xin2016-06-301-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? Spark silently drops exceptions during file listing. This is a very bad behavior because it can mask legitimate errors and the resulting plan will silently have 0 rows. This patch changes it to not silently drop the errors. ## How was this patch tested? Manually verified. Author: Reynold Xin <rxin@databricks.com> Closes #13987 from rxin/SPARK-16313.
* [SPARK-16266][SQL][STREAING] Moved DataStreamReader/Writer from pyspark.sql ↵Tathagata Das2016-06-281-4/+498
| | | | | | | | | | | | | | | | | | to pyspark.sql.streaming ## What changes were proposed in this pull request? - Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming to make them consistent with scala packaging - Exposed the necessary classes in sql.streaming package so that they appear in the docs - Added pyspark.sql.streaming module to the docs ## How was this patch tested? - updated unit tests. - generated docs for testing visibility of pyspark.sql.streaming classes. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13955 from tdas/SPARK-16266.
* [SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQueryTathagata Das2016-06-151-40/+39
| | | | | | | | | | Renamed for simplicity, so that its obvious that its related to streaming. Existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13673 from tdas/SPARK-15953.
* [SPARK-15935][PYSPARK] Fix a wrong format tag in the error messageShixiong Zhu2016-06-141-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? A follow up PR for #13655 to fix a wrong format tag. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13665 from zsxwing/fix.
* [SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream ↵Tathagata Das2016-06-141-4/+4
| | | | | | | | | | | | | | | | and writeStream for streaming DFs ## What changes were proposed in this pull request? Currently, the DataFrameReader/Writer has method that are needed for streaming and non-streaming DFs. This is quite awkward because each method in them through runtime exception for one case or the other. So rather having half the methods throw runtime exceptions, its just better to have a different reader/writer API for streams. - [x] Python API!! ## How was this patch tested? Existing unit tests + two sets of unit tests for DataFrameReader/Writer and DataStreamReader/Writer. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13653 from tdas/SPARK-15933.
* [SPARK-15935][PYSPARK] Enable test for sql/streaming.py and fix these testsShixiong Zhu2016-06-141-20/+41
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR just enables tests for sql/streaming.py and also fixes the failures. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13655 from zsxwing/python-streaming-test.
* [MINOR] Fix Typos 'an -> a'Zheng RuiFeng2016-06-061-1/+1
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `an -> a` Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one. ## How was this patch tested? manual tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #13515 from zhengruifeng/an_a.
* [SPARK-15686][SQL] Move user-facing streaming classes into sql.streamingReynold Xin2016-06-011-1/+2
| | | | | | | | | | | | ## What changes were proposed in this pull request? This patch moves all user-facing structured streaming classes into sql.streaming. As part of this, I also added some since version annotation to methods and classes that don't have them. ## How was this patch tested? Updated tests to reflect the moves. Author: Reynold Xin <rxin@databricks.com> Closes #13429 from rxin/SPARK-15686.
* [SPARK-14896][SQL] Deprecate HiveContext in pythonAndrew Or2016-05-041-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? See title. ## How was this patch tested? PySpark tests. Author: Andrew Or <andrew@databricks.com> Closes #12917 from andrewor14/deprecate-hive-context-python.
* [SPARK-14555] Second cut of Python API for Structured StreamingBurak Yavuz2016-04-281-11/+124
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR adds Python APIs for: - `ContinuousQueryManager` - `ContinuousQueryException` The `ContinuousQueryException` is a very basic wrapper, it doesn't provide the functionality that the Scala side provides, but it follows the same pattern for `AnalysisException`. For `ContinuousQueryManager`, all APIs are provided except for registering listeners. This PR also attempts to fix test flakiness by stopping all active streams just before tests. ## How was this patch tested? Python Doc tests and unit tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #12673 from brkyvz/pyspark-cqm.
* [SPARK-14555] First cut of Python API for Structured StreamingBurak Yavuz2016-04-201-0/+124
## What changes were proposed in this pull request? This patch provides a first cut of python APIs for structured streaming. This PR provides the new classes: - ContinuousQuery - Trigger - ProcessingTime in pyspark under `pyspark.sql.streaming`. In addition, it contains the new methods added under: - `DataFrameWriter` a) `startStream` b) `trigger` c) `queryName` - `DataFrameReader` a) `stream` - `DataFrame` a) `isStreaming` This PR doesn't contain all methods exposed for `ContinuousQuery`, for example: - `exception` - `sourceStatuses` - `sinkStatus` They may be added in a follow up. This PR also contains some very minor doc fixes in the Scala side. ## How was this patch tested? Python doc tests TODO: - [ ] verify Python docs look good Author: Burak Yavuz <brkyvz@gmail.com> Author: Burak Yavuz <burak@databricks.com> Closes #12320 from brkyvz/stream-python.