Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala | 77
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 81
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala | 11
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala | 29
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 2
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala) | 36
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java | 105
7 files changed, 308 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
new file mode 100644
index 0000000000..fb1a4fb9b1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ * ....
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[String](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): String = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset commit log")
+    }
+    parseVersion(lines.next().trim, BatchCommitLog.VERSION)
+    // read metadata
+    lines.next().trim match {
+      case BatchCommitLog.SERIALIZED_VOID => null
+      case metadata => metadata
+    }
+  }
+
+  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+    out.write('\n')
+
+    // write metadata or void
+    out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
+      .getBytes(UTF_8))
+  }
+}
+
+object BatchCommitLog {
+  private val VERSION = 1
+  private val SERIALIZED_VOID = "{}"
+}
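For illustration, a commit file in this format is only two lines long. The standalone sketch below (not part of the patch; it bypasses the HDFSMetadataLog plumbing entirely) mimics what serialize writes for a batch with no metadata, and what deserialize would read back:

    import java.io.ByteArrayOutputStream
    import java.nio.charset.StandardCharsets.UTF_8

    // Mimic BatchCommitLog.serialize for a null metadata payload:
    // line 1 is the version marker, line 2 the metadata (or the "{}" void placeholder).
    val out = new ByteArrayOutputStream()
    out.write("v1".getBytes(UTF_8))
    out.write('\n')
    out.write("{}".getBytes(UTF_8))

    assert(new String(out.toByteArray, UTF_8) == "v1\n{}")
    // deserialize parses "v1" as the version line and maps "{}" back to null.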
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60d5283e6b..34e9262af7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,6 +165,8 @@ class StreamExecution(

   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
   }

   /** Defines the internal state of execution */
@@ -209,6 +211,13 @@ class StreamExecution(
    */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

+  /**
+   * A log that records the batch ids that have completed. This is used to check if a batch was
+   * fully processed, and its output was committed to the sink, hence no need to process it again.
+   * This is used (for instance) during restart, to help identify which batch to run next.
+   */
+  val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
+
  /** Whether all fields of the query have been initialized */
  private def isInitialized: Boolean = state.get != INITIALIZING
@@ -291,10 +300,13 @@ class StreamExecution(
             runBatch(sparkSessionToRunBatches)
           }
         }
-
         // Report trigger as finished and construct progress object.
         finishTrigger(dataAvailable)
         if (dataAvailable) {
+          // Update committed offsets.
+          committedOffsets ++= availableOffsets
+          batchCommitLog.add(currentBatchId, null)
+          logDebug(s"batch ${currentBatchId} committed")
           // We'll increase currentBatchId after we complete processing current batch's data
           currentBatchId += 1
         } else {
@@ -306,9 +318,6 @@ class StreamExecution(
         } else {
           false
         }
-
-        // Update committed offsets.
-        committedOffsets ++= availableOffsets
         updateStatusMessage("Waiting for next trigger")
         continueToRun
       })
@@ -392,13 +401,33 @@ class StreamExecution(
    *  - currentBatchId
    *  - committedOffsets
    *  - availableOffsets
+   *  The basic structure of this method is as follows:
+   *
+   *  Identify (from the offset log) the offsets used to run the last batch
+   *  IF last batch exists THEN
+   *    Set the next batch to be executed as the last recovered batch
+   *    Check the commit log to see which batch was committed last
+   *    IF the last batch was committed THEN
+   *      Call getBatch using the last batch start and end offsets
+   *      // ^^^^ above line is needed since some sources assume last batch always re-executes
+   *      Setup for a new batch i.e., start = last batch end, and identify new end
+   *    DONE
+   *  ELSE
+   *    Identify a brand new batch
+   *  DONE
    */
   private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
     offsetLog.getLatest() match {
-      case Some((batchId, nextOffsets)) =>
-        logInfo(s"Resuming streaming query, starting with batch $batchId")
-        currentBatchId = batchId
+      case Some((latestBatchId, nextOffsets)) =>
+        /* First assume that we are re-executing the latest known batch
+         * in the offset log */
+        currentBatchId = latestBatchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
+        /* Initialize committed offsets to a committed batch, which at this point
+         * is the second latest batch id in the offset log. */
+        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+        }

         // update offset metadata
         nextOffsets.metadata.foreach { metadata =>
@@ -419,14 +448,37 @@ class StreamExecution(
             SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
         }

-        logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
-          s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
-
-        offsetLog.get(batchId - 1).foreach {
-          case lastOffsets =>
-            committedOffsets = lastOffsets.toStreamProgress(sources)
-            logDebug(s"Resuming with committed offsets: $committedOffsets")
+        /* identify the current batch id: if commit log indicates we successfully processed the
+         * latest batch id in the offset log, then we can safely move to the next batch
+         * i.e., committedBatchId + 1 */
+        batchCommitLog.getLatest() match {
+          case Some((latestCommittedBatchId, _)) =>
+            if (latestBatchId == latestCommittedBatchId) {
+              /* The last batch was successfully committed, so we can safely process a
+               * new next batch but first:
+               * Make a call to getBatch using the offsets from previous batch.
+               * because certain sources (e.g., KafkaSource) assume on restart the last
+               * batch will be executed before getOffset is called again.
+               */
+              availableOffsets.foreach { ao: (Source, Offset) =>
+                val (source, end) = ao
+                if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
+                  val start = committedOffsets.get(source)
+                  source.getBatch(start, end)
+                }
+              }
+              currentBatchId = latestCommittedBatchId + 1
+              committedOffsets ++= availableOffsets
+              // Construct a new batch by recomputing availableOffsets
+              constructNextBatch()
+            } else if (latestCommittedBatchId < latestBatchId - 1) {
+              logWarning(s"Batch completion log latest batch id is " +
+                s"${latestCommittedBatchId}, which is not trailing " +
+                s"batch id $latestBatchId by one")
+            }
+          case None => logInfo("no commit log present")
         }
+        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+          s"$committedOffsets and available offsets $availableOffsets")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
@@ -523,6 +575,7 @@ class StreamExecution(
           // Note that purge is exclusive, i.e. it purges everything before the target ID.
           if (minBatchesToRetain < currentBatchId) {
             offsetLog.purge(currentBatchId - minBatchesToRetain)
+            batchCommitLog.purge(currentBatchId - minBatchesToRetain)
           }
         }
       } else {
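The recovery logic above reduces to a small decision rule over the latest entries of the two logs. Here is a minimal distillation (a hypothetical helper, not part of the patch) of how the next batch id falls out on restart:

    // Given the newest batch id in the offset log and in the commit log,
    // decide which batch a restarted query should run next.
    def nextBatchToRun(latestOffsetBatch: Option[Long], latestCommit: Option[Long]): Long =
      (latestOffsetBatch, latestCommit) match {
        case (None, _) => 0L                        // fresh query, no logs yet
        case (Some(b), Some(c)) if c == b => b + 1  // batch b fully committed: move on
        case (Some(b), _) => b                      // crash mid-batch: re-execute batch b
      }

    assert(nextBatchToRun(None, None) == 0L)          // first start
    assert(nextBatchToRun(Some(5L), Some(5L)) == 6L)  // clean shutdown after batch 5
    assert(nextBatchToRun(Some(5L), Some(4L)) == 5L)  // crashed during batch 5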
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index ac510df209..02996ac854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -30,6 +30,17 @@ trait TriggerExecutor {
 }

 /**
+ * A trigger executor that runs a single batch only, then terminates.
+ */
+case class OneTimeExecutor() extends TriggerExecutor {
+
+  /**
+   * Execute a single batch using `batchRunner`.
+   */
+  override def execute(batchRunner: () => Boolean): Unit = batchRunner()
+}
+
+/**
  * A trigger executor that runs a batch every `intervalMs` milliseconds.
  */
 case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
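The contract difference between the executors is easy to see in isolation: ProcessingTimeExecutor keeps invoking the batch runner until it returns false, while OneTimeExecutor invokes it exactly once and ignores the result. A toy check (a sketch assuming access to the execution.streaming package, e.g. from a test):

    import org.apache.spark.sql.execution.streaming.OneTimeExecutor

    var runs = 0
    // The runner's Boolean result means "continue"; OneTimeExecutor ignores it.
    OneTimeExecutor().execute { () => runs += 1; true }
    assert(runs == 1)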
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
new file mode 100644
index 0000000000..271bc4da99
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.streaming.Trigger
+
+/**
+ * A [[Trigger]] that processes only one batch of data in a streaming query and then terminates
+ * the query.
+ */
+@Experimental
+@InterfaceStability.Evolving
+case object OneTimeTrigger extends Trigger
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index fe52013bad..f2f700590c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

   private var outputMode: OutputMode = OutputMode.Append

-  private var trigger: Trigger = ProcessingTime(0L)
+  private var trigger: Trigger = Trigger.ProcessingTime(0L)

   private var extraOptions = new scala.collection.mutable.HashMap[String, String]
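End to end, OneTimeTrigger is reached through the new Trigger.Once() factory: the query plans and runs a single incremental batch over whatever data is available, commits it, and stops. A usage sketch (paths and schema are illustrative only):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{StringType, StructType}

    val spark = SparkSession.builder().appName("trigger-once-demo").getOrCreate()

    // Hypothetical paths; any streaming source/sink pair behaves the same way.
    val query = spark.readStream
      .schema(new StructType().add("value", StringType))
      .json("/tmp/in")
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/ckpt")
      .trigger(Trigger.Once())   // run exactly one batch, then terminate
      .start("/tmp/out")

    query.awaitTermination()     // returns once the single batch is committed

Because the checkpoint carries the offset and commit logs, re-running this job later picks up only the data that arrived since the previous once-run, which makes it suitable for scheduled, batch-style incremental jobs.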
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 68f2eab9d4..bdad8e4717 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -28,39 +28,30 @@ import org.apache.spark.unsafe.types.CalendarInterval

 /**
  * :: Experimental ::
- * Used to indicate how often results should be produced by a [[StreamingQuery]].
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-sealed trait Trigger
-
-/**
- * :: Experimental ::
  * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
  * the query will run as fast as possible.
  *
  * Scala Example:
  * {{{
- *   df.write.trigger(ProcessingTime("10 seconds"))
+ *   df.writeStream.trigger(ProcessingTime("10 seconds"))
  *
  *   import scala.concurrent.duration._
- *   df.write.trigger(ProcessingTime(10.seconds))
+ *   df.writeStream.trigger(ProcessingTime(10.seconds))
  * }}}
  *
  * Java Example:
  * {{{
- *   df.write.trigger(ProcessingTime.create("10 seconds"))
+ *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
  *
  *   import java.util.concurrent.TimeUnit
- *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
  * }}}
  *
  * @since 2.0.0
  */
 @Experimental
 @InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
 case class ProcessingTime(intervalMs: Long) extends Trigger {
   require(intervalMs >= 0, "the interval of trigger should not be negative")
 }
@@ -73,6 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
  */
 @Experimental
 @InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
 object ProcessingTime {

   /**
@@ -80,11 +72,13 @@ object ProcessingTime {
    *
    * Example:
    * {{{
-   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
    * }}}
    *
    * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
    */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: String): ProcessingTime = {
     if (StringUtils.isBlank(interval)) {
       throw new IllegalArgumentException(
@@ -110,11 +104,13 @@ object ProcessingTime {
    * Example:
    * {{{
    *   import scala.concurrent.duration._
-   *   df.write.trigger(ProcessingTime(10.seconds))
+   *   df.writeStream.trigger(ProcessingTime(10.seconds))
    * }}}
    *
    * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
    */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: Duration): ProcessingTime = {
     new ProcessingTime(interval.toMillis)
   }
@@ -124,11 +120,13 @@ object ProcessingTime {
    *
    * Example:
    * {{{
-   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
    * }}}
    *
    * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
    */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def create(interval: String): ProcessingTime = {
     apply(interval)
   }
@@ -139,11 +137,13 @@ object ProcessingTime {
    * Example:
    * {{{
    *   import java.util.concurrent.TimeUnit
-   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    * }}}
    *
    * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval, unit)
    */
+  @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0")
   def create(interval: Long, unit: TimeUnit): ProcessingTime = {
     new ProcessingTime(unit.toMillis(interval))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 0000000000..a03a851f24
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+    return ProcessingTime.apply(intervalMs);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+    return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+    return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+    return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * A trigger that processes only one batch of data in a streaming query and then terminates
+   * the query.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger Once() {
+    return OneTimeTrigger$.MODULE$;
+  }
+}
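Taken together, the deprecated ProcessingTime constructors and the new static factories denote the same trigger. The sketch below shows four spellings of a 10-second processing-time trigger through the new API; since they all delegate to the same case class, they compare equal:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.Trigger

    // Four ways to spell the same 10-second processing-time trigger.
    val byMillis   = Trigger.ProcessingTime(10000L)
    val byUnit     = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
    val byDuration = Trigger.ProcessingTime(10.seconds)
    val byString   = Trigger.ProcessingTime("10 seconds")

    assert(byMillis == byUnit && byUnit == byDuration && byDuration == byString)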