aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-06-07 16:40:03 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-06-07 16:40:03 -0700
commit0cfd6192f38932a26195a6a8dbbc637d67f5ec55 (patch)
treefc59c52bc36d68ea49aea8faea499f51baf2f834
parent695dbc816a6d70289abeb145cb62ff4e62b3f49b (diff)
downloadspark-0cfd6192f38932a26195a6a8dbbc637d67f5ec55.tar.gz
spark-0cfd6192f38932a26195a6a8dbbc637d67f5ec55.tar.bz2
spark-0cfd6192f38932a26195a6a8dbbc637d67f5ec55.zip
[SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable
## What changes were proposed in this pull request? This PR adds ContinuousQueryInfo to make ContinuousQueryListener events serializable in order to support writing events into the event log. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13335 from zsxwing/query-info.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala27
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala21
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala34
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala34
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala133
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala16
8 files changed, 203 insertions, 76 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
index 2a1be09693..f50951f9bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
@@ -22,15 +22,13 @@ import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.util.ListenerBus
/**
- * A bus to forward events to [[ContinuousQueryListener]]s. This one will wrap received
- * [[ContinuousQueryListener.Event]]s as WrappedContinuousQueryListenerEvents and send them to the
- * Spark listener bus. It also registers itself with Spark listener bus, so that it can receive
- * WrappedContinuousQueryListenerEvents, unwrap them as ContinuousQueryListener.Events and
- * dispatch them to ContinuousQueryListener.
+ * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received
+ * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with
+ * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them
+ * to ContinuousQueryListener.
*/
class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
- extends SparkListener
- with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+ extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
import ContinuousQueryListener._
@@ -45,13 +43,13 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
case s: QueryStarted =>
postToAll(s)
case _ =>
- sparkListenerBus.post(new WrappedContinuousQueryListenerEvent(event))
+ sparkListenerBus.post(event)
}
}
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case WrappedContinuousQueryListenerEvent(e) =>
+ case e: ContinuousQueryListener.Event =>
postToAll(e)
case _ =>
}
@@ -71,15 +69,4 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}
- /**
- * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
- * listener bus.
- */
- private case class WrappedContinuousQueryListenerEvent(
- streamingListenerEvent: ContinuousQueryListener.Event)
- extends SparkListenerEvent {
-
- // Do not log streaming events in event log as history server does not support these events.
- protected[spark] override def logEvent: Boolean = false
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16d38a2f7d..d9800e4afd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -131,12 +131,13 @@ class StreamExecution(
/** Returns current status of all the sources. */
override def sourceStatuses: Array[SourceStatus] = {
val localAvailableOffsets = availableOffsets
- sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
+ sources.map(s =>
+ new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
}
/** Returns current status of the sink. */
override def sinkStatus: SinkStatus =
- new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
+ new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)
/** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
@@ -167,7 +168,7 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
- postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
+ postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.
// Unblock starting thread
startLatch.countDown()
@@ -206,7 +207,10 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
- postEvent(new QueryTerminated(this))
+ postEvent(new QueryTerminated(
+ this.toInfo,
+ exception.map(_.getMessage),
+ exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
terminationLatch.countDown()
}
}
@@ -374,7 +378,7 @@ class StreamExecution(
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
- postEvent(new QueryProgress(this))
+ postEvent(new QueryProgress(this.toInfo))
}
private def postEvent(event: ContinuousQueryListener.Event) {
@@ -484,6 +488,13 @@ class StreamExecution(
""".stripMargin
}
+ private def toInfo: ContinuousQueryInfo = {
+ new ContinuousQueryInfo(
+ this.name,
+ this.sourceStatuses,
+ this.sinkStatus)
+ }
+
trait State
case object INITIALIZED extends State
case object ACTIVE extends State
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
new file mode 100644
index 0000000000..57b718b08f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a [[ContinuousQuery]].
+ *
+ * @param name The [[ContinuousQuery]] name.
+ * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
+ * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
+ */
+@Experimental
+class ContinuousQueryInfo private[sql](
+ val name: String,
+ val sourceStatuses: Seq[SourceStatus],
+ val sinkStatus: SinkStatus)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
index 6bdd513288..dd311148e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
+import org.apache.spark.scheduler.SparkListenerEvent
/**
* :: Experimental ::
@@ -70,26 +71,43 @@ abstract class ContinuousQueryListener {
object ContinuousQueryListener {
/**
- * Base type of [[ContinuousQueryListener]] events.
+ * :: Experimental ::
+ * Base type of [[ContinuousQueryListener]] events
* @since 2.0.0
*/
- trait Event
+ @Experimental
+ trait Event extends SparkListenerEvent
/**
- * Event representing the start of a query.
+ * :: Experimental ::
+ * Event representing the start of a query
* @since 2.0.0
*/
- class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+ @Experimental
+ class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event
/**
- * Event representing any progress updates in a query.
+ * :: Experimental ::
+ * Event representing any progress updates in a query
* @since 2.0.0
*/
- class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+ @Experimental
+ class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event
/**
- * Event representing that termination of a query.
+ * :: Experimental ::
+ * Event representing that termination of a query
+ *
+ * @param queryInfo Information about the status of the query.
+ * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated
+ * with an exception. Otherwise, it will be `None`.
+ * @param stackTrace The stack trace of the exception if the query was terminated with an
+ * exception. It will be empty if there was no error.
* @since 2.0.0
*/
- class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+ @Experimental
+ class QueryTerminated private[sql](
+ val queryInfo: ContinuousQueryInfo,
+ val exception: Option[String],
+ val stackTrace: Seq[StackTraceElement]) extends Event
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index 79ddf01042..de1efe961f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -18,17 +18,17 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Sink}
+import org.apache.spark.sql.execution.streaming.Sink
/**
* :: Experimental ::
* Status and metrics of a streaming [[Sink]].
*
* @param description Description of the source corresponding to this status
- * @param offset Current offset up to which data has been written by the sink
+ * @param offsetDesc Description of the current offset up to which data has been written by the sink
* @since 2.0.0
*/
@Experimental
class SinkStatus private[sql](
val description: String,
- val offset: Offset)
+ val offsetDesc: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index 8fccd5b7a3..bd0c8485e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -18,17 +18,17 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Source}
+import org.apache.spark.sql.execution.streaming.Source
/**
* :: Experimental ::
* Status and metrics of a streaming [[Source]].
*
- * @param description Description of the source corresponding to this status
- * @param offset Current offset of the source, if known
+ * @param description Description of the source corresponding to this status
+ * @param offsetDesc Description of the current [[Source]] offset if known
* @since 2.0.0
*/
@Experimental
class SourceStatus private[sql] (
val description: String,
- val offset: Option[Offset])
+ val offsetDesc: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index cdd97da8ae..9b59ab60a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -26,7 +26,9 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
+import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.JsonProtocol
class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
@@ -51,14 +53,13 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
- assert(status.active == true)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))
// The source and sink offsets must be None as this must be called before the
// batches have started
- assert(status.sourceStatuses(0).offset === None)
- assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
+ assert(status.sourceStatuses(0).offsetDesc === None)
+ assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString)
// No progress events or termination events
assert(listener.progressStatuses.isEmpty)
@@ -73,9 +74,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
- assert(status.active == true)
- assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
- assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+ assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+ assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
// No termination events
assert(listener.terminationStatus === null)
@@ -86,10 +86,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
eventually(Timeout(streamingTimeout)) {
val status = listener.terminationStatus
assert(status != null)
-
- assert(status.active === false) // must be inactive by the time onQueryTerm is called
- assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
- assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+ assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+ assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
}
listener.checkAsyncErrors()
}
@@ -141,6 +139,92 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("exception should be reported in QueryTerminated") {
+ val listener = new QueryStatusCollector
+ withListenerAdded(listener) {
+ val input = MemoryStream[Int]
+ testStream(input.toDS.map(_ / 0))(
+ StartStream(),
+ AddData(input, 1),
+ ExpectFailure[SparkException](),
+ Assert {
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ assert(listener.terminationStatus !== null)
+ assert(listener.terminationException.isDefined)
+ assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationStackTrace.nonEmpty)
+ }
+ )
+ }
+ }
+
+ test("QueryStarted serialization") {
+ val queryStartedInfo = new ContinuousQueryInfo(
+ "name",
+ Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
+ new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
+ val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
+ val json = JsonProtocol.sparkEventToJson(queryStarted)
+ val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
+ .asInstanceOf[ContinuousQueryListener.QueryStarted]
+ assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+ }
+
+ test("QueryProgress serialization") {
+ val queryProcessInfo = new ContinuousQueryInfo(
+ "name",
+ Seq(
+ new SourceStatus("source1", Some(LongOffset(0).toString)),
+ new SourceStatus("source2", Some(LongOffset(1).toString))),
+ new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+ val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
+ val json = JsonProtocol.sparkEventToJson(queryProcess)
+ val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+ .asInstanceOf[ContinuousQueryListener.QueryProgress]
+ assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+ }
+
+ test("QueryTerminated serialization") {
+ val queryTerminatedInfo = new ContinuousQueryInfo(
+ "name",
+ Seq(
+ new SourceStatus("source1", Some(LongOffset(0).toString)),
+ new SourceStatus("source2", Some(LongOffset(1).toString))),
+ new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+ val exception = new RuntimeException("exception")
+ val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
+ queryTerminatedInfo,
+ Some(exception.getMessage),
+ exception.getStackTrace)
+ val json =
+ JsonProtocol.sparkEventToJson(queryQueryTerminated)
+ val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
+ .asInstanceOf[ContinuousQueryListener.QueryTerminated]
+ assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+ assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+ }
+
+ private def assertContinuousQueryInfoEquals(
+ expected: ContinuousQueryInfo,
+ actual: ContinuousQueryInfo): Unit = {
+ assert(expected.name === actual.name)
+ assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
+ expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
+ case (expectedSource, actualSource) =>
+ assertSourceStatus(expectedSource, actualSource)
+ }
+ assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
+ }
+
+ private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
+ assert(expected.description === actual.description)
+ assert(expected.offsetDesc === actual.offsetDesc)
+ }
+
+ private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
+ assert(expected.description === actual.description)
+ assert(expected.offsetDesc === actual.offsetDesc)
+ }
private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
try {
@@ -164,9 +248,12 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter
- @volatile var startStatus: QueryStatus = null
- @volatile var terminationStatus: QueryStatus = null
- val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
+ @volatile var startStatus: ContinuousQueryInfo = null
+ @volatile var terminationStatus: ContinuousQueryInfo = null
+ @volatile var terminationException: Option[String] = null
+ @volatile var terminationStackTrace: Seq[StackTraceElement] = null
+
+ val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
def reset(): Unit = {
startStatus = null
@@ -182,35 +269,25 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
override def onQueryStarted(queryStarted: QueryStarted): Unit = {
asyncTestWaiter {
- startStatus = QueryStatus(queryStarted.query)
+ startStatus = queryStarted.queryInfo
}
}
override def onQueryProgress(queryProgress: QueryProgress): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
- progressStatuses.add(QueryStatus(queryProgress.query))
+ progressStatuses.add(queryProgress.queryInfo)
}
}
override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
- terminationStatus = QueryStatus(queryTerminated.query)
+ terminationStatus = queryTerminated.queryInfo
+ terminationException = queryTerminated.exception
+ terminationStackTrace = queryTerminated.stackTrace
}
asyncTestWaiter.dismiss()
}
}
-
- case class QueryStatus(
- active: Boolean,
- exception: Option[Exception],
- sourceStatuses: Array[SourceStatus],
- sinkStatus: SinkStatus)
-
- object QueryStatus {
- def apply(query: ContinuousQuery): QueryStatus = {
- QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
- }
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index e4ca86d9d5..55424058f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -66,21 +66,21 @@ class ContinuousQuerySuite extends StreamTest {
testStream(mapped)(
AssertOnQuery(_.sourceStatuses.length === 1),
AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.sourceStatuses(0).offset === None),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
AssertOnQuery(_.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
+ AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
- AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
- AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)),
+ AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
AddData(inputData, 1, 2),
CheckAnswer(6, 3, 6, 3),
- AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))),
- AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))),
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)),
+ AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
AddData(inputData, 0),
ExpectFailure[SparkException],
- AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))),
- AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1)))
+ AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)),
+ AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
)
}