author     Tyson Condie <tcondie@gmail.com>        2016-11-09 15:03:22 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2016-11-09 15:03:22 -0800
commit     3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (patch)
tree       626afd38724496d5630f54d17acef35424c0746a
parent     64fbdf1aa90b66269daec29f62dc9431c1173bab (diff)
[SPARK-17829][SQL] Stable format for offset log
## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:

- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option.

## How was this patch tested?

Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala).

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

zsxwing marmbrus

Author: Tyson Condie <tcondie@gmail.com>
Author: Tyson Condie <tcondie@clash.local>

Closes #15626 from tcondie/spark-8360.
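For illustration, here is a minimal standalone sketch of the contract this change introduces: every offset exposes a stable, user-readable `json` string, and equality is defined over that string, so an offset read back from the log compares equal to the live offset it was written from. The names mirror the patch, but the snippet is illustrative rather than the actual Spark classes.

```scala
// Standalone sketch of the proposed contract (illustrative, not the Spark classes).
abstract class Offset {
  /** Stable, user-readable representation written to the offset log. */
  def json: String

  // Equality and hashing over the JSON form: a deserialized offset compares
  // equal to the in-memory offset it was serialized from.
  override def equals(obj: Any): Boolean = obj match {
    case o: Offset => json == o.json
    case _ => false
  }
  override def hashCode(): Int = json.hashCode
}

case class LongOffset(offset: Long) extends Offset {
  override val json: String = offset.toString
}

/** Wraps the raw JSON read back from the log before a source re-parses it. */
case class SerializedOffset(override val json: String) extends Offset

object OffsetContractSketch {
  def main(args: Array[String]): Unit = {
    val live = LongOffset(42)
    val restored = SerializedOffset(live.json) // what the offset log hands back
    assert(live == restored)                   // both serialize to "42"
  }
}
```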
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala  2
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala  19
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala  14
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala  55
-rw-r--r--  python/pyspark/sql/streaming.py  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala  21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala)  15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala  80
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala  32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala  25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala  63
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala  24
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala  16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala  38
25 files changed, 402 insertions, 150 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 40d568a12c..13d717092a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.kafka010
-import java.io.Writer
-
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index b21508cd7e..5bcc5124b0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -114,7 +116,22 @@ private[kafka010] case class KafkaSource(
* `KafkaConsumer.poll` may hang forever (KAFKA-1894).
*/
private lazy val initialPartitionOffsets = {
- val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
+ val metadataLog =
+ new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
+ out.write(bytes.length)
+ out.write(bytes)
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ val length = in.read()
+ val bytes = new Array[Byte](length)
+ in.read(bytes)
+ KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+ }
+ }
+
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
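The override above frames the offset JSON with a single leading length byte written via `OutputStream.write(int)`. Below is a standalone round trip of that framing with hypothetical helper names (`writeFramed`/`readFramed`); since `write(int)` emits only the low-order byte, the sketch assumes the serialized offset JSON stays under 256 bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets

object FramedOffsetSketch {
  def writeFramed(json: String, out: OutputStream): Unit = {
    val bytes = json.getBytes(StandardCharsets.UTF_8)
    out.write(bytes.length) // one-byte length prefix
    out.write(bytes)        // UTF-8 payload
  }

  def readFramed(in: InputStream): String = {
    val length = in.read()                // read the length byte back
    val bytes = new Array[Byte](length)
    in.read(bytes)                        // read the payload
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val json = """{"t":{"0":1}}"""        // roughly the shape JsonUtils.partitionOffsets produces
    val buffer = new ByteArrayOutputStream()
    writeFramed(json, buffer)
    assert(readFramed(new ByteArrayInputStream(buffer.toByteArray)) == json)
  }
}
```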
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5ade98251..b5da415b30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.sql.execution.streaming.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
- override def toString(): String = {
- partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
- }
+
+ override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
/** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
offset match {
case o: KafkaSourceOffset => o.partitionToOffsets
+ case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset {
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
}
+
+ /**
+ * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
+ */
+ def apply(offset: SerializedOffset): KafkaSourceOffset =
+ KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
}
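A hedged usage sketch of the new factory, mirroring the "basic serialization - deserialization" test added below. It would have to live in the `org.apache.spark.sql.kafka010` package, since `KafkaSourceOffset` is `private[kafka010]`; the object name is made up.

```scala
package org.apache.spark.sql.kafka010

import org.apache.spark.sql.execution.streaming.SerializedOffset

// Round trip an offset through its JSON representation and the SerializedOffset factory.
object KafkaSourceOffsetRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val original = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 2L))
    val restored = KafkaSourceOffset(SerializedOffset(original.json))
    assert(restored.partitionToOffsets == original.partitionToOffsets)
  }
}
```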
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 7056a41b17..881018fd95 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -17,9 +17,13 @@
package org.apache.spark.sql.kafka010
+import java.io.File
+
+import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.OffsetSuite
+import org.apache.spark.sql.test.SharedSQLContext
-class KafkaSourceOffsetSuite extends OffsetSuite {
+class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
compare(
one = KafkaSourceOffset(("t", 0, 1L)),
@@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
compare(
one = KafkaSourceOffset(("t", 0, 1L)),
two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+
+ val kso1 = KafkaSourceOffset(("t", 0, 1L))
+ val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L))
+ val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L))
+
+ compare(KafkaSourceOffset(SerializedOffset(kso1.json)),
+ KafkaSourceOffset(SerializedOffset(kso2.json)))
+
+ test("basic serialization - deserialization") {
+ assert(KafkaSourceOffset.getPartitionOffsets(kso1) ==
+ KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json)))
+ }
+
+
+ testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+ withTempDir { temp =>
+ // use a non-existent directory to test whether the log creates the dir
+ val dir = new File(temp, "dir")
+ val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+ val batch0 = OffsetSeq.fill(kso1)
+ val batch1 = OffsetSeq.fill(kso2, kso3)
+
+ val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ assert(metadataLog.add(0, batch0))
+ assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+
+ assert(metadataLog.add(1, batch1))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+ // Adding the same batch does nothing
+ metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+ }
+ }
}
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1c94413e3c..f326f16232 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -220,7 +220,7 @@ class StreamingQueryStatus(object):
triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
- Available offset: #0
+ Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
@@ -228,7 +228,7 @@ class StreamingQueryStatus(object):
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status - MySink
- Committed offsets: [#1, -]
+ Committed offsets: [1, -]
"""
return self._jsqs.toString()
@@ -366,7 +366,7 @@ class SourceStatus(object):
>>> print(sqs.sourceStatuses[0])
Status of source MySource1
- Available offset: #0
+ Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
@@ -396,7 +396,7 @@ class SourceStatus(object):
Description of the current offset if known.
>>> sqs.sourceStatuses[0].offsetDesc
- u'#0'
+ u'0'
"""
return self._jss.offsetDesc()
@@ -457,7 +457,7 @@ class SinkStatus(object):
>>> print(sqs.sinkStatus)
Status of sink MySink
- Committed offsets: [#1, -]
+ Committed offsets: [1, -]
"""
return self._jss.toString()
@@ -481,7 +481,7 @@ class SinkStatus(object):
Description of the current offsets up to which data has been written by the sink.
>>> sqs.sinkStatus.offsetDesc
- u'[#1, -]'
+ u'[1, -]'
"""
return self._jss.offsetDesc()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index b26edeeb04..8af3db1968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -24,6 +24,8 @@ import scala.io.{Source => IOSource}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{Path, PathFilter}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
import org.apache.spark.sql.SparkSession
@@ -37,7 +39,7 @@ import org.apache.spark.sql.SparkSession
* compact log files every 10 batches by default into a big file. When
* doing a compaction, it will read all old log files and merge them with the new batch.
*/
-abstract class CompactibleFileStreamLog[T: ClassTag](
+abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
metadataLogVersion: String,
sparkSession: SparkSession,
path: String)
@@ -45,6 +47,11 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
import CompactibleFileStreamLog._
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ /** Needed to serialize type T into JSON when using Jackson */
+ private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+
/**
* If we delete the old files after compaction at once, there is a race condition in S3: other
* processes may see the old files are deleted but still cannot see the compaction file using
@@ -59,16 +66,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
protected def compactInterval: Int
/**
- * Serialize the data into encoded string.
- */
- protected def serializeData(t: T): String
-
- /**
- * Deserialize the string into data object.
- */
- protected def deserializeData(encodedString: String): T
-
- /**
* Filter out the obsolete logs.
*/
def compactLogs(logs: Seq[T]): Seq[T]
@@ -99,7 +96,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
out.write(metadataLogVersion.getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
- out.write(serializeData(data).getBytes(UTF_8))
+ out.write(Serialization.write(data).getBytes(UTF_8))
}
}
@@ -112,7 +109,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- lines.map(deserializeData).toArray
+ lines.map(Serialization.read[T]).toArray
}
override def add(batchId: Long, logs: Array[T]): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index f9e24167a1..b4f14151f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,14 +93,6 @@ class FileStreamSinkLog(
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
- protected override def serializeData(data: SinkFileStatus): String = {
- write(data)
- }
-
- protected override def deserializeData(encodedString: String): SinkFileStatus = {
- read[SinkFileStatus](encodedString)
- }
-
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 680df01acc..8494aef004 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -131,8 +131,8 @@ class FileStreamSource(
* Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
- val endId = end.asInstanceOf[LongOffset].offset
+ val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
+ val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset
assert(startId <= endId)
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 4681f2ba08..fe81b15607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -60,14 +60,6 @@ class FileStreamSourceLog(
}
}
- protected override def serializeData(data: FileEntry): String = {
- Serialization.write(data)
- }
-
- protected override def deserializeData(encodedString: String): FileEntry = {
- Serialization.read[FileEntry](encodedString)
- }
-
def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
logs
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9a0f87cf04..db7057d7da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.streaming
-import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
+import java.io._
+import java.nio.charset.StandardCharsets
import java.util.{ConcurrentModificationException, EnumSet, UUID}
import scala.reflect.ClassTag
@@ -26,9 +27,10 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
-import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread
@@ -44,9 +46,14 @@ import org.apache.spark.util.UninterruptibleThread
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
-class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ /** Needed to serialize type T into JSON when using Jackson */
+ private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+
// Avoid serializing generic sequences, see SPARK-17372
require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
"Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
@@ -67,8 +74,6 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
override def accept(path: Path): Boolean = isBatchFile(path)
}
- private val serializer = new JavaSerializer(sparkSession.sparkContext.conf).newInstance()
-
protected def batchIdToPath(batchId: Long): Path = {
new Path(metadataPath, batchId.toString)
}
@@ -88,14 +93,13 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
protected def serialize(metadata: T, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
- val outStream = serializer.serializeStream(out)
- outStream.writeObject(metadata)
+ Serialization.write(metadata, out)
}
protected def deserialize(in: InputStream): T = {
// called inside a try-finally where the underlying stream is closed in the caller
- val inStream = serializer.deserializeStream(in)
- inStream.readObject[T]()
+ val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
+ Serialization.read[T](reader)
}
/**
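The generic log now round-trips `T` through json4s/Jackson, and the `Manifest` derived from the `ClassTag` is what lets `Serialization.read` materialize a `T`. A standalone sketch of that pattern follows; `FileEntryLike` and `JsonRoundTrip` are hypothetical names used only for illustration.

```scala
import scala.reflect.ClassTag

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical record type standing in for the metadata stored in the log.
case class FileEntryLike(path: String, size: Long)

// Same trick as above: a Manifest built from the ClassTag lets Jackson
// deserialize the generic type T.
class JsonRoundTrip[T <: AnyRef : ClassTag] {
  private implicit val formats = Serialization.formats(NoTypeHints)
  private implicit val manifest =
    Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  def serialize(t: T): String = Serialization.write(t)
  def deserialize(s: String): T = Serialization.read[T](s)
}

object JsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val codec = new JsonRoundTrip[FileEntryLike]
    val json = codec.serialize(FileEntryLike("/data/part-0000", 128L))
    assert(codec.deserialize(json) == FileEntryLike("/data/part-0000", 128L))
  }
}
```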
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index c5e8827777..5f0b195fcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,8 +22,27 @@ package org.apache.spark.sql.execution.streaming
*/
case class LongOffset(offset: Long) extends Offset {
+ override val json = offset.toString
+
def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
+
+object LongOffset {
+
+ /**
+ * LongOffset factory from serialized offset.
+ * @return new LongOffset
+ */
+ def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
- override def toString: String = s"#$offset"
+ /**
+ * Convert generic Offset to LongOffset if possible.
+ * @return converted LongOffset
+ */
+ def convert(offset: Offset): Option[LongOffset] = offset match {
+ case lo: LongOffset => Some(lo)
+ case so: SerializedOffset => Some(LongOffset(so))
+ case _ => None
+ }
}
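A hedged usage sketch of `LongOffset.convert`: it accepts either a live `LongOffset` or the `SerializedOffset` form read back from the log, and returns `None` for any other offset type. `ForeignOffset` below is a made-up type used only to show the `None` case.

```scala
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}

object LongOffsetConvertSketch {
  // Made-up offset type, only to show the non-convertible case.
  case class ForeignOffset(override val json: String) extends Offset

  def main(args: Array[String]): Unit = {
    val fromLog: Offset = SerializedOffset(LongOffset(5).json)   // "5" read back from disk
    assert(LongOffset.convert(fromLog).contains(LongOffset(5)))  // parsed back into a LongOffset
    assert(LongOffset.convert(LongOffset(7)).contains(LongOffset(7)))
    assert(LongOffset.convert(ForeignOffset("{}")).isEmpty)      // not convertible
  }
}
```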
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 1f52abf277..4efcee0f8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -23,4 +23,38 @@ package org.apache.spark.sql.execution.streaming
* ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no
* new data has arrived.
*/
-trait Offset extends Serializable {}
+abstract class Offset {
+
+ /**
+ * Equality based on JSON string representation. We leverage the
+ * JSON representation for normalization between an Offset's
+ * in-memory and on-disk representations.
+ */
+ override def equals(obj: Any): Boolean = obj match {
+ case o: Offset => this.json == o.json
+ case _ => false
+ }
+
+ override def hashCode(): Int = this.json.hashCode
+
+ override def toString(): String = this.json.toString
+
+ /**
+ * A JSON-serialized representation of an Offset that is
+ * used for saving offsets to the offset log.
+ * Note: We assume that equivalent/equal offsets serialize to
+ * identical JSON strings.
+ *
+ * @return JSON string encoding
+ */
+ def json: String
+}
+
+/**
+ * Used when loading a JSON serialized offset from external storage.
+ * We are currently not responsible for converting JSON serialized
+ * data into an internal (i.e., object) representation. Sources should
+ * define a factory method in their source Offset companion objects
+ * that accepts a [[SerializedOffset]] for doing the conversion.
+ */
+case class SerializedOffset(override val json: String) extends Offset
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index ebc6ee8184..a4e1fe6797 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -17,12 +17,14 @@
package org.apache.spark.sql.execution.streaming
+
/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
* vector clock that must progress linearly forward.
*/
-case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+case class OffsetSeq(offsets: Seq[Option[Offset]]) {
+
/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
* sources.
@@ -36,15 +38,16 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
}
override def toString: String =
- offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
+ offsets.map(_.map(_.json).getOrElse("-")).mkString("[", ", ", "]")
}
-object CompositeOffset {
+object OffsetSeq {
+
/**
- * Returns a [[CompositeOffset]] with a variable sequence of offsets.
+ * Returns an [[OffsetSeq]] with a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
*/
- def fill(offsets: Offset*): CompositeOffset = {
- CompositeOffset(offsets.map(Option(_)))
+ def fill(offsets: Offset*): OffsetSeq = {
+ OffsetSeq(offsets.map(Option(_)))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
new file mode 100644
index 0000000000..d1c9d95be9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * This class is used to log offsets to persistent files in HDFS.
+ * Each file corresponds to a specific batch of offsets. The file
+ * format contains a version string in the first line, followed
+ * by the JSON string representation of the offsets separated
+ * by a newline character. If a source offset is missing, then
+ * that line will contain a string value defined in the
+ * SERIALIZED_VOID_OFFSET variable in the [[OffsetSeqLog]] companion object.
+ * For instance, when dealing with [[LongOffset]] types:
+ * v1 // version 1
+ * {0} // LongOffset 0
+ * {3} // LongOffset 3
+ * - // No offset for this source i.e., an invalid JSON string
+ * {2} // LongOffset 2
+ * ...
+ */
+class OffsetSeqLog(sparkSession: SparkSession, path: String)
+ extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+
+ override protected def deserialize(in: InputStream): OffsetSeq = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ def parseOffset(value: String): Offset = value match {
+ case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
+ case json => SerializedOffset(json)
+ }
+ val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+ if (!lines.hasNext) {
+ throw new IllegalStateException("Incomplete log file")
+ }
+ val version = lines.next()
+ if (version != OffsetSeqLog.VERSION) {
+ throw new IllegalStateException(s"Unknown log version: ${version}")
+ }
+ OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
+ }
+
+ override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+ metadata.offsets.map(_.map(_.json)).foreach { offset =>
+ out.write('\n')
+ offset match {
+ case Some(json: String) => out.write(json.getBytes(UTF_8))
+ case None => out.write(OffsetSeqLog.SERIALIZED_VOID_OFFSET.getBytes(UTF_8))
+ }
+ }
+ }
+}
+
+object OffsetSeqLog {
+ private val VERSION = "v1"
+ private val SERIALIZED_VOID_OFFSET = "-"
+}
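A standalone sketch of parsing the file layout documented above without a SparkSession (the sample lines are hypothetical): the version line is checked, "-" becomes a missing offset, and every other line is wrapped in a `SerializedOffset` for the owning source to re-parse.

```scala
import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, SerializedOffset}

// Parse the documented file layout: a version line, then one offset per line,
// with "-" marking a source that has no offset. Mirrors OffsetSeqLog.deserialize.
object OffsetFileParseSketch {
  def main(args: Array[String]): Unit = {
    val lines = Seq("v1", """{"t":{"0":1}}""", "-", "2") // hypothetical file contents
    require(lines.head == "v1", s"Unknown log version: ${lines.head}")

    val offsets: Seq[Offset] = lines.tail.map {
      case "-"  => null                    // missing offset for that source
      case json => SerializedOffset(json)  // re-parsed later by the owning Source
    }
    val batch = OffsetSeq.fill(offsets: _*) // nulls become None

    assert(batch.offsets ==
      Seq(Some(SerializedOffset("""{"t":{"0":1}}""")), None, Some(SerializedOffset("2"))))
  }
}
```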
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index f3bd5bfe23..75ffe90f2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -45,6 +45,14 @@ trait Source {
* Higher layers will always call this method with a value of `start` greater than or equal
* to the last value passed to `commit` and a value of `end` less than or equal to the
* last value returned by `getOffset`
+ *
+ * It is possible for the [[Offset]] type to be a [[SerializedOffset]] when it was
+ * obtained from the log. Moreover, [[StreamExecution]] only compares the [[Offset]]
+ * JSON representation to determine if the two objects are equal. This could have
+ * ramifications when upgrading [[Offset]] JSON formats, i.e., two equivalent [[Offset]]
+ * objects could differ between versions. Consequently, [[StreamExecution]] may call
+ * this method with two such equivalent [[Offset]] objects; in that case, the [[Source]]
+ * should return an empty [[DataFrame]].
*/
def getBatch(start: Option[Offset], end: Offset): DataFrame
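A hedged sketch of the situation the note above describes: `StreamExecution` may pass two offsets that are equal under the JSON-based comparison, one of them possibly a `SerializedOffset` read from the log, and the resolved (`start`, `end`] range is then empty.

```scala
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}

object EqualOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val committed: Offset = SerializedOffset(LongOffset(5).json) // read back from the offset log
    val available: Offset = LongOffset(5)                        // reported live by the source
    assert(committed == available)                               // JSON-based equality

    // Conversion pattern used by FileStreamSource/MemoryStream in this patch:
    val startId = LongOffset.convert(committed).getOrElse(LongOffset(-1L)).offset
    val endId   = LongOffset.convert(available).getOrElse(LongOffset(-1L)).offset
    assert(startId == endId) // the (start, end] range is empty: return an empty batch
  }
}
```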
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 37af1a550a..57e89f8536 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -149,7 +148,7 @@ class StreamExecution(
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
- val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
+ val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
@@ -249,7 +248,7 @@ class StreamExecution(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
- Some(committedOffsets.toCompositeOffset(sources)))
+ Some(committedOffsets.toOffsetSeq(sources)))
logError(s"Query $name terminated with error", e)
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
@@ -343,7 +342,7 @@ class StreamExecution(
}
if (hasNewData) {
reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
- assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")
@@ -684,14 +683,14 @@ class StreamExecution(
val sourceStatuses = sources.map { s =>
SourceStatus(
s.toString,
- localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available
+ localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
streamMetrics.currentSourceInputRate(s),
streamMetrics.currentSourceProcessingRate(s),
streamMetrics.currentSourceTriggerDetails(s))
}.toArray
val sinkStatus = SinkStatus(
sink.toString,
- committedOffsets.toCompositeOffset(sources).toString)
+ committedOffsets.toOffsetSeq(sources).toString)
currentStatus =
StreamingQueryStatus(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index db0bd9e6bc..05a6547670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,8 +26,8 @@ class StreamProgress(
val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
extends scala.collection.immutable.Map[Source, Offset] {
- def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
- CompositeOffset(source.map(get))
+ def toOffsetSeq(source: Seq[Source]): OffsetSeq = {
+ OffsetSeq(source.map(get))
}
override def toString: String =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 613c7ccdd2..582b548122 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -106,8 +106,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
// Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal =
- start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
- val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+ start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+ val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
// Internal buffer only holds the batches after lastCommittedOffset.
val newBlocks = synchronized {
@@ -127,19 +127,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
override def commit(end: Offset): Unit = synchronized {
- end match {
- case newOffset: LongOffset =>
- val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
- if (offsetDiff < 0) {
- sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
- }
-
- batches.trimStart(offsetDiff)
- lastOffsetCommitted = newOffset
- case _ =>
- sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
- "an instance of this class")
+ def check(newOffset: LongOffset): Unit = {
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
+ }
+
+ LongOffset.convert(end) match {
+ case Some(lo) => check(lo)
+ case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
+ "that did not originate with an instance of this class")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 042977f870..900d92bc0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -116,8 +116,8 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
/** Returns the data that is between the offsets (`start`, `end`]. */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
val startOrdinal =
- start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
- val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+ start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+ val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
// Internal buffer only holds the batches after lastOffsetCommitted
val rawList = synchronized {
@@ -140,20 +140,19 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
}
override def commit(end: Offset): Unit = synchronized {
- if (end.isInstanceOf[LongOffset]) {
- val newOffset = end.asInstanceOf[LongOffset]
- val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
- if (offsetDiff < 0) {
- sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
- }
-
- batches.trimStart(offsetDiff)
- lastOffsetCommitted = newOffset
- } else {
+ val newOffset = LongOffset.convert(end).getOrElse(
sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
+ )
+
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
}
/** Stop this source. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index bd3e5a5618..0a58142e06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
+import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
/**
* :: Experimental ::
@@ -36,8 +36,8 @@ class StreamingQueryException private[sql](
@transient val query: StreamingQuery,
val message: String,
val cause: Throwable,
- val startOffset: Option[Offset] = None,
- val endOffset: Option[Offset] = None)
+ val startOffset: Option[OffsetSeq] = None,
+ val endOffset: Option[OffsetSeq] = None)
extends Exception(message, cause) {
/** Time when the exception occurred */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index a50b0d96c1..99c7729d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}
import org.apache.spark.util.JsonProtocol
/**
@@ -140,7 +140,7 @@ private[sql] object StreamingQueryStatus {
sourceStatuses = Array(
SourceStatus(
desc = "MySource1",
- offsetDesc = LongOffset(0).toString,
+ offsetDesc = LongOffset(0).json,
inputRate = 15.5,
processingRate = 23.5,
triggerDetails = Map(
@@ -149,7 +149,7 @@ private[sql] object StreamingQueryStatus {
SOURCE_GET_BATCH_LATENCY -> "20"))),
sinkStatus = SinkStatus(
desc = "MySink",
- offsetDesc = CompositeOffset(Some(LongOffset(1)) :: None :: Nil).toString),
+ offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
triggerDetails = Map(
TRIGGER_ID -> "5",
IS_TRIGGER_ACTIVE -> "true",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
new file mode 100644
index 0000000000..3afd11fa46
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
+
+ /** test string offset type */
+ case class StringOffset(override val json: String) extends Offset
+
+ testWithUninterruptibleThread("serialization - deserialization") {
+ withTempDir { temp =>
+ val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates the dir
+ val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+ val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2))
+ val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three"))
+
+ val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ assert(metadataLog.add(0, batch0))
+ assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+
+ assert(metadataLog.add(1, batch1))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+ // Adding the same batch does nothing
+ metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index b65a987770..f208f9bd9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Offset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
trait OffsetSuite extends SparkFunSuite {
/** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
@@ -35,25 +35,11 @@ trait OffsetSuite extends SparkFunSuite {
class LongOffsetSuite extends OffsetSuite {
val one = LongOffset(1)
val two = LongOffset(2)
+ val three = LongOffset(3)
compare(one, two)
-}
-
-class CompositeOffsetSuite extends OffsetSuite {
- compare(
- one = CompositeOffset(Some(LongOffset(1)) :: Nil),
- two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
- compare(
- one = CompositeOffset(None :: Nil),
- two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
- compare(
- one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
- two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
-
- compare(
- one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
- two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+ compare(LongOffset(SerializedOffset(one.json)),
+ LongOffset(SerializedOffset(three.json)))
}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
index 1a98cf2ba7..6af19fb0c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -24,7 +24,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
"""
|Status of source MySource1
- | Available offset: #0
+ | Available offset: 0
| Input rate: 15.5 rows/sec
| Processing rate: 23.5 rows/sec
| Trigger details:
@@ -36,7 +36,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
"""
|Status of sink MySink
- | Committed offsets: [#1, -]
+ | Committed offsets: [1, -]
""".stripMargin.trim, "SinkStatus.toString does not match")
assert(StreamingQueryStatus.testStatus.toString ===
@@ -56,7 +56,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| triggerId: 5
| Source statuses [1 source]:
| Source 1 - MySource1
- | Available offset: #0
+ | Available offset: 0
| Input rate: 15.5 rows/sec
| Processing rate: 23.5 rows/sec
| Trigger details:
@@ -64,7 +64,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| latency.getOffset.source: 10
| latency.getBatch.source: 20
| Sink status - MySink
- | Committed offsets: [#1, -]
+ | Committed offsets: [1, -]
""".stripMargin.trim, "StreamingQueryStatus.toString does not match")
}
@@ -72,10 +72,10 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
test("json") {
assert(StreamingQueryStatus.testStatus.json ===
"""
- |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
+ |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
|"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
- |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
+ |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
""".stripMargin.replace("\n", "").trim)
}
@@ -86,7 +86,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
|{
| "sourceStatuses" : [ {
| "description" : "MySource1",
- | "offsetDesc" : "#0",
+ | "offsetDesc" : "0",
| "inputRate" : 15.5,
| "processingRate" : 23.5,
| "triggerDetails" : {
@@ -97,7 +97,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| } ],
| "sinkStatus" : {
| "description" : "MySink",
- | "offsetDesc" : "[#1, -]"
+ | "offsetDesc" : "[1, -]"
| }
|}
""".stripMargin.trim)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 31b7fe0b04..e2e66d6663 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -104,7 +104,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
AssertOnQuery(
q =>
- q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)),
+ q.exception.get.startOffset.get === q.committedOffsets.toOffsetSeq(Seq(inputData)),
"incorrect start offset on exception")
)
}
@@ -124,13 +124,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString),
+ AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
AssertOnQuery(_.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
+ AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
@@ -139,38 +139,38 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.processingRate >= 0.0),
AssertOnQuery(_.status.sourceStatuses.length === 1),
AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+ AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- CompositeOffset.fill(LongOffset(0)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+ OffsetSeq.fill(LongOffset(0)).toString),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
+ AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
AddData(inputData, 1, 2),
CheckAnswer(6, 3, 6, 3),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+ AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- CompositeOffset.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
- AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+ OffsetSeq.fill(LongOffset(1)).toString),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
+ AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
StopStream,
AssertOnQuery(_.status.inputRate === 0.0),
AssertOnQuery(_.status.processingRate === 0.0),
AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+ AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- CompositeOffset.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+ OffsetSeq.fill(LongOffset(1)).toString),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+ AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
AssertOnQuery(_.status.triggerDetails.isEmpty),
StartStream(),
@@ -179,15 +179,15 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.inputRate === 0.0),
AssertOnQuery(_.status.processingRate === 0.0),
AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+ AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- CompositeOffset.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+ OffsetSeq.fill(LongOffset(1)).toString),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
+ AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
)
}