From bb57bfe97d9fb077885065b8e804b85d4c493faf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Dec 2016 18:17:38 -0800 Subject: [SPARK-18657][SPARK-18668] Make StreamingQuery.id persists across restart and not auto-generate StreamingQuery.name ## What changes were proposed in this pull request? Here are the major changes in this PR. - Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`. - Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`). - Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default. - Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`. Implementation details - Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`). - Added the `id` as the new `StreamMetadata`. - When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`. - All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name` TODO - [x] Test handling of name=null in json generation of StreamingQueryProgress - [x] Test handling of name=null in json generation of StreamingQueryListener events - [x] Test python API of runId ## How was this patch tested? Updated unit tests and new unit tests Author: Tathagata Das Closes #16113 from tdas/SPARK-18657. --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) (limited to 'project/MimaExcludes.scala') diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index f3e5a21d77..82d50f9891 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -108,6 +108,9 @@ object MimaExcludes { // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"), + // [SPARK-18657] Add StreamingQuery.runId + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId"), + // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"), -- cgit v1.2.3