-rw-r--r--  dev/.rat-excludes                                                                             |  1
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala          |  9
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 12
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact        |  9
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8                |  3
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9                |  2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact      |  4
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3              |  2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4              |  2
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt         |  1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt        |  1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0                   |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala |  21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala      |  16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala            |  30
15 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index a3efddeaa5..6be1c72bc6 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
 .Rbuildignore
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
+structured-streaming/*
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 13d717092a..868edb5dcd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
+    partitions.foreach { tp =>
+      val off = partitionOffsets(tp)
       val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
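For context on the change above: the implicit ordering sorts partitions by (topic, partition) before serialization, so the generated JSON is identical across runs regardless of the map's iteration order. A minimal standalone sketch of the same comparison, using `Ordering.by` as an equivalent shorthand (the object name and sample values here are illustrative, not from the patch):

    import org.apache.kafka.common.TopicPartition

    object SortedOffsetsSketch extends App {
      val offsets = Map(
        new TopicPartition("topic2", 0) -> 0L,
        new TopicPartition("topic1", 1) -> 789L,
        new TopicPartition("topic1", 0) -> 456L)

      // Same comparison as the patch: order by (topic, partition).
      implicit val byTopicThenPartition: Ordering[TopicPartition] =
        Ordering.by(tp => (tp.topic, tp.partition))

      // Deterministic iteration order: topic1-0, topic1-1, topic2-0.
      offsets.keySet.toSeq.sorted.foreach(tp => println(s"$tp -> ${offsets(tp)}"))
    }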
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 881018fd95..c8326ffcc7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
new file mode 100644
index 0000000000..e1ec8a74f0
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
new file mode 100644
index 0000000000..e7989804e8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
@@ -0,0 +1,3 @@
+v1
+{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
new file mode 100644
index 0000000000..42fb0ee416
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
new file mode 100644
index 0000000000..95f78bb262
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
@@ -0,0 +1,4 @@
+v1
+{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
+{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
+{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
new file mode 100644
index 0000000000..2caa5972e4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
new file mode 100644
index 0000000000..e54b943229
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000000..51b4008129
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+345
diff --git a/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000000..6410031743
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
new file mode 100644
index 0000000000..fe5c1d44a6
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
+0
+{"topic-0":{"0":1}} \ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e046fee0c0..8a21b76e8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("read Spark 2.1.0 log format") {
+    assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+      // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+    ))
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most tests don't care about other fields
    * in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       f(sinkLog)
     }
   }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d3a83ea0b9..d139efaaf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+    assert(batchId === 0)
+    assert(offsetSeq.offsets === Seq(
+      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+    ))
+    assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
+  }
+
+  private def readFromResource(dir: String): (Long, OffsetSeq) = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new OffsetSeqLog(spark, input.toString)
+    log.getLatest().get
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8256c63d87..ff1f3e26f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 import java.io.File
 
-import scala.collection.mutable
-
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {