From 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Wed, 9 Nov 2016 15:03:22 -0800
Subject: [SPARK-17829][SQL] Stable format for offset log

## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:

- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option.

## How was this patch tested?

Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala)

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

zsxwing marmbrus

Author: Tyson Condie
Author: Tyson Condie

Closes #15626 from tcondie/spark-8360.
---
 .../org/apache/spark/sql/kafka010/JsonUtils.scala  |  2 -
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 19 ++++-
 .../spark/sql/kafka010/KafkaSourceOffset.scala     | 14 ++--
 .../sql/kafka010/KafkaSourceOffsetSuite.scala      | 55 ++++++++++++++-
 python/pyspark/sql/streaming.py                    | 12 ++--
 .../streaming/CompactibleFileStreamLog.scala       | 23 +++---
 .../sql/execution/streaming/CompositeOffset.scala  | 50 --------------
 .../execution/streaming/FileStreamSinkLog.scala    |  8 ---
 .../sql/execution/streaming/FileStreamSource.scala |  4 +-
 .../execution/streaming/FileStreamSourceLog.scala  |  8 ---
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 22 +++---
 .../spark/sql/execution/streaming/LongOffset.scala | 21 +++++-
 .../spark/sql/execution/streaming/Offset.scala     | 36 +++++++++-
 .../spark/sql/execution/streaming/OffsetSeq.scala  | 53 ++++++++++++++
 .../sql/execution/streaming/OffsetSeqLog.scala     | 80 ++++++++++++++++++++++
 .../spark/sql/execution/streaming/Source.scala     |  8 +++
 .../sql/execution/streaming/StreamExecution.scala  | 11 ++-
 .../sql/execution/streaming/StreamProgress.scala   |  4 +-
 .../spark/sql/execution/streaming/memory.scala     | 32 +++++----
 .../spark/sql/execution/streaming/socket.scala     | 25 ++++---
 .../sql/streaming/StreamingQueryException.scala    |  6 +-
 .../spark/sql/streaming/StreamingQueryStatus.scala |  6 +-
 .../execution/streaming/OffsetSeqLogSuite.scala    | 63 +++++++++++++++++
 .../apache/spark/sql/streaming/OffsetSuite.scala   | 24 ++-----
 .../sql/streaming/StreamingQueryStatusSuite.scala  | 16 ++---
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 38 +++++-----
 26 files changed, 446 insertions(+), 194 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 40d568a12c..13d717092a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.kafka010 -import java.io.Writer - import scala.collection.mutable.HashMap import scala.util.control.NonFatal diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index b21508cd7e..5bcc5124b0 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -114,7 +116,22 @@ private[kafka010] case class KafkaSource( * `KafkaConsumer.poll` may hang forever (KAFKA-1894). */ private lazy val initialPartitionOffsets = { - val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) + val metadataLog = + new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) { + override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { + val bytes = metadata.json.getBytes(StandardCharsets.UTF_8) + out.write(bytes.length) + out.write(bytes) + } + + override def deserialize(in: InputStream): KafkaSourceOffset = { + val length = in.read() + val bytes = new Array[Byte](length) + in.read(bytes) + KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8))) + } + } + metadataLog.get(0).getOrElse { val offsets = startingOffsets match { case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets()) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index b5ade98251..b5da415b30 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.Offset +import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} /** * An [[Offset]] for the [[KafkaSource]]. 
This one tracks all partitions of subscribed topics and @@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset */ private[kafka010] case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset { - override def toString(): String = { - partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]") - } + + override val json = JsonUtils.partitionOffsets(partitionToOffsets) } /** Companion object of the [[KafkaSourceOffset]] */ @@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset { def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = { offset match { case o: KafkaSourceOffset => o.partitionToOffsets + case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets case _ => throw new IllegalArgumentException( s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset") @@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset { def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = { KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap) } + + /** + * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]] + */ + def apply(offset: SerializedOffset): KafkaSourceOffset = + KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json)) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 7056a41b17..881018fd95 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.kafka010 +import java.io.File + +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.OffsetSuite +import org.apache.spark.sql.test.SharedSQLContext -class KafkaSourceOffsetSuite extends OffsetSuite { +class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext { compare( one = KafkaSourceOffset(("t", 0, 1L)), @@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite { compare( one = KafkaSourceOffset(("t", 0, 1L)), two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) + + + val kso1 = KafkaSourceOffset(("t", 0, 1L)) + val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L)) + val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L)) + + compare(KafkaSourceOffset(SerializedOffset(kso1.json)), + KafkaSourceOffset(SerializedOffset(kso2.json))) + + test("basic serialization - deserialization") { + assert(KafkaSourceOffset.getPartitionOffsets(kso1) == + KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json))) + } + + + testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") { + withTempDir { temp => + // use non-existent directory to test whether log make the dir + val dir = new File(temp, "dir") + val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) + val batch0 = OffsetSeq.fill(kso1) + val batch1 = OffsetSeq.fill(kso2, kso3) + + val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o => + SerializedOffset(o.json))): _*) + + val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o => + SerializedOffset(o.json))): _*) + + assert(metadataLog.add(0, batch0)) + assert(metadataLog.getLatest() === Some(0 -> batch0Serialized)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + + 
assert(metadataLog.add(1, batch1)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) + + // Adding the same batch does nothing + metadataLog.add(1, OffsetSeq.fill(LongOffset(3))) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) + } + } } diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1c94413e3c..f326f16232 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -220,7 +220,7 @@ class StreamingQueryStatus(object): triggerId: 5 Source statuses [1 source]: Source 1 - MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -228,7 +228,7 @@ class StreamingQueryStatus(object): latency.getOffset.source: 10 latency.getBatch.source: 20 Sink status - MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jsqs.toString() @@ -366,7 +366,7 @@ class SourceStatus(object): >>> print(sqs.sourceStatuses[0]) Status of source MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -396,7 +396,7 @@ class SourceStatus(object): Description of the current offset if known. >>> sqs.sourceStatuses[0].offsetDesc - u'#0' + u'0' """ return self._jss.offsetDesc() @@ -457,7 +457,7 @@ class SinkStatus(object): >>> print(sqs.sinkStatus) Status of sink MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jss.toString() @@ -481,7 +481,7 @@ class SinkStatus(object): Description of the current offsets up to which data has been written by the sink. >>> sqs.sinkStatus.offsetDesc - u'[#1, -]' + u'[1, -]' """ return self._jss.offsetDesc() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index b26edeeb04..8af3db1968 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -24,6 +24,8 @@ import scala.io.{Source => IOSource} import scala.reflect.ClassTag import org.apache.hadoop.fs.{Path, PathFilter} +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession @@ -37,7 +39,7 @@ import org.apache.spark.sql.SparkSession * compact log files every 10 batches by default into a big file. When * doing a compaction, it will read all old log files and merge them with the new batch. 
*/ -abstract class CompactibleFileStreamLog[T: ClassTag]( +abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( metadataLogVersion: String, sparkSession: SparkSession, path: String) @@ -45,6 +47,11 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( import CompactibleFileStreamLog._ + private implicit val formats = Serialization.formats(NoTypeHints) + + /** Needed to serialize type T into JSON when using Jackson */ + private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass) + /** * If we delete the old files after compaction at once, there is a race condition in S3: other * processes may see the old files are deleted but still cannot see the compaction file using @@ -58,16 +65,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( protected def compactInterval: Int - /** - * Serialize the data into encoded string. - */ - protected def serializeData(t: T): String - - /** - * Deserialize the string into data object. - */ - protected def deserializeData(encodedString: String): T - /** * Filter out the obsolete logs. */ @@ -99,7 +96,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( out.write(metadataLogVersion.getBytes(UTF_8)) logData.foreach { data => out.write('\n') - out.write(serializeData(data).getBytes(UTF_8)) + out.write(Serialization.write(data).getBytes(UTF_8)) } } @@ -112,7 +109,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( if (version != metadataLogVersion) { throw new IllegalStateException(s"Unknown log version: ${version}") } - lines.map(deserializeData).toArray + lines.map(Serialization.read[T]).toArray } override def add(batchId: Long, logs: Array[T]): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala deleted file mode 100644 index ebc6ee8184..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -/** - * An ordered collection of offsets, used to track the progress of processing data from one or more - * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance - * vector clock that must progress linearly forward. - */ -case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { - /** - * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of - * sources. - * - * This method is typically used to associate a serialized offset with actual sources (which - * cannot be serialized). 
- */ - def toStreamProgress(sources: Seq[Source]): StreamProgress = { - assert(sources.size == offsets.size) - new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) } - } - - override def toString: String = - offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]") -} - -object CompositeOffset { - /** - * Returns a [[CompositeOffset]] with a variable sequence of offsets. - * `nulls` in the sequence are converted to `None`s. - */ - def fill(offsets: Offset*): CompositeOffset = { - CompositeOffset(offsets.map(Option(_))) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index f9e24167a1..b4f14151f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -93,14 +93,6 @@ class FileStreamSinkLog( s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") - protected override def serializeData(data: SinkFileStatus): String = { - write(data) - } - - protected override def deserializeData(encodedString: String): SinkFileStatus = { - read[SinkFileStatus](encodedString) - } - override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet if (deletedFiles.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 680df01acc..8494aef004 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -131,8 +131,8 @@ class FileStreamSource( * Returns the data that is between the offsets (`start`, `end`]. 
*/ override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) - val endId = end.asInstanceOf[LongOffset].offset + val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset + val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset assert(startId <= endId) val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 4681f2ba08..fe81b15607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -60,14 +60,6 @@ class FileStreamSourceLog( } } - protected override def serializeData(data: FileEntry): String = { - Serialization.write(data) - } - - protected override def deserializeData(encodedString: String): FileEntry = { - Serialization.read[FileEntry](encodedString) - } - def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { logs } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 9a0f87cf04..db7057d7da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming -import java.io.{FileNotFoundException, InputStream, IOException, OutputStream} +import java.io._ +import java.nio.charset.StandardCharsets import java.util.{ConcurrentModificationException, EnumSet, UUID} import scala.reflect.ClassTag @@ -26,9 +27,10 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging -import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.SparkSession import org.apache.spark.util.UninterruptibleThread @@ -44,9 +46,14 @@ import org.apache.spark.util.UninterruptibleThread * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. 
*/ -class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) +class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String) extends MetadataLog[T] with Logging { + private implicit val formats = Serialization.formats(NoTypeHints) + + /** Needed to serialize type T into JSON when using Jackson */ + private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass) + // Avoid serializing generic sequences, see SPARK-17372 require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]], "Should not create a log with type Seq, use Arrays instead - see SPARK-17372") @@ -67,8 +74,6 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) override def accept(path: Path): Boolean = isBatchFile(path) } - private val serializer = new JavaSerializer(sparkSession.sparkContext.conf).newInstance() - protected def batchIdToPath(batchId: Long): Path = { new Path(metadataPath, batchId.toString) } @@ -88,14 +93,13 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) protected def serialize(metadata: T, out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - val outStream = serializer.serializeStream(out) - outStream.writeObject(metadata) + Serialization.write(metadata, out) } protected def deserialize(in: InputStream): T = { // called inside a try-finally where the underlying stream is closed in the caller - val inStream = serializer.deserializeStream(in) - inStream.readObject[T]() + val reader = new InputStreamReader(in, StandardCharsets.UTF_8) + Serialization.read[T](reader) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index c5e8827777..5f0b195fcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -22,8 +22,27 @@ package org.apache.spark.sql.execution.streaming */ case class LongOffset(offset: Long) extends Offset { + override val json = offset.toString + def +(increment: Long): LongOffset = new LongOffset(offset + increment) def -(decrement: Long): LongOffset = new LongOffset(offset - decrement) +} + +object LongOffset { + + /** + * LongOffset factory from serialized offset. + * @return new LongOffset + */ + def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong) - override def toString: String = s"#$offset" + /** + * Convert generic Offset to LongOffset if possible. + * @return converted LongOffset + */ + def convert(offset: Offset): Option[LongOffset] = offset match { + case lo: LongOffset => Some(lo) + case so: SerializedOffset => Some(LongOffset(so)) + case _ => None + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 1f52abf277..4efcee0f8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -23,4 +23,38 @@ package org.apache.spark.sql.execution.streaming * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no * new data has arrived. 
 */
-trait Offset extends Serializable {}
+abstract class Offset {
+
+  /**
+   * Equality based on JSON string representation. We leverage the
+   * JSON representation for normalization between the Offset's
+   * in-memory and on-disk representations.
+   */
+  override def equals(obj: Any): Boolean = obj match {
+    case o: Offset => this.json == o.json
+    case _ => false
+  }
+
+  override def hashCode(): Int = this.json.hashCode
+
+  override def toString(): String = this.json.toString
+
+  /**
+   * A JSON-serialized representation of an Offset that is
+   * used for saving offsets to the offset log.
+   * Note: We assume that equivalent/equal offsets serialize to
+   * identical JSON strings.
+   *
+   * @return JSON string encoding
+   */
+  def json: String
+}
+
+/**
+ * Used when loading a JSON serialized offset from external storage.
+ * We are currently not responsible for converting JSON serialized
+ * data into an internal (i.e., object) representation. Sources should
+ * define a factory method in their source Offset companion objects
+ * that accepts a [[SerializedOffset]] for doing the conversion.
+ */
+case class SerializedOffset(override val json: String) extends Offset
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
new file mode 100644
index 0000000000..a4e1fe6797
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+
+/**
+ * An ordered collection of offsets, used to track the progress of processing data from one or more
+ * [[Source]]s that are present in a streaming query. This is similar to a simplified, single-instance
+ * vector clock that must progress linearly forward.
+ */
+case class OffsetSeq(offsets: Seq[Option[Offset]]) {
+
+  /**
+   * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
+   * sources.
+   *
+   * This method is typically used to associate a serialized offset with actual sources (which
+   * cannot be serialized).
+   */
+  def toStreamProgress(sources: Seq[Source]): StreamProgress = {
+    assert(sources.size == offsets.size)
+    new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
+  }
+
+  override def toString: String =
+    offsets.map(_.map(_.json).getOrElse("-")).mkString("[", ", ", "]")
+}
+
+object OffsetSeq {
+
+  /**
+   * Returns an [[OffsetSeq]] with a variable sequence of offsets.
+   * `nulls` in the sequence are converted to `None`s.
+   */
+  def fill(offsets: Offset*): OffsetSeq = {
+    OffsetSeq(offsets.map(Option(_)))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
new file mode 100644
index 0000000000..d1c9d95be9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * This class is used to log offsets to persistent files in HDFS.
+ * Each file corresponds to a specific batch of offsets. The file
+ * format contains a version string in the first line, followed
+ * by the JSON string representation of the offsets separated
+ * by a newline character. If a source offset is missing, then
+ * that line will contain a string value defined in the
+ * SERIALIZED_VOID_OFFSET variable in the [[OffsetSeqLog]] companion object.
+ * For instance, when dealing with [[LongOffset]] types:
+ *   v1    // version 1
+ *   {0}   // LongOffset 0
+ *   {3}   // LongOffset 3
+ *   -     // No offset for this source i.e., an invalid JSON string
+ *   {2}   // LongOffset 2
+ *   ...
+ */
+class OffsetSeqLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): OffsetSeq = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    def parseOffset(value: String): Offset = value match {
+      case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
+      case json => SerializedOffset(json)
+    }
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file")
+    }
+    val version = lines.next()
+    if (version != OffsetSeqLog.VERSION) {
+      throw new IllegalStateException(s"Unknown log version: ${version}")
+    }
+    OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
+  }
+
+  override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+    metadata.offsets.map(_.map(_.json)).foreach { offset =>
+      out.write('\n')
+      offset match {
+        case Some(json: String) => out.write(json.getBytes(UTF_8))
+        case None => out.write(OffsetSeqLog.SERIALIZED_VOID_OFFSET.getBytes(UTF_8))
+      }
+    }
+  }
+}
+
+object OffsetSeqLog {
+  private val VERSION = "v1"
+  private val SERIALIZED_VOID_OFFSET = "-"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index f3bd5bfe23..75ffe90f2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -45,6 +45,14 @@ trait Source {
    * Higher layers will always call this method with a value of `start` greater than or equal
    * to the last value passed to `commit` and a value of `end` less than or equal to the
    * last value returned by `getOffset`
+   *
+   * It is possible for the [[Offset]] type to be a [[SerializedOffset]] when it was
+   * obtained from the log. Moreover, [[StreamExecution]] only compares the [[Offset]]
+   * JSON representation to determine if the two objects are equal. This could have
+   * ramifications when upgrading [[Offset]] JSON formats, i.e., two equivalent [[Offset]]
+   * objects could differ between versions. Consequently, [[StreamExecution]] may call
+   * this method with two such equivalent [[Offset]] objects, in which case the [[Source]]
+   * should return an empty [[DataFrame]].
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 37af1a550a..57e89f8536 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -149,7 +148,7 @@ class StreamExecution(
   * processing is done.
Thus, the Nth record in this log indicated data that is currently being * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ - val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets")) + val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets")) /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE @@ -249,7 +248,7 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(committedOffsets.toCompositeOffset(sources))) + Some(committedOffsets.toOffsetSeq(sources))) logError(s"Query $name terminated with error", e) // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them @@ -343,7 +342,7 @@ class StreamExecution( } if (hasNewData) { reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { - assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), + assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") @@ -684,14 +683,14 @@ class StreamExecution( val sourceStatuses = sources.map { s => SourceStatus( s.toString, - localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available + localAvailableOffsets.get(s).map(_.json).getOrElse("-"), streamMetrics.currentSourceInputRate(s), streamMetrics.currentSourceProcessingRate(s), streamMetrics.currentSourceTriggerDetails(s)) }.toArray val sinkStatus = SinkStatus( sink.toString, - committedOffsets.toCompositeOffset(sources).toString) + committedOffsets.toOffsetSeq(sources).toString) currentStatus = StreamingQueryStatus( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index db0bd9e6bc..05a6547670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,8 +26,8 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - def toCompositeOffset(source: Seq[Source]): CompositeOffset = { - CompositeOffset(source.map(get)) + def toOffsetSeq(source: Seq[Source]): OffsetSeq = { + OffsetSeq(source.map(get)) } override def toString: String = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 613c7ccdd2..582b548122 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -106,8 +106,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def getBatch(start: Option[Offset], end: Offset): DataFrame = { // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) val startOrdinal = - start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 - val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 + start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 + val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 // Internal buffer only holds the batches after lastCommittedOffset. val newBlocks = synchronized { @@ -127,19 +127,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } override def commit(end: Offset): Unit = synchronized { - end match { - case newOffset: LongOffset => - val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt - - if (offsetDiff < 0) { - sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end") - } - - batches.trimStart(offsetDiff) - lastOffsetCommitted = newOffset - case _ => - sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " + - "an instance of this class") + def check(newOffset: LongOffset): Unit = { + val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt + + if (offsetDiff < 0) { + sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end") + } + + batches.trimStart(offsetDiff) + lastOffsetCommitted = newOffset + } + + LongOffset.convert(end) match { + case Some(lo) => check(lo) + case None => sys.error(s"MemoryStream.commit() received an offset ($end) " + + "that did not originate with an instance of this class") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 042977f870..900d92bc0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -116,8 +116,8 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo /** Returns the data that is between the offsets (`start`, `end`]. */ override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { val startOrdinal = - start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 - val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 + start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 + val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 // Internal buffer only holds the batches after lastOffsetCommitted val rawList = synchronized { @@ -140,20 +140,19 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo } override def commit(end: Offset): Unit = synchronized { - if (end.isInstanceOf[LongOffset]) { - val newOffset = end.asInstanceOf[LongOffset] - val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt - - if (offsetDiff < 0) { - sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end") - } - - batches.trimStart(offsetDiff) - lastOffsetCommitted = newOffset - } else { + val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") + ) + + val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt + + if (offsetDiff < 0) { + sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end") } + + batches.trimStart(offsetDiff) + lastOffsetCommitted = newOffset } /** Stop this source. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index bd3e5a5618..0a58142e06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} +import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution} /** * :: Experimental :: @@ -36,8 +36,8 @@ class StreamingQueryException private[sql]( @transient val query: StreamingQuery, val message: String, val cause: Throwable, - val startOffset: Option[Offset] = None, - val endOffset: Option[Offset] = None) + val startOffset: Option[OffsetSeq] = None, + val endOffset: Option[OffsetSeq] = None) extends Exception(message, cause) { /** Time when the exception occurred */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index a50b0d96c1..99c7729d02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -27,7 +27,7 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset} +import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq} import org.apache.spark.util.JsonProtocol /** @@ -140,7 +140,7 @@ private[sql] object StreamingQueryStatus { sourceStatuses = Array( SourceStatus( desc = "MySource1", - offsetDesc = LongOffset(0).toString, + offsetDesc = LongOffset(0).json, inputRate = 15.5, processingRate = 23.5, triggerDetails = Map( @@ -149,7 +149,7 @@ private[sql] object StreamingQueryStatus { SOURCE_GET_BATCH_LATENCY -> "20"))), sinkStatus = SinkStatus( desc = "MySink", - offsetDesc = CompositeOffset(Some(LongOffset(1)) :: None :: Nil).toString), + offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString), triggerDetails = Map( TRIGGER_ID -> "5", IS_TRIGGER_ACTIVE -> "true", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala new file mode 100644 index 0000000000..3afd11fa46 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext + +class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { + + /** test string offset type */ + case class StringOffset(override val json: String) extends Offset + + testWithUninterruptibleThread("serialization - deserialization") { + withTempDir { temp => + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) + val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2)) + val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three")) + + val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o => + SerializedOffset(o.json))): _*) + + val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o => + SerializedOffset(o.json))): _*) + + assert(metadataLog.add(0, batch0)) + assert(metadataLog.getLatest() === Some(0 -> batch0Serialized)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + + assert(metadataLog.add(1, batch1)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) + + // Adding the same batch does nothing + metadataLog.add(1, OffsetSeq.fill(LongOffset(3))) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index b65a987770..f208f9bd9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Offset} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset} trait OffsetSuite extends SparkFunSuite { /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. 
*/ @@ -35,25 +35,11 @@ trait OffsetSuite extends SparkFunSuite { class LongOffsetSuite extends OffsetSuite { val one = LongOffset(1) val two = LongOffset(2) + val three = LongOffset(3) compare(one, two) -} - -class CompositeOffsetSuite extends OffsetSuite { - compare( - one = CompositeOffset(Some(LongOffset(1)) :: Nil), - two = CompositeOffset(Some(LongOffset(2)) :: Nil)) - - compare( - one = CompositeOffset(None :: Nil), - two = CompositeOffset(Some(LongOffset(2)) :: Nil)) - - compare( - one = CompositeOffset.fill(LongOffset(0), LongOffset(1)), - two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) - - compare( - one = CompositeOffset.fill(LongOffset(1), LongOffset(1)), - two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) + compare(LongOffset(SerializedOffset(one.json)), + LongOffset(SerializedOffset(three.json))) } + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala index 1a98cf2ba7..6af19fb0c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala @@ -24,7 +24,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString === """ |Status of source MySource1 - | Available offset: #0 + | Available offset: 0 | Input rate: 15.5 rows/sec | Processing rate: 23.5 rows/sec | Trigger details: @@ -36,7 +36,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { assert(StreamingQueryStatus.testStatus.sinkStatus.toString === """ |Status of sink MySink - | Committed offsets: [#1, -] + | Committed offsets: [1, -] """.stripMargin.trim, "SinkStatus.toString does not match") assert(StreamingQueryStatus.testStatus.toString === @@ -56,7 +56,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { | triggerId: 5 | Source statuses [1 source]: | Source 1 - MySource1 - | Available offset: #0 + | Available offset: 0 | Input rate: 15.5 rows/sec | Processing rate: 23.5 rows/sec | Trigger details: @@ -64,7 +64,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { | latency.getOffset.source: 10 | latency.getBatch.source: 20 | Sink status - MySink - | Committed offsets: [#1, -] + | Committed offsets: [1, -] """.stripMargin.trim, "StreamingQueryStatus.toString does not match") } @@ -72,10 +72,10 @@ class StreamingQueryStatusSuite extends SparkFunSuite { test("json") { assert(StreamingQueryStatus.testStatus.json === """ - |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5, + |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5, |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100", |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}], - |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}} + |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}} """.stripMargin.replace("\n", "").trim) } @@ -86,7 +86,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { |{ | "sourceStatuses" : [ { | "description" : "MySource1", - | "offsetDesc" : "#0", + | "offsetDesc" : "0", | "inputRate" : 15.5, | "processingRate" : 23.5, | "triggerDetails" : { @@ -97,7 +97,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite { | } ], | "sinkStatus" : { | "description" : "MySink", - | "offsetDesc" : "[#1, -]" + | "offsetDesc" : "[1, -]" | } |} """.stripMargin.trim) diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 31b7fe0b04..e2e66d6663 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -104,7 +104,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery( q => - q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), + q.exception.get.startOffset.get === q.committedOffsets.toOffsetSeq(Seq(inputData)), "incorrect start offset on exception") ) } @@ -124,13 +124,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString), + AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString), AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"), AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString), + AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString), AddData(inputData, 1, 2), CheckAnswer(6, 3), @@ -139,38 +139,38 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.processingRate >= 0.0), AssertOnQuery(_.status.sourceStatuses.length === 1), AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).toString), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json), AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0), AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0), AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), AssertOnQuery(_.status.sinkStatus.offsetDesc === - CompositeOffset.fill(LongOffset(0)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).toString), + OffsetSeq.fill(LongOffset(0)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json), AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0), AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString), + AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString), AddData(inputData, 1, 2), CheckAnswer(6, 3, 6, 3), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json), AssertOnQuery(_.status.sinkStatus.offsetDesc === - CompositeOffset.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), + OffsetSeq.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
LongOffset(1).json), + AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString), StopStream, AssertOnQuery(_.status.inputRate === 0.0), AssertOnQuery(_.status.processingRate === 0.0), AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json), AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.status.sinkStatus.offsetDesc === - CompositeOffset.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + OffsetSeq.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json), AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), + AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString), AssertOnQuery(_.status.triggerDetails.isEmpty), StartStream(), @@ -179,15 +179,15 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.inputRate === 0.0), AssertOnQuery(_.status.processingRate === 0.0), AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).toString), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json), AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.status.sinkStatus.offsetDesc === - CompositeOffset.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).toString), + OffsetSeq.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json), AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString) + AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString) ) } -- cgit v1.2.3
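
A note on the equality contract this patch introduces: `Offset.equals` and `hashCode` are defined on the JSON form, so a typed offset and the `SerializedOffset` wrapper read back from the log compare equal. A minimal sketch against the classes added above (the value 42 is made up):

```scala
import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset}

object OffsetEqualityDemo extends App {
  val original = LongOffset(42)
  val reloaded = SerializedOffset(original.json)  // what the offset log hands back

  // equals/hashCode are inherited from Offset and compare only the JSON strings,
  // so the raw wrapper and the typed offset are interchangeable in comparisons.
  assert(original == reloaded)

  // A source converts the wrapper back to its own type when it needs the fields.
  assert(LongOffset.convert(reloaded).contains(LongOffset(42)))
}
```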
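
For a third-party source, the intended pattern mirrors `KafkaSourceOffset`: expose a stable JSON form via `json` and provide a companion factory that rebuilds the typed offset from a `SerializedOffset`. The sketch below uses hypothetical names (`ShardOffsets`, `shardToOffset`) and json4s, which the patch already depends on elsewhere; it illustrates the contract rather than code from this change.

```scala
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical offset for a source that tracks a position per shard.
case class ShardOffsets(shardToOffset: Map[String, Long]) extends Offset {
  // Stable, human-readable form that ends up in the offset log.
  override val json: String = ShardOffsets.asJson(shardToOffset)
}

object ShardOffsets {
  private implicit val formats = Serialization.formats(NoTypeHints)

  private def asJson(offsets: Map[String, Long]): String = Serialization.write(offsets)

  // Factory used when the offset comes back from the log as raw JSON.
  def apply(so: SerializedOffset): ShardOffsets =
    ShardOffsets(Serialization.read[Map[String, Long]](so.json))
}
```

Round-tripping `ShardOffsets(Map("shard-0" -> 1L))` through `SerializedOffset` and the factory yields an offset that compares equal to the original, because equality is defined on the JSON string.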
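
Finally, the file layout documented on `OffsetSeqLog` (a `v1` header, then one JSON offset per source, with `-` marking a source that has no offset yet) can be inspected directly. A small sketch; the Kafka-style JSON payload is illustrative, not taken from a real checkpoint:

```scala
import org.apache.spark.sql.execution.streaming.SerializedOffset

object OffsetFileDemo extends App {
  // One offsets file for a query with two sources: the first committed a JSON
  // offset, the second has not produced an offset yet.
  val fileContents =
    """v1
      |{"topic-a":{"0":5,"1":7}}
      |-""".stripMargin

  val lines = fileContents.split("\n").toSeq
  assert(lines.head == "v1")  // version header written by OffsetSeqLog.serialize

  val offsets: Seq[Option[SerializedOffset]] = lines.tail.map {
    case "-"  => None                    // SERIALIZED_VOID_OFFSET: no offset for this source
    case json => Some(SerializedOffset(json))
  }
  assert(offsets == Seq(Some(SerializedOffset("""{"topic-a":{"0":5,"1":7}}""")), None))
}
```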