author     Tyson Condie <tcondie@gmail.com>               2017-03-23 14:32:05 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2017-03-23 14:32:05 -0700
commit     746a558de2136f91f8fe77c6e51256017aa50913 (patch)
tree       3325526a325b9774ee1c9a0916210647b1007132 /sql/core/src/main/scala/org/apache
parent     b0ae6a38a3ef65e4e853781c5127ba38997a8546 (diff)
[SPARK-19876][SS][WIP] OneTime Trigger Executor
## What changes were proposed in this pull request?

This patch adds an additional trigger and trigger executor that execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch adds a change to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log is used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself for that purpose; relying on the offset log alone would always re-process the previously logged batch, which would not permit a OneTime trigger feature.

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that tells us whether that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of the OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which means we should fall back to what is in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17219 from tcondie/stream-commit.
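For orientation, here is a minimal usage sketch of the new one-time trigger through the public API added in this patch; the session, schema, and directory paths below are illustrative placeholders, not part of the change:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructType}

// Illustrative session and paths; only Trigger.Once() comes from this patch.
val spark = SparkSession.builder().master("local[*]").appName("one-time-trigger-sketch").getOrCreate()
val schema = new StructType().add("value", StringType)    // file sources need an explicit schema

val query = spark.readStream
  .schema(schema)
  .json("/tmp/stream-input")                               // hypothetical input directory
  .writeStream
  .format("parquet")
  .option("path", "/tmp/stream-output")                    // hypothetical output directory
  .option("checkpointLocation", "/tmp/stream-checkpoint")  // offset and commit logs live here
  .trigger(Trigger.Once())                                 // run exactly one batch, then stop
  .start()

query.awaitTermination()                                   // returns once that single batch is committed
```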
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala    77
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala   81
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala   11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala          29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala             2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala)  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java                     105
7 files changed, 308 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
new file mode 100644
index 0000000000..fb1a4fb9b1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ * ....
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+ extends HDFSMetadataLog[String](sparkSession, path) {
+
+ override protected def deserialize(in: InputStream): String = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+ if (!lines.hasNext) {
+ throw new IllegalStateException("Incomplete log file in the offset commit log")
+ }
+ parseVersion(lines.next().trim, BatchCommitLog.VERSION)
+ // read metadata
+ lines.next().trim match {
+ case BatchCommitLog.SERIALIZED_VOID => null
+ case metadata => metadata
+ }
+ }
+
+ override protected def serialize(metadata: String, out: OutputStream): Unit = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+ out.write('\n')
+
+ // write metadata or void
+ out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
+ .getBytes(UTF_8))
+ }
+}
+
+object BatchCommitLog {
+ private val VERSION = 1
+ private val SERIALIZED_VOID = "{}"
+}
+
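To make the on-disk format above concrete, here is a small sketch of how StreamExecution drives this log; the checkpoint directory and batch id are illustrative placeholders, while `add`, `getLatest`, and the two-line `v1` / `{}` layout come from the code above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.BatchCommitLog

// Sketch only: `checkpointDir` and `batchId` are placeholder values.
def commitAndInspect(spark: SparkSession, checkpointDir: String, batchId: Long): Unit = {
  val commitLog = new BatchCommitLog(spark, s"$checkpointDir/commits")

  // Written after a batch's output has reached the sink; the file holds
  // two lines: the version ("v1") and the metadata ("{}" when null).
  commitLog.add(batchId, null)

  // On restart, the latest commit tells us whether the last batch in the
  // offset log was fully processed (re-run it if not, otherwise move on).
  commitLog.getLatest() match {
    case Some((latestCommitted, _)) => println(s"last committed batch: $latestCommitted")
    case None => println("no commits yet; fall back to the offset log")
  }
}
```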
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60d5283e6b..34e9262af7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,6 +165,8 @@ class StreamExecution(
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+ case OneTimeTrigger => OneTimeExecutor()
+ case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
/** Defines the internal state of execution */
@@ -209,6 +211,13 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
+ /**
+ * A log that records the batch ids that have completed. This is used to check if a batch was
+ * fully processed, and its output was committed to the sink, hence no need to process it again.
+ * This is used (for instance) during restart, to help identify which batch to run next.
+ */
+ val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
+
/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING
@@ -291,10 +300,13 @@ class StreamExecution(
runBatch(sparkSessionToRunBatches)
}
}
-
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
+ batchCommitLog.add(currentBatchId, null)
+ logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
@@ -306,9 +318,6 @@ class StreamExecution(
} else {
false
}
-
- // Update committed offsets.
- committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
continueToRun
})
@@ -392,13 +401,33 @@ class StreamExecution(
* - currentBatchId
* - committedOffsets
* - availableOffsets
+ * The basic structure of this method is as follows:
+ *
+ *   Identify (from the offset log) the offsets used to run the last batch
+ *   IF last batch exists THEN
+ *     Set the next batch to be executed as the last recovered batch
+ *     Check the commit log to see which batch was committed last
+ *     IF the last batch was committed THEN
+ *       Call getBatch using the last batch start and end offsets
+ *       // ^^^^ above line is needed since some sources assume the last batch always re-executes
+ *       Set up for a new batch, i.e., start = last batch end, and identify the new end
+ *     DONE
+ *   ELSE
+ *     Identify a brand new batch
+ *   DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
- case Some((batchId, nextOffsets)) =>
- logInfo(s"Resuming streaming query, starting with batch $batchId")
- currentBatchId = batchId
+ case Some((latestBatchId, nextOffsets)) =>
+ /* First assume that we are re-executing the latest known batch
+ * in the offset log */
+ currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
+ /* Initialize committed offsets to a committed batch, which at this point
+ * is the second latest batch id in the offset log. */
+ offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+ committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+ }
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
@@ -419,14 +448,37 @@ class StreamExecution(
SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
}
- logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
- s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
-
- offsetLog.get(batchId - 1).foreach {
- case lastOffsets =>
- committedOffsets = lastOffsets.toStreamProgress(sources)
- logDebug(s"Resuming with committed offsets: $committedOffsets")
+ /* identify the current batch id: if commit log indicates we successfully processed the
+ * latest batch id in the offset log, then we can safely move to the next batch
+ * i.e., committedBatchId + 1 */
+ batchCommitLog.getLatest() match {
+ case Some((latestCommittedBatchId, _)) =>
+ if (latestBatchId == latestCommittedBatchId) {
+ /* The last batch was successfully committed, so we can safely process the
+ * next batch, but first:
+ * make a call to getBatch using the offsets from the previous batch,
+ * because certain sources (e.g., KafkaSource) assume on restart that the
+ * last batch will be executed before getOffset is called again. */
+ availableOffsets.foreach { ao: (Source, Offset) =>
+ val (source, end) = ao
+ if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
+ val start = committedOffsets.get(source)
+ source.getBatch(start, end)
+ }
+ }
+ currentBatchId = latestCommittedBatchId + 1
+ committedOffsets ++= availableOffsets
+ // Construct a new batch by recomputing availableOffsets
+ constructNextBatch()
+ } else if (latestCommittedBatchId < latestBatchId - 1) {
+ logWarning(s"Batch completion log latest batch id is " +
+ s"${latestCommittedBatchId}, which is not trailing " +
+ s"batchid $latestBatchId by one")
+ }
+ case None => logInfo("no commit log present")
}
+ logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+ s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -523,6 +575,7 @@ class StreamExecution(
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minBatchesToRetain < currentBatchId) {
offsetLog.purge(currentBatchId - minBatchesToRetain)
+ batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
}
} else {
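The pseudocode in the populateStartOffsets scaladoc above boils down to a simple decision. A distilled sketch for readers, as a simplification and not the code added to StreamExecution:

```scala
// Given the latest batch id planned in the offset log and the latest batch id
// recorded in the commit log, pick the batch id to run after a restart.
def nextBatchToRun(latestOffsetBatch: Option[Long], latestCommittedBatch: Option[Long]): Long =
  (latestOffsetBatch, latestCommittedBatch) match {
    case (None, _) =>
      0L              // nothing planned yet: brand new query, start at batch 0
    case (Some(planned), Some(committed)) if committed == planned =>
      planned + 1     // last planned batch was committed: construct the next batch
    case (Some(planned), _) =>
      planned         // last planned batch was never committed: re-run it
  }
```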
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index ac510df209..02996ac854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -30,6 +30,17 @@ trait TriggerExecutor {
}
/**
+ * A trigger executor that runs a single batch only, then terminates.
+ */
+case class OneTimeExecutor() extends TriggerExecutor {
+
+ /**
+ * Execute a single batch using `batchRunner`.
+ */
+ override def execute(batchRunner: () => Boolean): Unit = batchRunner()
+}
+
+/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
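The contrast between the two executors in a few lines, with an illustrative runner; the real `batchRunner` is the closure StreamExecution passes in:

```scala
import org.apache.spark.sql.execution.streaming.{OneTimeExecutor, ProcessingTimeExecutor}
import org.apache.spark.sql.streaming.ProcessingTime

val runner: () => Boolean = () => { println("ran one batch"); true }

OneTimeExecutor().execute(runner)   // invokes the runner exactly once and returns

// ProcessingTimeExecutor, by contrast, keeps invoking the runner on a fixed
// cadence until it returns false (commented out here since it would loop):
// ProcessingTimeExecutor(ProcessingTime("10 seconds")).execute(runner)
```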
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
new file mode 100644
index 0000000000..271bc4da99
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.streaming.Trigger
+
+/**
+ * A [[Trigger]] that processes only one batch of data in a streaming query and then terminates
+ * the query.
+ */
+@Experimental
+@InterfaceStability.Evolving
+case object OneTimeTrigger extends Trigger
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index fe52013bad..f2f700590c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var outputMode: OutputMode = OutputMode.Append
- private var trigger: Trigger = ProcessingTime(0L)
+ private var trigger: Trigger = Trigger.ProcessingTime(0L)
private var extraOptions = new scala.collection.mutable.HashMap[String, String]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 68f2eab9d4..bdad8e4717 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -28,39 +28,30 @@ import org.apache.spark.unsafe.types.CalendarInterval
/**
* :: Experimental ::
- * Used to indicate how often results should be produced by a [[StreamingQuery]].
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-sealed trait Trigger
-
-/**
- * :: Experimental ::
* A trigger that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*
* Scala Example:
* {{{
- * df.write.trigger(ProcessingTime("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
- * df.write.trigger(ProcessingTime(10.seconds))
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
- * df.write.trigger(ProcessingTime.create("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
- * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
case class ProcessingTime(intervalMs: Long) extends Trigger {
require(intervalMs >= 0, "the interval of trigger should not be negative")
}
@@ -73,6 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
*/
@Experimental
@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
object ProcessingTime {
/**
@@ -80,11 +72,13 @@ object ProcessingTime {
*
* Example:
* {{{
- * df.write.trigger(ProcessingTime("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime("10 seconds"))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
+ @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
@@ -110,11 +104,13 @@ object ProcessingTime {
* Example:
* {{{
* import scala.concurrent.duration._
- * df.write.trigger(ProcessingTime(10.seconds))
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
+ @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def apply(interval: Duration): ProcessingTime = {
new ProcessingTime(interval.toMillis)
}
@@ -124,11 +120,13 @@ object ProcessingTime {
*
* Example:
* {{{
- * df.write.trigger(ProcessingTime.create("10 seconds"))
+ * df.writeStream.trigger(ProcessingTime.create("10 seconds"))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
+ @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def create(interval: String): ProcessingTime = {
apply(interval)
}
@@ -139,11 +137,13 @@ object ProcessingTime {
* Example:
* {{{
* import java.util.concurrent.TimeUnit
- * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
+ * @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
+ @deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0")
def create(interval: Long, unit: TimeUnit): ProcessingTime = {
new ProcessingTime(unit.toMillis(interval))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 0000000000..a03a851f24
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+ /**
+ * :: Experimental ::
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is 0, the query will run as fast as possible.
+ *
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(long intervalMs) {
+ return ProcessingTime.apply(intervalMs);
+ }
+
+ /**
+ * :: Experimental ::
+ * (Java-friendly)
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is 0, the query will run as fast as possible.
+ *
+ * {{{
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+ return ProcessingTime.create(interval, timeUnit);
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-friendly)
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `duration` is 0, the query will run as fast as possible.
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
+ * }}}
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(Duration interval) {
+ return ProcessingTime.apply(interval);
+ }
+
+ /**
+ * :: Experimental ::
+ * A trigger policy that runs a query periodically based on an interval in processing time.
+ * If `interval` is effectively 0, the query will run as fast as possible.
+ *
+ * {{{
+ * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+ * }}}
+ * @since 2.2.0
+ */
+ public static Trigger ProcessingTime(String interval) {
+ return ProcessingTime.apply(interval);
+ }
+
+ /**
+ * A trigger that processes only one batch of data in a streaming query and then terminates
+ * the query.
+ *
+ * @since 2.2.0
+ */
+ public static Trigger Once() {
+ return OneTimeTrigger$.MODULE$;
+ }
+}
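The ProcessingTime overloads above all express the same policy. A short Scala sketch calling only what this file defines:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// The same 10-second processing-time policy via each factory overload,
// plus the new one-batch-and-stop policy.
val byMillis   = Trigger.ProcessingTime(10000L)
val byUnit     = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
val byDuration = Trigger.ProcessingTime(10.seconds)
val byString   = Trigger.ProcessingTime("10 seconds")
val onceOnly   = Trigger.Once()
```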