| author | Tyson Condie <tcondie@gmail.com> | 2017-03-23 14:32:05 -0700 |
|---|---|---|
| committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-23 14:32:05 -0700 |
| commit | 746a558de2136f91f8fe77c6e51256017aa50913 (patch) | |
| tree | 3325526a325b9774ee1c9a0916210647b1007132 | |
| parent | b0ae6a38a3ef65e4e853781c5127ba38997a8546 (diff) | |
| download | spark-746a558de2136f91f8fe77c6e51256017aa50913.tar.gz, spark-746a558de2136f91f8fe77c6e51256017aa50913.tar.bz2, spark-746a558de2136f91f8fe77c6e51256017aa50913.zip | |
[SPARK-19876][SS][WIP] OneTime Trigger Executor
## What changes were proposed in this pull request?
This adds a new trigger and trigger executor that fire exactly once: the query executes a single batch and then stops. The OneTime trigger gives users more control over how triggers are scheduled.
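For illustration, a minimal Scala sketch of how this run-once behavior could be used through the `DataStreamWriter` API; the source format, paths, and application name below are hypothetical and not part of this patch:

```scala
// Sketch: run exactly one batch of a streaming query using the Trigger.Once()
// API added by this patch, then let the query terminate on its own.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OnceTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("once-trigger-example").getOrCreate()

    val lines = spark.readStream
      .format("text")
      .load("/tmp/streaming-input")                         // hypothetical input directory

    val query = lines.writeStream
      .format("parquet")
      .option("path", "/tmp/streaming-output")              // hypothetical sink path
      .option("checkpointLocation", "/tmp/streaming-ckpt")  // required for file sinks
      .trigger(Trigger.Once())                              // process a single batch, then stop
      .start()

    query.awaitTermination()  // returns after the single batch has been committed
    spark.stop()
  }
}
```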
In addition, this patch adds an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. On restart, this new commit log, rather than the offset log alone, is used to determine the next batch (offsets) to process. Using only the offset log would always re-process the most recently logged batch, which would make a OneTime trigger impossible.
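To make the restart reasoning concrete, here is a small, self-contained Scala sketch — a simplified model, not the actual StreamExecution code — in which the offset log and commit log are represented as plain collections:

```scala
// Simplified model of the restart decision described above: the offset log records
// the offsets planned for each batch, while the commit log records the batch ids
// whose output actually reached the sink.
object RestartDecision {
  def nextBatchToRun(offsetLog: Map[Long, String], commitLog: Set[Long]): Option[Long] = {
    val latestPlanned = offsetLog.keys.reduceOption(_ max _)
    latestPlanned.map { batchId =>
      if (commitLog.contains(batchId)) batchId + 1 // last batch committed: move on
      else batchId                                 // not committed: re-run it
    }
  }

  def main(args: Array[String]): Unit = {
    val offsets = Map(0L -> "offsets-0", 1L -> "offsets-1")
    // Offset-log-only behaviour: the latest batch is always re-run after restart.
    println(nextBatchToRun(offsets, commitLog = Set(0L)))     // Some(1): re-run batch 1
    // With a commit record for batch 1, the query can start directly at batch 2,
    // which is what makes a run-once (OneTime) trigger possible.
    println(nextBatchToRun(offsets, commitLog = Set(0L, 1L))) // Some(2): batch 1 is done
  }
}
```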
## How was this patch tested?
A number of existing tests have been revised. These tests assumed that restarting a stream always re-processes the last batch in the offset log. Since the commit log now records whether that batch completed successfully, the expectations of those tests were updated accordingly.
In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case where the commit log failed to record the completion of a batch before restart, in which case we fall back to what is in the offset log.
- A OneTime trigger execution that results in an exception being thrown.
marmbrus tdas zsxwing
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17219 from tcondie/stream-commit.
19 files changed, 439 insertions, 94 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 7b6396e029..6391d6269c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest { StopStream, StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, - AdvanceManualClock(100), - waitUntilBatchProcessed, // smallest now empty, 1 more from middle, 9 more from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index bd4528bd21..9925a8ba72 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -64,7 +64,11 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"), // [SPARK-17161] Removing Python-friendly constructors not needed - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"), + + // [SPARK-19876] Add one time trigger, and improve Trigger APIs + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime") ) // Exclude rules for 2.1.x diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 80f4340cdf..27d6725615 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -277,44 +277,6 @@ class StreamingQueryManager(object): self._jsqm.resetTerminated() -class Trigger(object): - """Used to indicate how often results should be produced by a :class:`StreamingQuery`. - - .. note:: Experimental - - .. versionadded:: 2.0 - """ - - __metaclass__ = ABCMeta - - @abstractmethod - def _to_java_trigger(self, sqlContext): - """Internal method to construct the trigger on the jvm. - """ - pass - - -class ProcessingTime(Trigger): - """A trigger that runs a query periodically based on the processing time. If `interval` is 0, - the query will run as fast as possible. - - The interval should be given as a string, e.g. '2 seconds', '5 minutes', ... - - .. note:: Experimental - - .. versionadded:: 2.0 - """ - - def __init__(self, interval): - if type(interval) != str or len(interval.strip()) == 0: - raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.") - self.interval = interval - - def _to_java_trigger(self, sqlContext): - return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create( - self.interval) - - class DataStreamReader(OptionUtils): """ Interface used to load a streaming :class:`DataFrame` from external storage systems @@ -790,7 +752,7 @@ class DataStreamWriter(object): @keyword_only @since(2.0) - def trigger(self, processingTime=None): + def trigger(self, processingTime=None, once=None): """Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. 
@@ -800,17 +762,26 @@ class DataStreamWriter(object): >>> # trigger the query for execution every 5 seconds >>> writer = sdf.writeStream.trigger(processingTime='5 seconds') + >>> # trigger the query for just once batch of data + >>> writer = sdf.writeStream.trigger(once=True) """ - from pyspark.sql.streaming import ProcessingTime - trigger = None + jTrigger = None if processingTime is not None: + if once is not None: + raise ValueError('Multiple triggers not allowed.') if type(processingTime) != str or len(processingTime.strip()) == 0: - raise ValueError('The processing time must be a non empty string. Got: %s' % + raise ValueError('Value for processingTime must be a non empty string. Got: %s' % processingTime) - trigger = ProcessingTime(processingTime) - if trigger is None: - raise ValueError('A trigger was not provided. Supported triggers: processingTime.') - self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark)) + interval = processingTime.strip() + jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime( + interval) + elif once is not None: + if once is not True: + raise ValueError('Value for once must be True. Got: %s' % once) + jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once() + else: + raise ValueError('No trigger provided') + self._jwrite = self._jwrite.trigger(jTrigger) return self @ignore_unicode_prefix diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 29d613bc5f..b93b7ed192 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1255,13 +1255,26 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) - def test_stream_trigger_takes_keyword_args(self): + def test_stream_trigger(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') + + # Should take at least one arg + try: + df.writeStream.trigger() + except ValueError: + pass + + # Should not take multiple args + try: + df.writeStream.trigger(once=True, processingTime='5 seconds') + except ValueError: + pass + + # Should take only keyword args try: df.writeStream.trigger('5 seconds') self.fail("Should have thrown an exception") except TypeError: - # should throw error pass def test_stream_read_options(self): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala new file mode 100644 index 0000000000..fb1a4fb9b1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets._ + +import scala.io.{Source => IOSource} + +import org.apache.spark.sql.SparkSession + +/** + * Used to write log files that represent batch commit points in structured streaming. + * A commit log file will be written immediately after the successful completion of a + * batch, and before processing the next batch. Here is an execution summary: + * - trigger batch 1 + * - obtain batch 1 offsets and write to offset log + * - process batch 1 + * - write batch 1 to completion log + * - trigger batch 2 + * - obtain bactch 2 offsets and write to offset log + * - process batch 2 + * - write batch 2 to completion log + * .... + * + * The current format of the batch completion log is: + * line 1: version + * line 2: metadata (optional json string) + */ +class BatchCommitLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[String](sparkSession, path) { + + override protected def deserialize(in: InputStream): String = { + // called inside a try-finally where the underlying stream is closed in the caller + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + parseVersion(lines.next().trim, BatchCommitLog.VERSION) + // read metadata + lines.next().trim match { + case BatchCommitLog.SERIALIZED_VOID => null + case metadata => metadata + } + } + + override protected def serialize(metadata: String, out: OutputStream): Unit = { + // called inside a try-finally where the underlying stream is closed in the caller + out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8)) + out.write('\n') + + // write metadata or void + out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata) + .getBytes(UTF_8)) + } +} + +object BatchCommitLog { + private val VERSION = 1 + private val SERIALIZED_VOID = "{}" +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 60d5283e6b..34e9262af7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -165,6 +165,8 @@ class StreamExecution( private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => OneTimeExecutor() + case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") } /** Defines the internal state of execution */ @@ -209,6 +211,13 @@ class StreamExecution( */ val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets")) + /** + * A log that records the batch ids that have completed. This is used to check if a batch was + * fully processed, and its output was committed to the sink, hence no need to process it again. + * This is used (for instance) during restart, to help identify which batch to run next. + */ + val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits")) + /** Whether all fields of the query have been initialized */ private def isInitialized: Boolean = state.get != INITIALIZING @@ -291,10 +300,13 @@ class StreamExecution( runBatch(sparkSessionToRunBatches) } } - // Report trigger as finished and construct progress object. 
finishTrigger(dataAvailable) if (dataAvailable) { + // Update committed offsets. + committedOffsets ++= availableOffsets + batchCommitLog.add(currentBatchId, null) + logDebug(s"batch ${currentBatchId} committed") // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 } else { @@ -306,9 +318,6 @@ class StreamExecution( } else { false } - - // Update committed offsets. - committedOffsets ++= availableOffsets updateStatusMessage("Waiting for next trigger") continueToRun }) @@ -392,13 +401,33 @@ class StreamExecution( * - currentBatchId * - committedOffsets * - availableOffsets + * The basic structure of this method is as follows: + * + * Identify (from the offset log) the offsets used to run the last batch + * IF last batch exists THEN + * Set the next batch to be executed as the last recovered batch + * Check the commit log to see which batch was committed last + * IF the last batch was committed THEN + * Call getBatch using the last batch start and end offsets + * // ^^^^ above line is needed since some sources assume last batch always re-executes + * Setup for a new batch i.e., start = last batch end, and identify new end + * DONE + * ELSE + * Identify a brand new batch + * DONE */ private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { offsetLog.getLatest() match { - case Some((batchId, nextOffsets)) => - logInfo(s"Resuming streaming query, starting with batch $batchId") - currentBatchId = batchId + case Some((latestBatchId, nextOffsets)) => + /* First assume that we are re-executing the latest known batch + * in the offset log */ + currentBatchId = latestBatchId availableOffsets = nextOffsets.toStreamProgress(sources) + /* Initialize committed offsets to a committed batch, which at this + * is the second latest batch id in the offset log. */ + offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId => + committedOffsets = secondLatestBatchId.toStreamProgress(sources) + } // update offset metadata nextOffsets.metadata.foreach { metadata => @@ -419,14 +448,37 @@ class StreamExecution( SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString) } - logDebug(s"Found possibly unprocessed offsets $availableOffsets " + - s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") - - offsetLog.get(batchId - 1).foreach { - case lastOffsets => - committedOffsets = lastOffsets.toStreamProgress(sources) - logDebug(s"Resuming with committed offsets: $committedOffsets") + /* identify the current batch id: if commit log indicates we successfully processed the + * latest batch id in the offset log, then we can safely move to the next batch + * i.e., committedBatchId + 1 */ + batchCommitLog.getLatest() match { + case Some((latestCommittedBatchId, _)) => + if (latestBatchId == latestCommittedBatchId) { + /* The last batch was successfully committed, so we can safely process a + * new next batch but first: + * Make a call to getBatch using the offsets from previous batch. + * because certain sources (e.g., KafkaSource) assume on restart the last + * batch will be executed before getOffset is called again. 
*/ + availableOffsets.foreach { ao: (Source, Offset) => + val (source, end) = ao + if (committedOffsets.get(source).map(_ != end).getOrElse(true)) { + val start = committedOffsets.get(source) + source.getBatch(start, end) + } + } + currentBatchId = latestCommittedBatchId + 1 + committedOffsets ++= availableOffsets + // Construct a new batch be recomputing availableOffsets + constructNextBatch() + } else if (latestCommittedBatchId < latestBatchId - 1) { + logWarning(s"Batch completion log latest batch id is " + + s"${latestCommittedBatchId}, which is not trailing " + + s"batchid $latestBatchId by one") + } + case None => logInfo("no commit log present") } + logDebug(s"Resuming at batch $currentBatchId with committed offsets " + + s"$committedOffsets and available offsets $availableOffsets") case None => // We are starting this stream for the first time. logInfo(s"Starting new streaming query.") currentBatchId = 0 @@ -523,6 +575,7 @@ class StreamExecution( // Note that purge is exclusive, i.e. it purges everything before the target ID. if (minBatchesToRetain < currentBatchId) { offsetLog.purge(currentBatchId - minBatchesToRetain) + batchCommitLog.purge(currentBatchId - minBatchesToRetain) } } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index ac510df209..02996ac854 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -30,6 +30,17 @@ trait TriggerExecutor { } /** + * A trigger executor that runs a single batch only, then terminates. + */ +case class OneTimeExecutor() extends TriggerExecutor { + + /** + * Execute a single batch using `batchRunner`. + */ + override def execute(batchRunner: () => Boolean): Unit = batchRunner() +} + +/** * A trigger executor that runs a batch every `intervalMs` milliseconds. */ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala new file mode 100644 index 0000000000..271bc4da99 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.streaming.Trigger + +/** + * A [[Trigger]] that process only one batch of data in a streaming query then terminates + * the query. + */ +@Experimental +@InterfaceStability.Evolving +case object OneTimeTrigger extends Trigger diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index fe52013bad..f2f700590c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { private var outputMode: OutputMode = OutputMode.Append - private var trigger: Trigger = ProcessingTime(0L) + private var trigger: Trigger = Trigger.ProcessingTime(0L) private var extraOptions = new scala.collection.mutable.HashMap[String, String] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 68f2eab9d4..bdad8e4717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -28,39 +28,30 @@ import org.apache.spark.unsafe.types.CalendarInterval /** * :: Experimental :: - * Used to indicate how often results should be produced by a [[StreamingQuery]]. - * - * @since 2.0.0 - */ -@Experimental -@InterfaceStability.Evolving -sealed trait Trigger - -/** - * :: Experimental :: * A trigger that runs a query periodically based on the processing time. If `interval` is 0, * the query will run as fast as possible. 
* * Scala Example: * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) + * df.writeStream.trigger(ProcessingTime("10 seconds")) * * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) + * df.writeStream.trigger(ProcessingTime(10.seconds)) * }}} * * Java Example: * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) + * df.writeStream.trigger(ProcessingTime.create("10 seconds")) * * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} * * @since 2.0.0 */ @Experimental @InterfaceStability.Evolving +@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") case class ProcessingTime(intervalMs: Long) extends Trigger { require(intervalMs >= 0, "the interval of trigger should not be negative") } @@ -73,6 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { */ @Experimental @InterfaceStability.Evolving +@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") object ProcessingTime { /** @@ -80,11 +72,13 @@ object ProcessingTime { * * Example: * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) + * df.writeStream.trigger(ProcessingTime("10 seconds")) * }}} * * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") def apply(interval: String): ProcessingTime = { if (StringUtils.isBlank(interval)) { throw new IllegalArgumentException( @@ -110,11 +104,13 @@ object ProcessingTime { * Example: * {{{ * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) + * df.writeStream.trigger(ProcessingTime(10.seconds)) * }}} * * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") def apply(interval: Duration): ProcessingTime = { new ProcessingTime(interval.toMillis) } @@ -124,11 +120,13 @@ object ProcessingTime { * * Example: * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) + * df.writeStream.trigger(ProcessingTime.create("10 seconds")) * }}} * * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") def create(interval: String): ProcessingTime = { apply(interval) } @@ -139,11 +137,13 @@ object ProcessingTime { * Example: * {{{ * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} * * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0") def create(interval: Long, unit: TimeUnit): ProcessingTime = { new ProcessingTime(unit.toMillis(interval)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java new file mode 100644 index 0000000000..a03a851f24 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; + +/** + * :: Experimental :: + * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. + * + * @since 2.0.0 + */ +@Experimental +@InterfaceStability.Evolving +public class Trigger { + + /** + * :: Experimental :: + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * @since 2.2.0 + */ + public static Trigger ProcessingTime(long intervalMs) { + return ProcessingTime.apply(intervalMs); + } + + /** + * :: Experimental :: + * (Java-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * {{{ + * import java.util.concurrent.TimeUnit + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.2.0 + */ + public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { + return ProcessingTime.create(interval, timeUnit); + } + + /** + * :: Experimental :: + * (Scala-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `duration` is 0, the query will run as fast as possible. + * + * {{{ + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * @since 2.2.0 + */ + public static Trigger ProcessingTime(Duration interval) { + return ProcessingTime.apply(interval); + } + + /** + * :: Experimental :: + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is effectively 0, the query will run as fast as possible. + * + * {{{ + * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) + * }}} + * @since 2.2.0 + */ + public static Trigger ProcessingTime(String interval) { + return ProcessingTime.apply(interval); + } + + /** + * A trigger that process only one batch of data in a streaming query then terminates + * the query. 
+ * + * @since 2.2.0 + */ + public static Trigger Once() { + return OneTimeTrigger$.MODULE$; + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 7614ea5eb3..fd850a7365 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -218,7 +218,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin AddData(inputData, 25), // Evict items less than previous watermark. CheckLastBatch((10, 5)), StopStream, - AssertOnQuery { q => // clear the sink + AssertOnQuery { q => // purge commit and clear the sink + val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L + q.batchCommitLog.purge(commit) q.sink.asInstanceOf[MemorySink].clear() true }, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index 89a25973af..a00a1a582a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -575,9 +575,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf StopStream, StartStream(ProcessingTime("1 second"), triggerClock = clock), + AdvanceManualClock(10 * 1000), AddData(inputData, "c"), - AdvanceManualClock(20 * 1000), + AdvanceManualClock(1 * 1000), CheckLastBatch(("b", "-1"), ("c", "1")), assertNumStateRows(total = 1, updated = 2), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f01211e20c..32920f6dfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -156,6 +156,15 @@ class StreamSuite extends StreamTest { AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId, s"offsetLog's latest should be $expectedId") + // Check the latest batchid in the commit log + def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery = + AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId, + s"commitLog's latest should be $expectedId") + + // Ensure that there has not been an incremental execution after restart + def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery = + AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run") + // For each batch, we would log the state change during the execution // This checks whether the key of the state change log is the expected batch id def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery = @@ -181,6 +190,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 0 CheckAnswer(1, 2, 3), CheckIncrementalExecutionCurrentBatchId(0), + CheckCommitLogLatestBatchId(0), CheckOffsetLogLatestBatchId(0), CheckSinkLatestBatchId(0), // Add some data in batch 1 @@ -191,6 +201,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 1 CheckAnswer(1, 2, 3, 4, 5, 6), CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), @@ -203,6 +214,7 @@ class StreamSuite extends 
StreamTest { // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times CheckAnswer(1, 2, 3, 4, 5, 6), CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), @@ -210,14 +222,15 @@ class StreamSuite extends StreamTest { StopStream, StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), - /* -- batch 1 rerun ----------------- */ - // this batch 1 would re-run because the latest batch id logged in offset log is 1 + /* -- batch 1 no rerun ----------------- */ + // batch 1 would not re-run because the latest batch id logged in commit log is 1 AdvanceManualClock(10 * 1000), + CheckNoIncrementalExecutionCurrentBatchId(), /* -- batch 2 ----------------------- */ // Check the results of batch 1 CheckAnswer(1, 2, 3, 4, 5, 6), - CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), // Add some data in batch 2 @@ -228,6 +241,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 2 CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9), CheckIncrementalExecutionCurrentBatchId(2), + CheckCommitLogLatestBatchId(2), CheckOffsetLogLatestBatchId(2), CheckSinkLatestBatchId(2)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 60e2375a98..8cf1791336 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -159,7 +159,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** Starts the stream, resuming if data has already been processed. It must not be running. 
*/ case class StartStream( - trigger: Trigger = ProcessingTime(0), + trigger: Trigger = Trigger.ProcessingTime(0), triggerClock: Clock = new SystemClock, additionalConfs: Map[String, String] = Map.empty) extends StreamAction diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 0c8015672b..600c039cd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -272,11 +272,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + q.batchCommitLog.purge(3) // advance by a minute i.e., 90 seconds total clock.advance(60 * 1000L) true }, StartStream(ProcessingTime("10 seconds"), triggerClock = clock), + // The commit log blown, causing the last batch to re-run CheckLastBatch((20L, 1), (85L, 1)), AssertOnQuery { q => clock.getTimeMillis() == 90000L @@ -322,11 +324,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + q.batchCommitLog.purge(3) // advance by 60 days i.e., 90 days total clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) true }, StartStream(ProcessingTime("10 day"), triggerClock = clock), + // Commit log blown, causing a re-run of the last batch CheckLastBatch((20L, 1), (85L, 1)), // advance clock to 100 days, should retain keys >= 90 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index eb09b9ffcf..03dad8a6dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -57,6 +57,20 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val inputData = new MemoryStream[Int](0, sqlContext) val df = inputData.toDS().as[Long].map { 10 / _ } val listener = new EventCollector + + case class AssertStreamExecThreadToWaitForClock() + extends AssertOnQuery(q => { + eventually(Timeout(streamingTimeout)) { + if (q.exception.isEmpty) { + assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis)) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + }, "") + try { // No events until started spark.streams.addListener(listener) @@ -81,6 +95,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Progress event generated when data processed AddData(inputData, 1, 2), AdvanceManualClock(100), + AssertStreamExecThreadToWaitForClock(), CheckAnswer(10, 5), AssertOnQuery { query => assert(listener.progressEvents.nonEmpty) @@ -109,8 +124,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Termination event generated with exception message when stopped with error StartStream(ProcessingTime(100), triggerClock = clock), + AssertStreamExecThreadToWaitForClock(), AddData(inputData, 0), - AdvanceManualClock(100), + AdvanceManualClock(100), // process bad data ExpectFailure[SparkException](), AssertOnQuery { query => eventually(Timeout(streamingTimeout)) { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index a0a2b2b4c9..3f41ecdb7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -158,6 +158,49 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + testQuietly("OneTime trigger, commit log, and exception") { + import Trigger.Once + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map { 6 / _} + + testStream(mapped)( + AssertOnQuery(_.isActive === true), + StopStream, + AddData(inputData, 1, 2), + StartStream(trigger = Once), + CheckAnswer(6, 3), + StopStream, // clears out StreamTest state + AssertOnQuery { q => + // both commit log and offset log contain the same (latest) batch id + q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) == + q.offsetLog.getLatest().map(_._1).getOrElse(-2L) + }, + AssertOnQuery { q => + // blow away commit log and sink result + q.batchCommitLog.purge(1) + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(trigger = Once), + CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch + StopStream, + AddData(inputData, 3), + StartStream(trigger = Once), + CheckLastBatch(2), // commit log should be back in place + StopStream, + AddData(inputData, 0), + StartStream(trigger = Once), + ExpectFailure[SparkException](), + AssertOnQuery(_.isActive === false), + AssertOnQuery(q => { + q.exception.get.startOffset === + q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString && + q.exception.get.endOffset === + q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString + }, "incorrect start offset or end offset on exception") + ) + } + testQuietly("status, lastProgress, and recentProgress") { import StreamingQuerySuite._ clock = new StreamManualClock @@ -237,6 +280,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AdvanceManualClock(500), // time = 1100 to unblock job AssertOnQuery { _ => clock.getTimeMillis() === 1100 }, CheckAnswer(2), + AssertStreamExecThreadToWaitForClock(), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -275,6 +319,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AddData(inputData, 1, 2), AdvanceManualClock(100), // allow another trigger + AssertStreamExecThreadToWaitForClock(), CheckAnswer(4), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), @@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi // Test status and progress after query terminated with error StartStream(ProcessingTime(100), triggerClock = clock), + AdvanceManualClock(100), // ensure initial trigger completes before AddData AddData(inputData, 0), - AdvanceManualClock(100), + AdvanceManualClock(100), // allow another trigger ExpectFailure[SparkException](), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 
341ab0eb92..05cd3d9f7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _} +import org.apache.spark.sql.streaming.Trigger._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -346,7 +347,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { q = df.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime.create(100, TimeUnit.SECONDS)) + .trigger(ProcessingTime(100, TimeUnit.SECONDS)) .start() q.stop() |
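As a closing illustration of the on-disk layout of the new commit log introduced above (line 1: a version marker such as `v1`; line 2: optional metadata, with `{}` standing in for none), here is a standalone Scala sketch of the same serialization scheme. It deliberately omits the `HDFSMetadataLog` machinery and is not the code added by this patch:

```scala
// Standalone sketch of the commit-log file format used by BatchCommitLog:
// first line is the version ("v1"), second line is metadata or "{}" when absent.
object CommitLogFormat {
  private val Version = 1
  private val SerializedVoid = "{}"

  def serialize(metadata: Option[String]): String =
    s"v$Version\n${metadata.getOrElse(SerializedVoid)}"

  def deserialize(content: String): Option[String] = {
    val lines = content.split("\n", -1).iterator
    require(lines.hasNext, "Incomplete commit log file")
    val versionLine = lines.next().trim
    require(versionLine == s"v$Version", s"Unsupported commit log version: $versionLine")
    lines.next().trim match {
      case SerializedVoid => None
      case metadata       => Some(metadata)
    }
  }

  def main(args: Array[String]): Unit = {
    println(serialize(None))                               // "v1" followed by "{}"
    println(deserialize(serialize(Some("""{"ts":1}""")))) // Some({"ts":1})
  }
}
```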