author      Tyson Condie <tcondie@gmail.com>              2017-03-23 14:32:05 -0700
committer   Tathagata Das <tathagata.das1565@gmail.com>   2017-03-23 14:32:05 -0700
commit      746a558de2136f91f8fe77c6e51256017aa50913 (patch)
tree        3325526a325b9774ee1c9a0916210647b1007132
parent      b0ae6a38a3ef65e4e853781c5127ba38997a8546 (diff)
download    spark-746a558de2136f91f8fe77c6e51256017aa50913.tar.gz
            spark-746a558de2136f91f8fe77c6e51256017aa50913.tar.bz2
            spark-746a558de2136f91f8fe77c6e51256017aa50913.zip
[SPARK-19876][SS][WIP] OneTime Trigger Executor
## What changes were proposed in this pull request?

An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log is used to determine the next batch (offsets) to process after a restart, instead of the offset log itself. Using the offset log for this would always re-process the previously logged batch, which would not permit a OneTime trigger feature.

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that tells us whether that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly. In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of the OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to record the completion of a batch before restart, in which case we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17219 from tcondie/stream-commit.
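For context, here is a minimal sketch (not part of this patch) of the end-to-end pattern the one-time trigger enables: a batch-style job that incrementally processes whatever data is new and then exits. The text source, parquet sink, and paths are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Minimal sketch of a "run once" streaming job using the trigger added here.
// The input path, output path, and checkpoint location are illustrative.
object RunOnceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RunOnceExample")
      .master("local[2]")
      .getOrCreate()

    // Pick up whatever new files have arrived since the last run.
    val input = spark.readStream.text("/tmp/streaming-input")

    // Trigger.Once() processes a single batch and then stops; the new commit
    // log lets a later restart continue from where this run left off.
    val query = input.writeStream
      .format("parquet")
      .option("path", "/tmp/streaming-output")
      .option("checkpointLocation", "/tmp/streaming-checkpoint")
      .trigger(Trigger.Once())
      .start()

    query.awaitTermination() // returns once the single batch has been committed
    spark.stop()
  }
}
```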
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala  |    2
-rw-r--r--  project/MimaExcludes.scala  |    6
-rw-r--r--  python/pyspark/sql/streaming.py  |   63
-rw-r--r--  python/pyspark/sql/tests.py  |   17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala  |   77
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  |   81
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala  |   11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala  |   29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala  |    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala)  |   36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java  |  105
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala  |    4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala  |    3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala  |   20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala  |    2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala  |    4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala  |   18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala  |   48
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala  |    5
19 files changed, 439 insertions(+), 94 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 7b6396e029..6391d6269c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bd4528bd21..9925a8ba72 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -64,7 +64,11 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"),
// [SPARK-17161] Removing Python-friendly constructors not needed
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this")
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),
+
+ // [SPARK-19876] Add one time trigger, and improve Trigger APIs
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime")
)
// Exclude rules for 2.1.x
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 80f4340cdf..27d6725615 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -277,44 +277,6 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
-class Trigger(object):
- """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
-
- .. note:: Experimental
-
- .. versionadded:: 2.0
- """
-
- __metaclass__ = ABCMeta
-
- @abstractmethod
- def _to_java_trigger(self, sqlContext):
- """Internal method to construct the trigger on the jvm.
- """
- pass
-
-
-class ProcessingTime(Trigger):
- """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
- the query will run as fast as possible.
-
- The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
-
- .. note:: Experimental
-
- .. versionadded:: 2.0
- """
-
- def __init__(self, interval):
- if type(interval) != str or len(interval.strip()) == 0:
- raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
- self.interval = interval
-
- def _to_java_trigger(self, sqlContext):
- return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
- self.interval)
-
-
class DataStreamReader(OptionUtils):
"""
Interface used to load a streaming :class:`DataFrame` from external storage systems
@@ -790,7 +752,7 @@ class DataStreamWriter(object):
@keyword_only
@since(2.0)
- def trigger(self, processingTime=None):
+ def trigger(self, processingTime=None, once=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@@ -800,17 +762,26 @@ class DataStreamWriter(object):
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+ >>> # trigger the query for just one batch of data
+ >>> writer = sdf.writeStream.trigger(once=True)
"""
- from pyspark.sql.streaming import ProcessingTime
- trigger = None
+ jTrigger = None
if processingTime is not None:
+ if once is not None:
+ raise ValueError('Multiple triggers not allowed.')
if type(processingTime) != str or len(processingTime.strip()) == 0:
- raise ValueError('The processing time must be a non empty string. Got: %s' %
+ raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
processingTime)
- trigger = ProcessingTime(processingTime)
- if trigger is None:
- raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
- self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+ interval = processingTime.strip()
+ jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
+ interval)
+ elif once is not None:
+ if once is not True:
+ raise ValueError('Value for once must be True. Got: %s' % once)
+ jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
+ else:
+ raise ValueError('No trigger provided')
+ self._jwrite = self._jwrite.trigger(jTrigger)
return self
@ignore_unicode_prefix
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 29d613bc5f..b93b7ed192 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1255,13 +1255,26 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
- def test_stream_trigger_takes_keyword_args(self):
+ def test_stream_trigger(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+
+ # Should take at least one arg
+ try:
+ df.writeStream.trigger()
+ except ValueError:
+ pass
+
+ # Should not take multiple args
+ try:
+ df.writeStream.trigger(once=True, processingTime='5 seconds')
+ except ValueError:
+ pass
+
+ # Should take only keyword args
try:
df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
- # should throw error
pass
def test_stream_read_options(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
new file mode 100644
index 0000000000..fb1a4fb9b1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ * ....
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+ extends HDFSMetadataLog[String](sparkSession, path) {
+
+ override protected def deserialize(in: InputStream): String = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+ if (!lines.hasNext) {
+ throw new IllegalStateException("Incomplete log file in the offset commit log")
+ }
+ parseVersion(lines.next().trim, BatchCommitLog.VERSION)
+ // read metadata
+ lines.next().trim match {
+ case BatchCommitLog.SERIALIZED_VOID => null
+ case metadata => metadata
+ }
+ }
+
+ override protected def serialize(metadata: String, out: OutputStream): Unit = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+ out.write('\n')
+
+ // write metadata or void
+ out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
+ .getBytes(UTF_8))
+ }
+}
+
+object BatchCommitLog {
+ private val VERSION = 1
+ private val SERIALIZED_VOID = "{}"
+}
+
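As an aside for readers of the diff, the two-line entry format described in the BatchCommitLog comment above (a `v<version>` line, then either `{}` for "no metadata" or a metadata string) can be restated as a tiny standalone sketch. This is illustrative only; the real serialization path goes through HDFSMetadataLog.

```scala
// Standalone sketch of the two-line commit log entry format described above:
// line 1 is "v<version>", line 2 is "{}" for "no metadata" or a metadata string.
// Illustrative only; not how Spark actually reads or writes the files.
object CommitLogFormatSketch {
  private val Version = 1
  private val SerializedVoid = "{}"

  def serialize(metadata: Option[String]): String =
    s"v$Version\n${metadata.getOrElse(SerializedVoid)}"

  def deserialize(content: String): Option[String] = {
    val lines = content.split("\n", 2)
    require(lines.nonEmpty && lines(0).trim == s"v$Version",
      s"Unexpected commit log version line: ${lines.headOption.getOrElse("<empty>")}")
    if (lines.length > 1 && lines(1).trim != SerializedVoid) Some(lines(1).trim) else None
  }

  def main(args: Array[String]): Unit = {
    println(deserialize(serialize(None)))         // None
    println(deserialize(serialize(Some("meta")))) // Some(meta)
  }
}
```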
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60d5283e6b..34e9262af7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,6 +165,8 @@ class StreamExecution(
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+ case OneTimeTrigger => OneTimeExecutor()
+ case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
/** Defines the internal state of execution */
@@ -209,6 +211,13 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
+ /**
+ * A log that records the batch ids that have completed. This is used to check if a batch was
+ * fully processed, and its output was committed to the sink, hence no need to process it again.
+ * This is used (for instance) during restart, to help identify which batch to run next.
+ */
+ val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
+
/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING
@@ -291,10 +300,13 @@ class StreamExecution(
runBatch(sparkSessionToRunBatches)
}
}
-
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
+ batchCommitLog.add(currentBatchId, null)
+ logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
@@ -306,9 +318,6 @@ class StreamExecution(
} else {
false
}
-
- // Update committed offsets.
- committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
continueToRun
})
@@ -392,13 +401,33 @@ class StreamExecution(
* - currentBatchId
* - committedOffsets
* - availableOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the offset log) the offsets used to run the last batch
+ * IF last batch exists THEN
+ * Set the next batch to be executed as the last recovered batch
+ * Check the commit log to see which batch was committed last
+ * IF the last batch was committed THEN
+ * Call getBatch using the last batch start and end offsets
+ * // ^^^^ above line is needed since some sources assume last batch always re-executes
+ * Set up for a new batch, i.e., start = last batch end, and identify new end
+ * DONE
+ * ELSE
+ * Identify a brand new batch
+ * DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
- case Some((batchId, nextOffsets)) =>
- logInfo(s"Resuming streaming query, starting with batch $batchId")
- currentBatchId = batchId
+ case Some((latestBatchId, nextOffsets)) =>
+ /* First assume that we are re-executing the latest known batch
+ * in the offset log */
+ currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
+ /* Initialize committed offsets to a committed batch, which at this
+ * point is the second latest batch id in the offset log. */
+ offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+ committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+ }
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
@@ -419,14 +448,37 @@ class StreamExecution(
SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
}
- logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
- s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
-
- offsetLog.get(batchId - 1).foreach {
- case lastOffsets =>
- committedOffsets = lastOffsets.toStreamProgress(sources)
- logDebug(s"Resuming with committed offsets: $committedOffsets")
+ /* identify the current batch id: if commit log indicates we successfully processed the
+ * latest batch id in the offset log, then we can safely move to the next batch
+ * i.e., committedBatchId + 1 */
+ batchCommitLog.getLatest() match {
+ case Some((latestCommittedBatchId, _)) =>
+ if (latestBatchId == latestCommittedBatchId) {
+ /* The last batch was successfully committed, so we can safely process the
+ * next batch, but first:
+ * make a call to getBatch using the offsets from the previous batch,
+ * because certain sources (e.g., KafkaSource) assume that on restart the
+ * last batch will be executed before getOffset is called again. */
+ availableOffsets.foreach { ao: (Source, Offset) =>
+ val (source, end) = ao
+ if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
+ val start = committedOffsets.get(source)
+ source.getBatch(start, end)
+ }
+ }
+ currentBatchId = latestCommittedBatchId + 1
+ committedOffsets ++= availableOffsets
+ // Construct a new batch by recomputing availableOffsets
+ constructNextBatch()
+ } else if (latestCommittedBatchId < latestBatchId - 1) {
+ logWarning(s"Batch completion log latest batch id is " +
+ s"${latestCommittedBatchId}, which is not trailing " +
+ s"batchid $latestBatchId by one")
+ }
+ case None => logInfo("no commit log present")
}
+ logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+ s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -523,6 +575,7 @@ class StreamExecution(
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minBatchesToRetain < currentBatchId) {
offsetLog.purge(currentBatchId - minBatchesToRetain)
+ batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
}
} else {
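The restart logic spelled out in the populateStartOffsets pseudocode above reduces to comparing the latest entries of the offset log and the commit log. Below is a simplified, self-contained sketch of that decision, with plain batch ids standing in for the real log objects; it is not the actual StreamExecution code.

```scala
// Simplified sketch of the restart decision described in populateStartOffsets above.
// Plain batch ids stand in for the offset log and commit log contents.
object RestartDecisionSketch {
  sealed trait Next
  case class RerunBatch(batchId: Long) extends Next    // offsets were logged but never committed
  case class StartNewBatch(batchId: Long) extends Next // latest batch was fully committed
  case object StartFromScratch extends Next             // empty offset log: first run of the query

  def nextBatch(latestOffsetBatch: Option[Long], latestCommittedBatch: Option[Long]): Next =
    (latestOffsetBatch, latestCommittedBatch) match {
      case (None, _)                                               => StartFromScratch
      case (Some(latest), Some(committed)) if committed == latest  => StartNewBatch(latest + 1)
      case (Some(latest), _)                                       => RerunBatch(latest)
    }

  def main(args: Array[String]): Unit = {
    println(nextBatch(None, None))          // StartFromScratch
    println(nextBatch(Some(5L), Some(5L)))  // StartNewBatch(6)
    println(nextBatch(Some(5L), Some(4L)))  // RerunBatch(5)
    println(nextBatch(Some(0L), None))      // RerunBatch(0)
  }
}
```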
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index ac510df209..02996ac854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -30,6 +30,17 @@ trait TriggerExecutor {
}
/**
+ * A trigger executor that runs a single batch only, then terminates.
+ */
+case class OneTimeExecutor() extends TriggerExecutor {
+
+ /**
+ * Execute a single batch using `batchRunner`.
+ */
+ override def execute(batchRunner: () => Boolean): Unit = batchRunner()
+}
+
+/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
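To make the executor contract concrete: the batch runner handed to execute() returns whether the stream should keep running; OneTimeExecutor invokes it exactly once, whereas a periodic executor loops until it returns false. Below is a toy sketch of that contract, not the Spark classes themselves, and without any clock or interval handling.

```scala
// Toy illustration of the TriggerExecutor contract: batchRunner() returns true
// to continue, false to terminate. Not Spark code.
object ExecutorContractDemo {
  trait TriggerExecutor {
    def execute(batchRunner: () => Boolean): Unit
  }

  // Runs exactly one batch, regardless of the runner's return value.
  case class OneTimeExecutor() extends TriggerExecutor {
    override def execute(batchRunner: () => Boolean): Unit = batchRunner()
  }

  // Keeps running batches until the runner asks to stop (interval handling omitted).
  case class LoopExecutor() extends TriggerExecutor {
    override def execute(batchRunner: () => Boolean): Unit = {
      while (batchRunner()) {}
    }
  }

  def main(args: Array[String]): Unit = {
    var batches = 0
    val runner = () => { batches += 1; batches < 3 }

    OneTimeExecutor().execute(runner)
    println(s"one-time executor ran $batches batch(es)") // 1

    batches = 0
    LoopExecutor().execute(runner)
    println(s"loop executor ran $batches batch(es)")      // 3
  }
}
```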
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
new file mode 100644
index 0000000000..271bc4da99
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.streaming.Trigger
+
+/**
+ * A [[Trigger]] that processes only one batch of data in a streaming query and then
+ * terminates the query.
+ */
+@Experimental
+@InterfaceStability.Evolving
+case object OneTimeTrigger extends Trigger
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index fe52013bad..f2f700590c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var outputMode: OutputMode = OutputMode.Append
- private var trigger: Trigger = ProcessingTime(0L)
+ private var trigger: Trigger = Trigger.ProcessingTime(0L)
private var extraOptions = new scala.collection.mutable.HashMap[String, String]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 68f2eab9d4..bdad8e4717 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -28,39 +28,30 @@ import org.apache.spark.unsafe.types.CalendarInterval
/**
* :: Experimental ::
- * Used to indicate how often results should be produced by a [[StreamingQuery]].
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-sealed trait Trigger
-
-/**
- * :: Experimental ::
* A trigger that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*
* Scala Example:
* {{{
- * df.write.trigger(ProcessingTime("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
- * df.write.trigger(ProcessingTime(10.seconds))
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
- * df.write.trigger(ProcessingTime.create("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
- * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
case class ProcessingTime(intervalMs: Long) extends Trigger {
require(intervalMs >= 0, "the interval of trigger should not be negative")
}
@@ -73,6 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
*/
@Experimental
@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
object ProcessingTime {
/**
@@ -80,11 +72,13 @@ object ProcessingTime {
*
* Example:
* {{{
- * df.write.trigger(ProcessingTime("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime("10 seconds"))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTime(interval)
*/
+ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
@@ -110,11 +104,13 @@ object ProcessingTime {
* Example:
* {{{
* import scala.concurrent.duration._
- * df.write.trigger(ProcessingTime(10.seconds))
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTime(interval)
*/
+ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def apply(interval: Duration): ProcessingTime = {
new ProcessingTime(interval.toMillis)
}
@@ -124,11 +120,13 @@ object ProcessingTime {
*
* Example:
* {{{
- * df.write.trigger(ProcessingTime.create("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime.create("10 seconds"))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTime(interval)
*/
+ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def create(interval: String): ProcessingTime = {
apply(interval)
}
@@ -139,11 +137,13 @@ object ProcessingTime {
* Example:
* {{{
* import java.util.concurrent.TimeUnit
- * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTime(interval, unit)
*/
+ @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0")
def create(interval: Long, unit: TimeUnit): ProcessingTime = {
new ProcessingTime(unit.toMillis(interval))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 0000000000..a03a851f24
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+ /**
+ * :: Experimental ::
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is 0, the query will run as fast as possible.
+ *
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(long intervalMs) {
+ return ProcessingTime.apply(intervalMs);
+ }
+
+ /**
+ * :: Experimental ::
+ * (Java-friendly)
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is 0, the query will run as fast as possible.
+ *
+ * {{{
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+ return ProcessingTime.create(interval, timeUnit);
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-friendly)
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `duration` is 0, the query will run as fast as possible.
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
+ * }}}
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(Duration interval) {
+ return ProcessingTime.apply(interval);
+ }
+
+ /**
+ * :: Experimental ::
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is effectively 0, the query will run as fast as possible.
+ *
+ * {{{
+ * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+ * }}}
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(String interval) {
+ return ProcessingTime.apply(interval);
+ }
+
+ /**
+ * A trigger that processes only one batch of data in a streaming query and then
+ * terminates the query.
+ *
+ * @since 2.2.0
+ */
+ public static Trigger Once() {
+ return OneTimeTrigger$.MODULE$;
+ }
+}
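With the ProcessingTime companion object deprecated above, the new Trigger class becomes the single entry point for trigger policies. Here is a hedged usage sketch from Scala; the socket source, port, and console sink are assumptions for illustration, and only the last trigger set on a writer before start() takes effect.

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Sketch of the Trigger factory methods introduced above, used in place of the
// now-deprecated ProcessingTime companion object. Source/sink choices are assumptions.
object TriggerFactoryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("triggers").getOrCreate()
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val writer = lines.writeStream.format("console")

    // Equivalent ways to express a 10-second processing-time policy; each call
    // overwrites the writer's trigger, so only the last one set matters.
    writer.trigger(Trigger.ProcessingTime(10000L))
    writer.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
    writer.trigger(Trigger.ProcessingTime(10.seconds))
    writer.trigger(Trigger.ProcessingTime("10 seconds"))

    // The new one-shot policy: run a single batch, then terminate the query.
    val query = writer.trigger(Trigger.Once()).start()
    query.awaitTermination()
    spark.stop()
  }
}
```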
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 7614ea5eb3..fd850a7365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -218,7 +218,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
AddData(inputData, 25), // Evict items less than previous watermark.
CheckLastBatch((10, 5)),
StopStream,
- AssertOnQuery { q => // clear the sink
+ AssertOnQuery { q => // purge commit and clear the sink
+ val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+ q.batchCommitLog.purge(commit)
q.sink.asInstanceOf[MemorySink].clear()
true
},
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 89a25973af..a00a1a582a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -575,9 +575,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
StopStream,
StartStream(ProcessingTime("1 second"), triggerClock = clock),
+ AdvanceManualClock(10 * 1000),
AddData(inputData, "c"),
- AdvanceManualClock(20 * 1000),
+ AdvanceManualClock(1 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f01211e20c..32920f6dfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -156,6 +156,15 @@ class StreamSuite extends StreamTest {
AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
s"offsetLog's latest should be $expectedId")
+ // Check the latest batchid in the commit log
+ def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
+ AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
+ s"commitLog's latest should be $expectedId")
+
+ // Ensure that there has not been an incremental execution after restart
+ def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery =
+ AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run")
+
// For each batch, we would log the state change during the execution
// This checks whether the key of the state change log is the expected batch id
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
@@ -181,6 +190,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 0
CheckAnswer(1, 2, 3),
CheckIncrementalExecutionCurrentBatchId(0),
+ CheckCommitLogLatestBatchId(0),
CheckOffsetLogLatestBatchId(0),
CheckSinkLatestBatchId(0),
// Add some data in batch 1
@@ -191,6 +201,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
+ CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
@@ -203,6 +214,7 @@ class StreamSuite extends StreamTest {
// the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
+ CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
@@ -210,14 +222,15 @@ class StreamSuite extends StreamTest {
StopStream,
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
- /* -- batch 1 rerun ----------------- */
- // this batch 1 would re-run because the latest batch id logged in offset log is 1
+ /* -- batch 1 no rerun ----------------- */
+ // batch 1 would not re-run because the latest batch id logged in commit log is 1
AdvanceManualClock(10 * 1000),
+ CheckNoIncrementalExecutionCurrentBatchId(),
/* -- batch 2 ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
- CheckIncrementalExecutionCurrentBatchId(1),
+ CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
// Add some data in batch 2
@@ -228,6 +241,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 2
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
CheckIncrementalExecutionCurrentBatchId(2),
+ CheckCommitLogLatestBatchId(2),
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 60e2375a98..8cf1791336 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -159,7 +159,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Starts the stream, resuming if data has already been processed. It must not be running. */
case class StartStream(
- trigger: Trigger = ProcessingTime(0),
+ trigger: Trigger = Trigger.ProcessingTime(0),
triggerClock: Clock = new SystemClock,
additionalConfs: Map[String, String] = Map.empty)
extends StreamAction
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 0c8015672b..600c039cd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -272,11 +272,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
+ q.batchCommitLog.purge(3)
// advance by a minute i.e., 90 seconds total
clock.advance(60 * 1000L)
true
},
StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+ // The commit log was blown away, causing the last batch to re-run
CheckLastBatch((20L, 1), (85L, 1)),
AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
@@ -322,11 +324,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
+ q.batchCommitLog.purge(3)
// advance by 60 days i.e., 90 days total
clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
true
},
StartStream(ProcessingTime("10 day"), triggerClock = clock),
+ // Commit log was blown away, causing a re-run of the last batch
CheckLastBatch((20L, 1), (85L, 1)),
// advance clock to 100 days, should retain keys >= 90
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index eb09b9ffcf..03dad8a6dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -57,6 +57,20 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val inputData = new MemoryStream[Int](0, sqlContext)
val df = inputData.toDS().as[Long].map { 10 / _ }
val listener = new EventCollector
+
+ case class AssertStreamExecThreadToWaitForClock()
+ extends AssertOnQuery(q => {
+ eventually(Timeout(streamingTimeout)) {
+ if (q.exception.isEmpty) {
+ assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }, "")
+
try {
// No events until started
spark.streams.addListener(listener)
@@ -81,6 +95,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Progress event generated when data processed
AddData(inputData, 1, 2),
AdvanceManualClock(100),
+ AssertStreamExecThreadToWaitForClock(),
CheckAnswer(10, 5),
AssertOnQuery { query =>
assert(listener.progressEvents.nonEmpty)
@@ -109,8 +124,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Termination event generated with exception message when stopped with error
StartStream(ProcessingTime(100), triggerClock = clock),
+ AssertStreamExecThreadToWaitForClock(),
AddData(inputData, 0),
- AdvanceManualClock(100),
+ AdvanceManualClock(100), // process bad data
ExpectFailure[SparkException](),
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a0a2b2b4c9..3f41ecdb7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -158,6 +158,49 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}
+ testQuietly("OneTime trigger, commit log, and exception") {
+ import Trigger.Once
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map { 6 / _}
+
+ testStream(mapped)(
+ AssertOnQuery(_.isActive === true),
+ StopStream,
+ AddData(inputData, 1, 2),
+ StartStream(trigger = Once),
+ CheckAnswer(6, 3),
+ StopStream, // clears out StreamTest state
+ AssertOnQuery { q =>
+ // both commit log and offset log contain the same (latest) batch id
+ q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
+ q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
+ },
+ AssertOnQuery { q =>
+ // blow away commit log and sink result
+ q.batchCommitLog.purge(1)
+ q.sink.asInstanceOf[MemorySink].clear()
+ true
+ },
+ StartStream(trigger = Once),
+ CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch
+ StopStream,
+ AddData(inputData, 3),
+ StartStream(trigger = Once),
+ CheckLastBatch(2), // commit log should be back in place
+ StopStream,
+ AddData(inputData, 0),
+ StartStream(trigger = Once),
+ ExpectFailure[SparkException](),
+ AssertOnQuery(_.isActive === false),
+ AssertOnQuery(q => {
+ q.exception.get.startOffset ===
+ q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
+ q.exception.get.endOffset ===
+ q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
+ }, "incorrect start offset or end offset on exception")
+ )
+ }
+
testQuietly("status, lastProgress, and recentProgress") {
import StreamingQuerySuite._
clock = new StreamManualClock
@@ -237,6 +280,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
CheckAnswer(2),
+ AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -275,6 +319,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AddData(inputData, 1, 2),
AdvanceManualClock(100), // allow another trigger
+ AssertStreamExecThreadToWaitForClock(),
CheckAnswer(4),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
@@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
// Test status and progress after query terminated with error
StartStream(ProcessingTime(100), triggerClock = clock),
+ AdvanceManualClock(100), // ensure initial trigger completes before AddData
AddData(inputData, 0),
- AdvanceManualClock(100),
+ AdvanceManualClock(100), // allow another trigger
ExpectFailure[SparkException](),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 341ab0eb92..05cd3d9f7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
+import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -346,7 +347,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
q = df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
+ .trigger(ProcessingTime(100, TimeUnit.SECONDS))
.start()
q.stop()