aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-11-09 17:38:19 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-09 17:38:19 -0800
commit1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00 (patch)
tree24fe31cc7c492f110210f171b70c60df89f831bf /streaming
parent0ce6f9b2d203ce67aeb4d3aedf19bbd997fe01b9 (diff)
downloadspark-1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00.tar.gz
spark-1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00.tar.bz2
spark-1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00.zip
[SPARK-11462][STREAMING] Add JavaStreamingListener
Currently, StreamingListener is not Java friendly because it exposes some Scala collections to Java users directly, such as Option, Map. This PR added a Java version of StreamingListener and a bunch of Java friendly classes for Java users. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9420 from zsxwing/java-streaming-listener.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala168
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala122
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java85
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala290
4 files changed, 665 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
new file mode 100644
index 0000000000..c86c7101ff
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.Time
+
+/**
+ * A listener interface for receiving information about an ongoing streaming computation.
+ */
+private[streaming] class JavaStreamingListener {
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
+
+ /** Called when a receiver has reported an error */
+ def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { }
+
+ /** Called when a receiver has been stopped */
+ def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { }
+
+ /** Called when processing of a batch of jobs has started. */
+ def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { }
+
+ /** Called when processing of a job of a batch has started. */
+ def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { }
+
+ /** Called when processing of a job of a batch has completed. */
+ def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { }
+}
+
+/**
+ * Base trait for events related to JavaStreamingListener
+ */
+private[streaming] sealed trait JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
+ extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchCompleted(val batchInfo: JavaBatchInfo)
+ extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchStarted(val batchInfo: JavaBatchInfo)
+ extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerOutputOperationStarted(
+ val outputOperationInfo: JavaOutputOperationInfo) extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerOutputOperationCompleted(
+ val outputOperationInfo: JavaOutputOperationInfo) extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverStarted(val receiverInfo: JavaReceiverInfo)
+ extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverError(val receiverInfo: JavaReceiverInfo)
+ extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverStopped(val receiverInfo: JavaReceiverInfo)
+ extends JavaStreamingListenerEvent
+
+/**
+ * Class having information on batches.
+ *
+ * @param batchTime Time of the batch
+ * @param streamIdToInputInfo A map of input stream id to its input info
+ * @param submissionTime Clock time of when jobs of this batch was submitted to the streaming
+ * scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing.
+ * `-1` means the batch has not yet started
+ * @param processingEndTime Clock time of when the last job of this batch finished processing. `-1`
+ * means the batch has not yet completed.
+ * @param schedulingDelay Time taken for the first job of this batch to start processing from the
+ * time this batch was submitted to the streaming scheduler. Essentially, it
+ * is `processingStartTime` - `submissionTime`. `-1` means the batch has not
+ * yet started
+ * @param processingDelay Time taken for the all jobs of this batch to finish processing from the
+ * time they started processing. Essentially, it is
+ * `processingEndTime` - `processingStartTime`. `-1` means the batch has not
+ * yet completed.
+ * @param totalDelay Time taken for all the jobs of this batch to finish processing from the time
+ * they were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ * `-1` means the batch has not yet completed.
+ * @param numRecords The number of recorders received by the receivers in this batch
+ * @param outputOperationInfos The output operations in this batch
+ */
+private[streaming] case class JavaBatchInfo(
+ batchTime: Time,
+ streamIdToInputInfo: java.util.Map[Int, JavaStreamInputInfo],
+ submissionTime: Long,
+ processingStartTime: Long,
+ processingEndTime: Long,
+ schedulingDelay: Long,
+ processingDelay: Long,
+ totalDelay: Long,
+ numRecords: Long,
+ outputOperationInfos: java.util.Map[Int, JavaOutputOperationInfo])
+
+/**
+ * Track the information of input stream at specified batch time.
+ *
+ * @param inputStreamId the input stream id
+ * @param numRecords the number of records in a batch
+ * @param metadata metadata for this batch. It should contain at least one standard field named
+ * "Description" which maps to the content that will be shown in the UI.
+ * @param metadataDescription description of this input stream
+ */
+private[streaming] case class JavaStreamInputInfo(
+ inputStreamId: Int,
+ numRecords: Long,
+ metadata: java.util.Map[String, Any],
+ metadataDescription: String)
+
+/**
+ * Class having information about a receiver
+ */
+private[streaming] case class JavaReceiverInfo(
+ streamId: Int,
+ name: String,
+ active: Boolean,
+ location: String,
+ lastErrorMessage: String,
+ lastError: String,
+ lastErrorTime: Long)
+
+/**
+ * Class having information on output operations.
+ *
+ * @param batchTime Time of the batch
+ * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param name The name of this output operation.
+ * @param description The description of this output operation.
+ * @param startTime Clock time of when the output operation started processing. `-1` means the
+ * output operation has not yet started
+ * @param endTime Clock time of when the output operation started processing. `-1` means the output
+ * operation has not yet completed
+ * @param failureReason Failure reason if this output operation fails. If the output operation is
+ * successful, this field is `null`.
+ */
+private[streaming] case class JavaOutputOperationInfo(
+ batchTime: Time,
+ id: Int,
+ name: String,
+ description: String,
+ startTime: Long,
+ endTime: Long,
+ failureReason: String)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
new file mode 100644
index 0000000000..2c60b396a6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.streaming.scheduler._
+
+/**
+ * A wrapper to convert a [[JavaStreamingListener]] to a [[StreamingListener]].
+ */
+private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener)
+ extends StreamingListener {
+
+ private def toJavaReceiverInfo(receiverInfo: ReceiverInfo): JavaReceiverInfo = {
+ JavaReceiverInfo(
+ receiverInfo.streamId,
+ receiverInfo.name,
+ receiverInfo.active,
+ receiverInfo.location,
+ receiverInfo.lastErrorMessage,
+ receiverInfo.lastError,
+ receiverInfo.lastErrorTime
+ )
+ }
+
+ private def toJavaStreamInputInfo(streamInputInfo: StreamInputInfo): JavaStreamInputInfo = {
+ JavaStreamInputInfo(
+ streamInputInfo.inputStreamId,
+ streamInputInfo.numRecords: Long,
+ streamInputInfo.metadata.asJava,
+ streamInputInfo.metadataDescription.orNull
+ )
+ }
+
+ private def toJavaOutputOperationInfo(
+ outputOperationInfo: OutputOperationInfo): JavaOutputOperationInfo = {
+ JavaOutputOperationInfo(
+ outputOperationInfo.batchTime,
+ outputOperationInfo.id,
+ outputOperationInfo.name,
+ outputOperationInfo.description: String,
+ outputOperationInfo.startTime.getOrElse(-1),
+ outputOperationInfo.endTime.getOrElse(-1),
+ outputOperationInfo.failureReason.orNull
+ )
+ }
+
+ private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
+ JavaBatchInfo(
+ batchInfo.batchTime,
+ batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo(_)).asJava,
+ batchInfo.submissionTime,
+ batchInfo.processingStartTime.getOrElse(-1),
+ batchInfo.processingEndTime.getOrElse(-1),
+ batchInfo.schedulingDelay.getOrElse(-1),
+ batchInfo.processingDelay.getOrElse(-1),
+ batchInfo.totalDelay.getOrElse(-1),
+ batchInfo.numRecords,
+ batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo(_)).asJava
+ )
+ }
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+ javaStreamingListener.onReceiverStarted(
+ new JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo)))
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
+ javaStreamingListener.onReceiverError(
+ new JavaStreamingListenerReceiverError(toJavaReceiverInfo(receiverError.receiverInfo)))
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
+ javaStreamingListener.onReceiverStopped(
+ new JavaStreamingListenerReceiverStopped(toJavaReceiverInfo(receiverStopped.receiverInfo)))
+ }
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+ javaStreamingListener.onBatchSubmitted(
+ new JavaStreamingListenerBatchSubmitted(toJavaBatchInfo(batchSubmitted.batchInfo)))
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+ javaStreamingListener.onBatchStarted(
+ new JavaStreamingListenerBatchStarted(toJavaBatchInfo(batchStarted.batchInfo)))
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ javaStreamingListener.onBatchCompleted(
+ new JavaStreamingListenerBatchCompleted(toJavaBatchInfo(batchCompleted.batchInfo)))
+ }
+
+ override def onOutputOperationStarted(
+ outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
+ javaStreamingListener.onOutputOperationStarted(new JavaStreamingListenerOutputOperationStarted(
+ toJavaOutputOperationInfo(outputOperationStarted.outputOperationInfo)))
+ }
+
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+ javaStreamingListener.onOutputOperationCompleted(
+ new JavaStreamingListenerOutputOperationCompleted(
+ toJavaOutputOperationInfo(outputOperationCompleted.outputOperationInfo)))
+ }
+
+}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
new file mode 100644
index 0000000000..8cc285aa7f
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.*;
+
+public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
+
+ @Override
+ public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
+ JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
+ receiverInfo.streamId();
+ receiverInfo.name();
+ receiverInfo.active();
+ receiverInfo.location();
+ receiverInfo.lastErrorMessage();
+ receiverInfo.lastError();
+ receiverInfo.lastErrorTime();
+ }
+
+ @Override
+ public void onReceiverError(JavaStreamingListenerReceiverError receiverError) {
+ JavaReceiverInfo receiverInfo = receiverError.receiverInfo();
+ receiverInfo.streamId();
+ receiverInfo.name();
+ receiverInfo.active();
+ receiverInfo.location();
+ receiverInfo.lastErrorMessage();
+ receiverInfo.lastError();
+ receiverInfo.lastErrorTime();
+ }
+
+ @Override
+ public void onReceiverStopped(JavaStreamingListenerReceiverStopped receiverStopped) {
+ JavaReceiverInfo receiverInfo = receiverStopped.receiverInfo();
+ receiverInfo.streamId();
+ receiverInfo.name();
+ receiverInfo.active();
+ receiverInfo.location();
+ receiverInfo.lastErrorMessage();
+ receiverInfo.lastError();
+ receiverInfo.lastErrorTime();
+ }
+
+ @Override
+ public void onBatchSubmitted(JavaStreamingListenerBatchSubmitted batchSubmitted) {
+ super.onBatchSubmitted(batchSubmitted);
+ }
+
+ @Override
+ public void onBatchStarted(JavaStreamingListenerBatchStarted batchStarted) {
+ super.onBatchStarted(batchStarted);
+ }
+
+ @Override
+ public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+ super.onBatchCompleted(batchCompleted);
+ }
+
+ @Override
+ public void onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted outputOperationStarted) {
+ super.onOutputOperationStarted(outputOperationStarted);
+ }
+
+ @Override
+ public void onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted outputOperationCompleted) {
+ super.onOutputOperationCompleted(outputOperationCompleted);
+ }
+}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
new file mode 100644
index 0000000000..6d6d61e70c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler._
+
+class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
+
+ test("basic") {
+ val listener = new TestJavaStreamingListener()
+ val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+
+ val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = true,
+ location = "localhost"
+ ))
+ listenerWrapper.onReceiverStarted(receiverStarted)
+ assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
+
+ val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = false,
+ location = "localhost"
+ ))
+ listenerWrapper.onReceiverStopped(receiverStopped)
+ assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
+
+ val receiverError = StreamingListenerReceiverError(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = false,
+ location = "localhost",
+ lastErrorMessage = "failed",
+ lastError = "failed",
+ lastErrorTime = System.currentTimeMillis()
+ ))
+ listenerWrapper.onReceiverError(receiverError)
+ assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
+
+ val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ None,
+ None,
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = None,
+ endTime = None,
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = None,
+ endTime = None,
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchSubmitted(batchSubmitted)
+ assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
+
+ val batchStarted = StreamingListenerBatchStarted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ Some(1002L),
+ None,
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = None,
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = Some(1005L),
+ endTime = None,
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchStarted(batchStarted)
+ assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
+
+ val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ Some(1002L),
+ Some(1010L),
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = Some(1004L),
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = Some(1005L),
+ endTime = Some(1010L),
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchCompleted(batchCompleted)
+ assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
+
+ val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = None,
+ failureReason = None
+ ))
+ listenerWrapper.onOutputOperationStarted(outputOperationStarted)
+ assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
+ outputOperationStarted.outputOperationInfo)
+
+ val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = Some(1004L),
+ failureReason = None
+ ))
+ listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
+ assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
+ outputOperationCompleted.outputOperationInfo)
+ }
+
+ private def assertReceiverInfo(
+ javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
+ assert(javaReceiverInfo.streamId === receiverInfo.streamId)
+ assert(javaReceiverInfo.name === receiverInfo.name)
+ assert(javaReceiverInfo.active === receiverInfo.active)
+ assert(javaReceiverInfo.location === receiverInfo.location)
+ assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
+ assert(javaReceiverInfo.lastError === receiverInfo.lastError)
+ assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
+ }
+
+ private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
+ assert(javaBatchInfo.batchTime === batchInfo.batchTime)
+ assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
+ batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
+ assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
+ }
+ assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
+ assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
+ assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
+ assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
+ assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
+ assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
+ assert(javaBatchInfo.numRecords === batchInfo.numRecords)
+ assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
+ batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
+ assertOutputOperationInfo(
+ javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
+ }
+ }
+
+ private def assertStreamingInfo(
+ javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
+ assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
+ assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
+ assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
+ assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
+ }
+
+ private def assertOutputOperationInfo(
+ javaOutputOperationInfo: JavaOutputOperationInfo,
+ outputOperationInfo: OutputOperationInfo): Unit = {
+ assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
+ assert(javaOutputOperationInfo.id === outputOperationInfo.id)
+ assert(javaOutputOperationInfo.name === outputOperationInfo.name)
+ assert(javaOutputOperationInfo.description === outputOperationInfo.description)
+ assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
+ assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
+ assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
+ }
+}
+
+class TestJavaStreamingListener extends JavaStreamingListener {
+
+ var receiverStarted: JavaStreamingListenerReceiverStarted = null
+ var receiverError: JavaStreamingListenerReceiverError = null
+ var receiverStopped: JavaStreamingListenerReceiverStopped = null
+ var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
+ var batchStarted: JavaStreamingListenerBatchStarted = null
+ var batchCompleted: JavaStreamingListenerBatchCompleted = null
+ var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
+ var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+
+ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+ this.receiverStarted = receiverStarted
+ }
+
+ override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+ this.receiverError = receiverError
+ }
+
+ override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+ this.receiverStopped = receiverStopped
+ }
+
+ override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+ this.batchSubmitted = batchSubmitted
+ }
+
+ override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+ this.batchStarted = batchStarted
+ }
+
+ override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+ this.batchCompleted = batchCompleted
+ }
+
+ override def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+ this.outputOperationStarted = outputOperationStarted
+ }
+
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+ this.outputOperationCompleted = outputOperationCompleted
+ }
+}