author      Tathagata Das <tathagata.das1565@gmail.com>    2016-12-06 13:05:22 -0800
committer   Shixiong Zhu <shixiong@databricks.com>         2016-12-06 13:05:22 -0800
commit      1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0 (patch)
tree        136f26601c832c9f40ee61e2f42bd20671e04e59
parent      cb1f10b468e7771af75cb2288d375a87ab66d316 (diff)
download    spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.tar.gz
            spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.tar.bz2
            spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.zip
[SPARK-18671][SS][TEST] Added tests to ensure stability of all Structured Streaming log formats

## What changes were proposed in this pull request?

To be able to restart StreamingQueries across Spark versions, the logs (offset log, file source log, file sink log) are already written as JSON. We should add tests with actual JSON files checked into the Spark source tree so that any incompatible change in reading the logs is caught immediately. This PR adds such tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.

## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16128 from tdas/SPARK-18671.
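Every test added below follows the same round-trip pattern. A minimal, self-contained sketch of that pattern (the object name and plain `assert` are illustrative only; the resource path and expected JSON come from the Kafka offset file added in this patch):

```scala
import scala.io.Source

// Illustrative sketch only: load a log file that was written by Spark 2.1.0
// from the test resources and check that its serialized form is exactly what
// the old version wrote, so any format change is caught by the suite.
object CompatibilitySketch {
  def main(args: Array[String]): Unit = {
    val uri = getClass.getResource(
      "/structured-streaming/kafka-source-offset-version-2.1.0.txt").toURI
    val serialized = Source.fromFile(uri).mkString.trim
    assert(serialized == """{"topic1":{"0":456,"1":789},"topic2":{"0":0}}""")
  }
}
```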
-rw-r--r--  dev/.rat-excludes | 1
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala | 9
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 12
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact | 9
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 | 3
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 | 2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact | 4
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 | 2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 | 2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt | 1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt | 1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala | 21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 30
15 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index a3efddeaa5..6be1c72bc6 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
.Rbuildignore
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
+structured-streaming/*
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 13d717092a..868edb5dcd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -81,7 +81,14 @@ private object JsonUtils {
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
- partitionOffsets.foreach { case (tp, off) =>
+ implicit val ordering = new Ordering[TopicPartition] {
+ override def compare(x: TopicPartition, y: TopicPartition): Int = {
+ Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+ }
+ }
+ val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
+ partitions.foreach { tp =>
+ val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
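The explicit Ordering introduced in this hunk exists so that the serialized offsets JSON is deterministic: keys are emitted in (topic, partition) order rather than in the Map's internal iteration order. A small standalone sketch of the same idea (illustrative only; the object and value names are not part of the patch):

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: sorting by (topic, partition) makes the serialization
// order independent of how the Map happens to iterate its keys.
object OrderingSketch {
  implicit val byTopicThenPartition: Ordering[TopicPartition] =
    Ordering.by((tp: TopicPartition) => (tp.topic, tp.partition))

  def main(args: Array[String]): Unit = {
    val offsets = Map(
      new TopicPartition("topic2", 0) -> 0L,
      new TopicPartition("topic1", 1) -> 789L,
      new TopicPartition("topic1", 0) -> 456L)
    // Prints: topic1-0, topic1-1, topic2-0 -- always in the same order.
    println(offsets.keySet.toSeq.sorted
      .map(tp => s"${tp.topic}-${tp.partition}").mkString(", "))
  }
}
```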
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 881018fd95..c8326ffcc7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
Array(0 -> batch0Serialized, 1 -> batch1Serialized))
}
}
+
+ test("read Spark 2.1.0 log format") {
+ val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+ assert(KafkaSourceOffset(offset) ===
+ KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+ }
+
+ private def readFromResource(file: String): SerializedOffset = {
+ import scala.io.Source
+ val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+ SerializedOffset(str)
+ }
}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
new file mode 100644
index 0000000000..e1ec8a74f0
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
new file mode 100644
index 0000000000..e7989804e8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
@@ -0,0 +1,3 @@
+v1
+{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
new file mode 100644
index 0000000000..42fb0ee416
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
new file mode 100644
index 0000000000..95f78bb262
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
@@ -0,0 +1,4 @@
+v1
+{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
+{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
+{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
new file mode 100644
index 0000000000..2caa5972e4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
new file mode 100644
index 0000000000..e54b943229
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000000..51b4008129
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+345
diff --git a/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000000..6410031743
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
new file mode 100644
index 0000000000..fe5c1d44a6
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
+0
+{"topic-0":{"0":1}} \ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e046fee0c0..8a21b76e8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("read Spark 2.1.0 log format") {
+ assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+ // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
+ SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+ ))
+ }
+
/**
* Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
* in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
f(sinkLog)
}
}
+
+ private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+ val input = getClass.getResource(s"/structured-streaming/$dir")
+ val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+ log.allFiles()
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d3a83ea0b9..d139efaaf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
Array(0 -> batch0Serialized, 1 -> batch1Serialized))
}
}
+
+ test("read Spark 2.1.0 log format") {
+ val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+ assert(batchId === 0)
+ assert(offsetSeq.offsets === Seq(
+ Some(SerializedOffset("0")),
+ Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+ ))
+ assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
+ }
+
+ private def readFromResource(dir: String): (Long, OffsetSeq) = {
+ val input = getClass.getResource(s"/structured-streaming/$dir")
+ val log = new OffsetSeqLog(spark, input.toString)
+ log.getLatest().get
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8256c63d87..ff1f3e26f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
import java.io.File
-import scala.collection.mutable
-
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
assert(options.maxFilesPerTrigger == Some(1))
}
+
+ test("FileStreamSource offset - read Spark 2.1.0 log format") {
+ val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+ assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+ }
+
+ test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+ assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+ FileEntry("/a/b/0", 1480730949000L, 0L),
+ FileEntry("/a/b/1", 1480730950000L, 1L),
+ FileEntry("/a/b/2", 1480730950000L, 2L),
+ FileEntry("/a/b/3", 1480730950000L, 3L),
+ FileEntry("/a/b/4", 1480730951000L, 4L)
+ ))
+ }
+
+ private def readLogFromResource(dir: String): Seq[FileEntry] = {
+ val input = getClass.getResource(s"/structured-streaming/$dir")
+ val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+ log.allFiles()
+ }
+
+ private def readOffsetFromResource(file: String): SerializedOffset = {
+ import scala.io.Source
+ val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+ SerializedOffset(str.trim)
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {