author    Tyson Condie <tcondie@gmail.com>    2016-11-18 11:11:24 -0800
committer Shixiong Zhu <shixiong@databricks.com>    2016-11-18 11:11:24 -0800
commit    51baca2219fda8692b88fc8552548544aec73a1e (patch)
tree      d4ce1c82e0e1b589c01f9fafc4639d946d8229b8
parent    d9dd979d170f44383a9a87f892f2486ddb3cca7d (diff)
[SPARK-18187][SQL] CompactibleFileStreamLog should not use "compactInterval" directly with the user setting.
## What changes were proposed in this pull request?

CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If "compactInterval" is reset by the user, CompactibleFileStreamLog will return wrong answers, resulting in data loss. This PR provides a way to check the validity of 'compactInterval' and to calculate an appropriate value.

## How was this patch tested?

When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one.

The primary solution to this issue was given by uncleGen. Added extensions include an additional metadata field in OffsetSeq and the CompactibleFileStreamLog APIs. zsxwing

Author: Tyson Condie <tcondie@gmail.com>
Author: genmao.ygm <genmao.ygm@genmaoygmdeMacBook-Air.local>

Closes #15852 from tcondie/spark-18187.
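The failure mode comes from the batch-classification rule: a batch id counts as a compaction batch only if it lines up with the configured interval. Below is a minimal, standalone sketch (it mirrors the `isCompactionBatch` check used by `CompactibleFileStreamLog`; the object and values are local stand-ins, not the Spark classes) showing how a log written with one interval stops being recognized once the conf is reset:

```scala
// A minimal, standalone sketch of the batch-classification rule that
// CompactibleFileStreamLog applies (local stand-in, not the Spark class).
object CompactIntervalSketch {
  // A batch is a compaction batch when its (1-based) position is a multiple
  // of the compact interval.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  def main(args: Array[String]): Unit = {
    // A log written with compactInterval = 3 compacted batches 2, 5 and 8.
    val compacted = Seq(2L, 5L, 8L)
    println(compacted.forall(isCompactionBatch(_, 3))) // true
    // After a restart with the conf reset to 4, those same batches are no
    // longer recognized as compaction batches, so lookups against the
    // existing '.compact' files go wrong.
    println(compacted.forall(isCompactionBatch(_, 4))) // false
  }
}
```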
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala  61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala  31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala  33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala  41
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala  20
9 files changed, 178 insertions, 39 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8af3db1968..8529ceac30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -63,7 +63,46 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
protected def isDeletingExpiredLog: Boolean
- protected def compactInterval: Int
+ protected def defaultCompactInterval: Int
+
+ protected final lazy val compactInterval: Int = {
+ // SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
+ // If there are existing log entries, then we should ensure a compatible compactInterval
+ // is used, irrespective of the defaultCompactInterval. There are three cases:
+ //
+ // 1. If there is no '.compact' file, we can use the default setting directly.
+ // 2. If there are two or more '.compact' files, we use the interval between the two latest
+ // '.compact' batch ids as the compactInterval. This can arise if isDeletingExpiredLog == false.
+ // 3. If there is only one '.compact' file, then we must find a compact interval
+ // that is compatible with (i.e., a divisor of) the previous compact file, and that
+ // faithfully tries to represent the revised default compact interval, i.e., is at least
+ // as large as the default, if possible.
+ // e.g., if defaultCompactInterval is 5 (and the previous compact interval could have
+ // been any of 2, 3, 4, 6, 12), then a log could be: 11.compact, 12, 13, in which case
+ // this ensures that the new compactInterval = 6 > 5 and (11 + 1) % 6 == 0
+ val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter)
+ .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX))
+ .map(f => pathToBatchId(f.getPath))
+ .sorted
+ .reverse
+
+ // Case 1
+ var interval = defaultCompactInterval
+ if (compactibleBatchIds.length >= 2) {
+ // Case 2
+ val latestCompactBatchId = compactibleBatchIds(0)
+ val previousCompactBatchId = compactibleBatchIds(1)
+ interval = (latestCompactBatchId - previousCompactBatchId).toInt
+ } else if (compactibleBatchIds.length == 1) {
+ // Case 3
+ interval = CompactibleFileStreamLog.deriveCompactInterval(
+ defaultCompactInterval, compactibleBatchIds(0).toInt)
+ }
+ assert(interval > 0, s"intervalValue = $interval is not a positive value.")
+ logInfo(s"Set the compact interval to $interval " +
+ s"[defaultCompactInterval: $defaultCompactInterval]")
+ interval
+ }
/**
* Filter out the obsolete logs.
@@ -245,4 +284,24 @@ object CompactibleFileStreamLog {
def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
(batchId + compactInterval + 1) / compactInterval * compactInterval - 1
}
+
+ /**
+ * Derives a compact interval from the latest compact batch id and
+ * a default compact interval.
+ */
+ def deriveCompactInterval(defaultInterval: Int, latestCompactBatchId: Int) : Int = {
+ if (latestCompactBatchId + 1 <= defaultInterval) {
+ latestCompactBatchId + 1
+ } else if (defaultInterval < (latestCompactBatchId + 1) / 2) {
+ // Find the first divisor >= default compact interval
+ def properDivisors(min: Int, n: Int) =
+ (min to n/2).view.filter(i => n % i == 0) :+ n
+
+ properDivisors(defaultInterval, latestCompactBatchId + 1).head
+ } else {
+ // the default compact interval is larger than any divisor other than latestCompactBatchId + 1
+ latestCompactBatchId + 1
+ }
+ }
}
+
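For reference, the derivation added in the hunk above can be checked in isolation. The sketch below copies the logic of `deriveCompactInterval` into a standalone object (a local stand-in, not the Spark companion object) so the worked example from the comment (defaultCompactInterval 5, latest compact batch 11, derived interval 6) can be reproduced:

```scala
// A standalone copy of the derivation above, handy for checking the three cases by hand.
object DeriveCompactIntervalSketch {
  def deriveCompactInterval(defaultInterval: Int, latestCompactBatchId: Int): Int = {
    if (latestCompactBatchId + 1 <= defaultInterval) {
      // The existing log is still shorter than one default interval: keep it compatible.
      latestCompactBatchId + 1
    } else if (defaultInterval < (latestCompactBatchId + 1) / 2) {
      // Pick the first divisor of (latestCompactBatchId + 1) that is >= the default.
      def properDivisors(min: Int, n: Int) =
        (min to n / 2).view.filter(i => n % i == 0) :+ n
      properDivisors(defaultInterval, latestCompactBatchId + 1).head
    } else {
      // Otherwise fall back to (latestCompactBatchId + 1), which is always a valid interval.
      latestCompactBatchId + 1
    }
  }

  def main(args: Array[String]): Unit = {
    println(deriveCompactInterval(5, 11)) // 6: first divisor of 12 that is >= 5
    println(deriveCompactInterval(5, 4))  // 5: 4 + 1 <= 5, so reuse the short log as-is
    println(deriveCompactInterval(4, 9))  // 5: first divisor of 10 that is >= 4
  }
}
```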
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index b4f14151f1..eb6eed87ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -88,9 +88,11 @@ class FileStreamSinkLog(
protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
- protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
- require(compactInterval > 0,
- s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
+ protected override val defaultCompactInterval =
+ sparkSession.sessionState.conf.fileSinkLogCompactInterval
+
+ require(defaultCompactInterval > 0,
+ s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index fe81b15607..327b3ac267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -38,11 +38,12 @@ class FileStreamSourceLog(
import CompactibleFileStreamLog._
// Configurations about metadata compaction
- protected override val compactInterval =
+ protected override val defaultCompactInterval: Int =
sparkSession.sessionState.conf.fileSourceLogCompactInterval
- require(compactInterval > 0,
- s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
- s"positive value.")
+
+ require(defaultCompactInterval > 0,
+ s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} " +
+ s"(was $defaultCompactInterval) to a positive value.")
protected override val fileCleanupDelayMs =
sparkSession.sessionState.conf.fileSourceLogCleanupDelay
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index db7057d7da..080729b2ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -70,7 +70,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
/**
* A `PathFilter` to filter only batch files
*/
- private val batchFilesFilter = new PathFilter {
+ protected val batchFilesFilter = new PathFilter {
override def accept(path: Path): Boolean = isBatchFile(path)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index a4e1fe6797..7469caeee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -23,7 +23,7 @@ package org.apache.spark.sql.execution.streaming
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
* vector clock that must progress linearly forward.
*/
-case class OffsetSeq(offsets: Seq[Option[Offset]]) {
+case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
@@ -47,7 +47,13 @@ object OffsetSeq {
* Returns a [[OffsetSeq]] with a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
*/
- def fill(offsets: Offset*): OffsetSeq = {
- OffsetSeq(offsets.map(Option(_)))
+ def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
+
+ /**
+ * Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
+ * `nulls` in the sequence are converted to `None`s.
+ */
+ def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
+ OffsetSeq(offsets.map(Option(_)), metadata)
}
}
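A small usage sketch of the extended API may help here. The case class and companion below are local stand-ins (the real classes live in org.apache.spark.sql.execution.streaming); they only illustrate the new optional metadata field and the null-to-None conversion in fill():

```scala
// Local stand-ins for the extended OffsetSeq API, not the Spark classes themselves.
final case class Offset(json: String)
final case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None)

object OffsetSeq {
  def fill(offsets: Offset*): OffsetSeq = fill(None, offsets: _*)

  def fill(metadata: Option[String], offsets: Offset*): OffsetSeq =
    OffsetSeq(offsets.map(Option(_)), metadata)
}

object OffsetSeqSketch {
  def main(args: Array[String]): Unit = {
    // `null` entries become None; the metadata string rides along unchanged.
    val seq = OffsetSeq.fill(Some("some-metadata"), Offset("{0}"), null, Offset("{2}"))
    println(seq.offsets)  // Some(Offset({0})), None, Some(Offset({2}))
    println(seq.metadata) // Some(some-metadata)
  }
}
```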
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index d1c9d95be9..cc25b4474b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -33,12 +33,13 @@ import org.apache.spark.sql.SparkSession
* by a newline character. If a source offset is missing, then
* that line will contain a string value defined in the
* SERIALIZED_VOID_OFFSET variable in [[OffsetSeqLog]] companion object.
- * For instance, when dealine wiht [[LongOffset]] types:
- * v1 // version 1
- * {0} // LongOffset 0
- * {3} // LongOffset 3
- * - // No offset for this source i.e., an invalid JSON string
- * {2} // LongOffset 2
+ * For instance, when dealing with [[LongOffset]] types:
+ * v1 // version 1
+ * metadata
+ * {0} // LongOffset 0
+ * {3} // LongOffset 3
+ * - // No offset for this source i.e., an invalid JSON string
+ * {2} // LongOffset 2
* ...
*/
class OffsetSeqLog(sparkSession: SparkSession, path: String)
@@ -58,13 +59,25 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
if (version != OffsetSeqLog.VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
+
+ // read metadata
+ val metadata = lines.next().trim match {
+ case "" => None
+ case md => Some(md)
+ }
+ OffsetSeq.fill(metadata, lines.map(parseOffset).toArray: _*)
}
- override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
+ override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
- metadata.offsets.map(_.map(_.json)).foreach { offset =>
+
+ // write metadata
+ out.write('\n')
+ out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
+
+ // write offsets, one per line
+ offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>
out.write('\n')
offset match {
case Some(json: String) => out.write(json.getBytes(UTF_8))
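The resulting on-disk layout is easiest to see end to end. The sketch below mirrors the new serialize logic with local stand-ins (not the Spark OffsetSeqLog class): a version line, then one metadata line (blank when there is no metadata), then one serialized offset per line, with "-" standing in for a missing offset:

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// A standalone sketch of the offset log layout described in the scaladoc above.
object OffsetLogFormatSketch {
  val SERIALIZED_VOID_OFFSET = "-"

  def serialize(metadata: Option[String], offsets: Seq[Option[String]], out: OutputStream): Unit = {
    out.write("v1".getBytes(UTF_8))                    // version line
    out.write('\n')
    out.write(metadata.getOrElse("").getBytes(UTF_8))  // metadata line (blank if absent)
    offsets.foreach { offset =>                        // one offset per line
      out.write('\n')
      out.write(offset.getOrElse(SERIALIZED_VOID_OFFSET).getBytes(UTF_8))
    }
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    serialize(Some("batch-metadata"), Seq(Some("{0}"), Some("{3}"), None, Some("{2}")), bytes)
    println(bytes.toString(UTF_8.name()))
    // v1
    // batch-metadata
    // {0}
    // {3}
    // -
    // {2}
  }
}
```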
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
new file mode 100644
index 0000000000..2cd2157b29
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class CompactibleFileStreamLogSuite extends SparkFunSuite {
+
+ import CompactibleFileStreamLog._
+
+ test("deriveCompactInterval") {
+ // latestCompactBatchId(4) + 1 <= default(5)
+ // then use latestCompactBatchId + 1 === 5
+ assert(5 === deriveCompactInterval(5, 4))
+ // First divisor of 10 that is >= 4 === 5
+ assert(5 === deriveCompactInterval(4, 9))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b365af76c3..a099153d2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming
import java.io.File
+import scala.collection.mutable
+
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
@@ -896,32 +898,38 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
- test("compacat metadata log") {
+ test("compact interval metadata log") {
val _sources = PrivateMethod[Seq[Source]]('sources)
val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
- def verify(execution: StreamExecution)
- (batchId: Long, expectedBatches: Int): Boolean = {
+ def verify(
+ execution: StreamExecution,
+ batchId: Long,
+ expectedBatches: Int,
+ expectedCompactInterval: Int): Boolean = {
import CompactibleFileStreamLog._
val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
val metadataLog = fileSource invokePrivate _metadataLog()
- if (isCompactionBatch(batchId, 2)) {
+ if (isCompactionBatch(batchId, expectedCompactInterval)) {
val path = metadataLog.batchIdToPath(batchId)
// Assert path name should be ended with compact suffix.
- assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+ assert(path.getName.endsWith(COMPACT_FILE_SUFFIX),
+ "path does not end with compact file suffix")
// Compacted batch should include all entries from start.
val entries = metadataLog.get(batchId)
- assert(entries.isDefined)
- assert(entries.get.length === metadataLog.allFiles().length)
- assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length)
+ assert(entries.isDefined, "Entries not defined")
+ assert(entries.get.length === metadataLog.allFiles().length, "clean up check")
+ assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length ===
+ entries.get.length, "Length check")
}
assert(metadataLog.allFiles().sortBy(_.batchId) ===
- metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId),
+ "Batch id mismatch")
metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches
}
@@ -932,26 +940,27 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
) {
val fileStream = createFileStream("text", src.getCanonicalPath)
val filtered = fileStream.filter($"value" contains "keep")
+ val updateConf = Map(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5")
testStream(filtered)(
AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
- AssertOnQuery(verify(_)(0L, 1)),
+ AssertOnQuery(verify(_, 0L, 1, 2)),
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
- AssertOnQuery(verify(_)(1L, 2)),
+ AssertOnQuery(verify(_, 1L, 2, 2)),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
- AssertOnQuery(verify(_)(2L, 3)),
+ AssertOnQuery(verify(_, 2L, 3, 2)),
StopStream,
- StartStream(),
- AssertOnQuery(verify(_)(2L, 3)),
+ StartStream(additionalConfs = updateConf),
+ AssertOnQuery(verify(_, 2L, 3, 2)),
AddTextFileData("drop10\nkeep11", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
- AssertOnQuery(verify(_)(3L, 4)),
+ AssertOnQuery(verify(_, 3L, 4, 2)),
AddTextFileData("drop12\nkeep13", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
- AssertOnQuery(verify(_)(4L, 5))
+ AssertOnQuery(verify(_, 4L, 5, 2))
)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7428330651..a6b2d4b9ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -161,7 +161,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Starts the stream, resuming if data has already been processed. It must not be running. */
case class StartStream(
trigger: Trigger = ProcessingTime(0),
- triggerClock: Clock = new SystemClock)
+ triggerClock: Clock = new SystemClock,
+ additionalConfs: Map[String, String] = Map.empty)
extends StreamAction
/** Advance the trigger clock's time manually. */
@@ -240,6 +241,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
var lastStream: StreamExecution = null
val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
val sink = new MemorySink(stream.schema, outputMode)
+ val resetConfValues = mutable.Map[String, Option[String]]()
@volatile
var streamDeathCause: Throwable = null
@@ -330,7 +332,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
startedTest.foreach { action =>
logInfo(s"Processing test stream action: $action")
action match {
- case StartStream(trigger, triggerClock) =>
+ case StartStream(trigger, triggerClock, additionalConfs) =>
verify(currentStream == null, "stream already running")
verify(triggerClock.isInstanceOf[SystemClock]
|| triggerClock.isInstanceOf[StreamManualClock],
@@ -338,6 +340,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
if (triggerClock.isInstanceOf[StreamManualClock]) {
manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
}
+
+ additionalConfs.foreach(pair => {
+ val value =
+ if (spark.conf.contains(pair._1)) Some(spark.conf.get(pair._1)) else None
+ resetConfValues(pair._1) = value
+ spark.conf.set(pair._1, pair._2)
+ })
+
lastStream = currentStream
currentStream =
spark
@@ -519,6 +529,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
currentStream.stop()
}
spark.streams.removeListener(statusCollector)
+
+ // Rollback prev configuration values
+ resetConfValues.foreach {
+ case (key, Some(value)) => spark.conf.set(key, value)
+ case (key, None) => spark.conf.unset(key)
+ }
}
}
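The conf handling added to StreamTest follows a snapshot-and-restore pattern: when StartStream applies additionalConfs, it remembers each overridden key's previous value and rolls everything back once the test finishes. A standalone sketch of the same pattern (FakeConf is a hypothetical stand-in for the SparkSession conf) is below:

```scala
import scala.collection.mutable

// A standalone sketch of the snapshot-and-restore pattern used for `additionalConfs`.
object ResetConfSketch {
  final class FakeConf {
    private val values = mutable.Map[String, String]()
    def contains(key: String): Boolean = values.contains(key)
    def get(key: String): String = values(key)
    def set(key: String, value: String): Unit = values(key) = value
    def unset(key: String): Unit = values.remove(key)
    override def toString: String = values.toMap.toString
  }

  def main(args: Array[String]): Unit = {
    val conf = new FakeConf
    conf.set("spark.sql.streaming.fileSource.log.compactInterval", "10")

    val additionalConfs = Map("spark.sql.streaming.fileSource.log.compactInterval" -> "5")
    val resetConfValues = mutable.Map[String, Option[String]]()

    // StartStream: apply overrides, remembering whatever was set before (possibly nothing).
    additionalConfs.foreach { case (key, value) =>
      resetConfValues(key) = if (conf.contains(key)) Some(conf.get(key)) else None
      conf.set(key, value)
    }
    println(conf) // override in effect while the restarted stream runs

    // Teardown: roll back to the previous configuration values.
    resetConfValues.foreach {
      case (key, Some(value)) => conf.set(key, value)
      case (key, None) => conf.unset(key)
    }
    println(conf) // original value restored
  }
}
```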