From 3783539d7ab83a2a632a9f35ca66ae39d01c28b6 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 17 Mar 2017 16:16:22 -0700 Subject: [SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch. ## What changes were proposed in this pull request? If the user changes the shuffle partition number between batches, streaming aggregation will fail. Here are some possible cases: - Change "spark.sql.shuffle.partitions" - Use "repartition" and change the partition number in the code - RangePartitioner doesn't generate deterministic partitions. Right now it's safe as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in the future. ## How was this patch tested? - Unit tests - Manual tests - Forward compatibility tested by using the new `OffsetSeqMetadata` JSON with Spark v2.1.0 Author: Kunal Khamar Closes #17216 from kunalkhamar/num-partitions. --- .../structured-streaming/checkpoint-version-2.1.0/metadata | 1 + .../checkpoint-version-2.1.0/offsets/0 | 3 +++ .../checkpoint-version-2.1.0/offsets/1 | 3 +++ .../checkpoint-version-2.1.0/state/0/0/1.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/0/2.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/1/1.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/1/2.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/2/1.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/2/2.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/3/1.delta | Bin 0 -> 73 bytes .../checkpoint-version-2.1.0/state/0/3/2.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/4/1.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/4/2.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/5/1.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/5/2.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/6/1.delta | Bin 0 -> 46 bytes 
.../checkpoint-version-2.1.0/state/0/6/2.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/7/1.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/7/2.delta | Bin 0 -> 79 bytes .../checkpoint-version-2.1.0/state/0/8/1.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/8/2.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/9/1.delta | Bin 0 -> 46 bytes .../checkpoint-version-2.1.0/state/0/9/2.delta | Bin 0 -> 79 bytes 23 files changed, 7 insertions(+) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta create mode 100644 
sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta (limited to 'sql/core/src/test/resources') diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata new file mode 100644 index 0000000000..3492220e36 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata @@ -0,0 +1 @@ +{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 new file mode 100644 index 0000000000..cbde042e79 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1489180207737} +0 \ No newline at end of 
file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 new file mode 100644 index 0000000000..10b5774746 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1489180209261} +2 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta new file mode 100644 index 0000000000..7dc49cb3e4 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta new file mode 100644 index 0000000000..8b566e81f4 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta differ diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta new file mode 100644 index 0000000000..ca2a7ed033 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta new file mode 100644 index 0000000000..361f2db605 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta new file mode 100644 index 0000000000..4c8804c61a Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta new file mode 100644 index 0000000000..7d3e07fe03 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta new file mode 100644 index 0000000000..fe521b8c07 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta new file mode 100644 index 0000000000..e69925caba Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta new file mode 100644 
index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta new file mode 100644 index 0000000000..36397a3dda Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta new file mode 100644 index 0000000000..6352978051 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta new file mode 100644 index 0000000000..0c9b6ac5c8 Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta differ -- cgit v1.2.3