From 51baca2219fda8692b88fc8552548544aec73a1e Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Fri, 18 Nov 2016 11:11:24 -0800 Subject: [SPARK-18187][SQL] CompactibleFileStreamLog should not use "compactInterval" directly with the user setting. ## What changes were proposed in this pull request? CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If the "compactInterval" is reset by the user, CompactibleFileStreamLog will return a wrong answer, resulting in data loss. This PR provides a way to check the validity of 'compactInterval' and to calculate an appropriate value. ## How was this patch tested? When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one. The primary solution to this issue was given by uncleGen. Added extensions include an additional metadata field in OffsetSeq and the CompactibleFileStreamLog APIs. zsxwing Author: Tyson Condie Author: genmao.ygm Closes #15852 from tcondie/spark-18187. --- .../streaming/CompactibleFileStreamLog.scala | 61 +++++++++++++++++++++- .../execution/streaming/FileStreamSinkLog.scala | 8 +-- .../execution/streaming/FileStreamSourceLog.scala | 9 ++-- .../sql/execution/streaming/HDFSMetadataLog.scala | 2 +- .../spark/sql/execution/streaming/OffsetSeq.scala | 12 +++-- .../sql/execution/streaming/OffsetSeqLog.scala | 31 +++++++---- .../streaming/CompactibleFileStreamLogSuite.scala | 33 ++++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 41 +++++++++------ .../apache/spark/sql/streaming/StreamTest.scala | 20 ++++++- 9 files changed, 178 insertions(+), 39 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 8af3db1968..8529ceac30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -63,7 +63,46 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( protected def isDeletingExpiredLog: Boolean - protected def compactInterval: Int + protected def defaultCompactInterval: Int + + protected final lazy val compactInterval: Int = { + // SPARK-18187: "compactInterval" can be set by the user via defaultCompactInterval. + // If there are existing log entries, then we should ensure a compatible compactInterval + // is used, irrespective of the defaultCompactInterval. There are three cases: + // + // 1. If there is no '.compact' file, we can use the default setting directly. + // 2. If there are two or more '.compact' files, we use the interval between the batch ids + // of the two latest '.compact' files as compactInterval. This case could arise if isDeletingExpiredLog == false. + // 3. If there is only one '.compact' file, then we must find a compact interval + // that is compatible with (i.e., a divisor of) the previous compact file, and that + // faithfully tries to represent the revised default compact interval, i.e., is at least + // as large if possible.
+ // e.g., if defaultCompactInterval is 5 (and previous compact interval could have + // been any 2,3,4,6,12), then a log could be: 11.compact, 12, 13, in which case + // we will ensure that the new compactInterval = 6 > 5 and (11 + 1) % 6 == 0 + val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter) + .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX)) + .map(f => pathToBatchId(f.getPath)) + .sorted + .reverse + + // Case 1 + var interval = defaultCompactInterval + if (compactibleBatchIds.length >= 2) { + // Case 2 + val latestCompactBatchId = compactibleBatchIds(0) + val previousCompactBatchId = compactibleBatchIds(1) + interval = (latestCompactBatchId - previousCompactBatchId).toInt + } else if (compactibleBatchIds.length == 1) { + // Case 3 + interval = CompactibleFileStreamLog.deriveCompactInterval( + defaultCompactInterval, compactibleBatchIds(0).toInt) + } + assert(interval > 0, s"intervalValue = $interval is not a positive value.") + logInfo(s"Set the compact interval to $interval " + + s"[defaultCompactInterval: $defaultCompactInterval]") + interval + } /** * Filter out the obsolete logs. @@ -245,4 +284,24 @@ object CompactibleFileStreamLog { def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 } + + /** + * Derives a compact interval from the latest compact batch id and + * a default compact interval. + */ + def deriveCompactInterval(defaultInterval: Int, latestCompactBatchId: Int) : Int = { + if (latestCompactBatchId + 1 <= defaultInterval) { + latestCompactBatchId + 1 + } else if (defaultInterval < (latestCompactBatchId + 1) / 2) { + // Find the first divisor >= default compact interval + def properDivisors(min: Int, n: Int) = + (min to n/2).view.filter(i => n % i == 0) :+ n + + properDivisors(defaultInterval, latestCompactBatchId + 1).head + } else { + // default compact interval is greater than any divisor other than the latest compact batch id + latestCompactBatchId + 1 + } + } } + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index b4f14151f1..eb6eed87ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -88,9 +88,11 @@ class FileStreamSinkLog( protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion - protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval - require(compactInterval > 0, - s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + + protected override val defaultCompactInterval = + sparkSession.sessionState.conf.fileSinkLogCompactInterval + + require(defaultCompactInterval > 0, + s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " + "to a positive value.") override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index fe81b15607..327b3ac267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -38,11 +38,12 @@ class FileStreamSourceLog( import CompactibleFileStreamLog._ // Configurations about metadata compaction - protected override val compactInterval = + protected override val defaultCompactInterval: Int = sparkSession.sessionState.conf.fileSourceLogCompactInterval - require(compactInterval > 0, - s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + - s"positive value.") + + require(defaultCompactInterval > 0, + s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} " + + s"(was $defaultCompactInterval) to a positive value.") protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSourceLogCleanupDelay diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index db7057d7da..080729b2ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -70,7 +70,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: /** * A `PathFilter` to filter only batch files */ - private val batchFilesFilter = new PathFilter { + protected val batchFilesFilter = new PathFilter { override def accept(path: Path): Boolean = isBatchFile(path) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index a4e1fe6797..7469caeee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -23,7 +23,7 @@ package org.apache.spark.sql.execution.streaming * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance * vector clock that must progress linearly forward. */ -case class OffsetSeq(offsets: Seq[Option[Offset]]) { +case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) { /** * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of @@ -47,7 +47,13 @@ object OffsetSeq { * Returns a [[OffsetSeq]] with a variable sequence of offsets. * `nulls` in the sequence are converted to `None`s. */ - def fill(offsets: Offset*): OffsetSeq = { - OffsetSeq(offsets.map(Option(_))) + def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*) + + /** + * Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets. + * `nulls` in the sequence are converted to `None`s. + */ + def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = { + OffsetSeq(offsets.map(Option(_)), metadata) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index d1c9d95be9..cc25b4474b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -33,12 +33,13 @@ import org.apache.spark.sql.SparkSession * by a newline character. 
If a source offset is missing, then * that line will contain a string value defined in the * SERIALIZED_VOID_OFFSET variable in [[OffsetSeqLog]] companion object. - * For instance, when dealine wiht [[LongOffset]] types: - * v1 // version 1 - * {0} // LongOffset 0 - * {3} // LongOffset 3 - * - // No offset for this source i.e., an invalid JSON string - * {2} // LongOffset 2 + * For instance, when dealing with [[LongOffset]] types: + * v1 // version 1 + * metadata + * {0} // LongOffset 0 + * {3} // LongOffset 3 + * - // No offset for this source i.e., an invalid JSON string + * {2} // LongOffset 2 * ... */ class OffsetSeqLog(sparkSession: SparkSession, path: String) @@ -58,13 +59,25 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) if (version != OffsetSeqLog.VERSION) { throw new IllegalStateException(s"Unknown log version: ${version}") } - OffsetSeq.fill(lines.map(parseOffset).toArray: _*) + + // read metadata + val metadata = lines.next().trim match { + case "" => None + case md => Some(md) + } + OffsetSeq.fill(metadata, lines.map(parseOffset).toArray: _*) } - override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = { + override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller out.write(OffsetSeqLog.VERSION.getBytes(UTF_8)) - metadata.offsets.map(_.map(_.json)).foreach { offset => + + // write metadata + out.write('\n') + out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8)) + + // write offsets, one per line + offsetSeq.offsets.map(_.map(_.json)).foreach { offset => out.write('\n') offset match { case Some(json: String) => out.write(json.getBytes(UTF_8)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala new file mode 100644 index 0000000000..2cd2157b29 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.SparkFunSuite + +class CompactibleFileStreamLogSuite extends SparkFunSuite { + + import CompactibleFileStreamLog._ + + test("deriveCompactInterval") { + // latestCompactBatchId(4) + 1 <= default(5) + // then use latestCompactBatchId + 1 === 5 + assert(5 === deriveCompactInterval(5, 4)) + // First divisor of 10 greater than 4 === 5 + assert(5 === deriveCompactInterval(4, 9)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b365af76c3..a099153d2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming import java.io.File +import scala.collection.mutable + import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ @@ -896,32 +898,38 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } - test("compacat metadata log") { + test("compact interval metadata log") { val _sources = PrivateMethod[Seq[Source]]('sources) val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog) - def verify(execution: StreamExecution) - (batchId: Long, expectedBatches: Int): Boolean = { + def verify( + execution: StreamExecution, + batchId: Long, + expectedBatches: Int, + expectedCompactInterval: Int): Boolean = { import CompactibleFileStreamLog._ val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] val metadataLog = fileSource invokePrivate _metadataLog() - if (isCompactionBatch(batchId, 2)) { + if (isCompactionBatch(batchId, expectedCompactInterval)) { val path = metadataLog.batchIdToPath(batchId) // Assert path name should be ended with compact suffix. - assert(path.getName.endsWith(COMPACT_FILE_SUFFIX)) + assert(path.getName.endsWith(COMPACT_FILE_SUFFIX), + "path does not end with compact file suffix") // Compacted batch should include all entries from start.
val entries = metadataLog.get(batchId) - assert(entries.isDefined) - assert(entries.get.length === metadataLog.allFiles().length) - assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length) + assert(entries.isDefined, "Entries not defined") + assert(entries.get.length === metadataLog.allFiles().length, "clean up check") + assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === + entries.get.length, "Length check") } assert(metadataLog.allFiles().sortBy(_.batchId) === - metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId)) + metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId), + "Batch id mismatch") metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches } @@ -932,26 +940,27 @@ class FileStreamSourceSuite extends FileStreamSourceTest { ) { val fileStream = createFileStream("text", src.getCanonicalPath) val filtered = fileStream.filter($"value" contains "keep") + val updateConf = Map(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") testStream(filtered)( AddTextFileData("drop1\nkeep2\nkeep3", src, tmp), CheckAnswer("keep2", "keep3"), - AssertOnQuery(verify(_)(0L, 1)), + AssertOnQuery(verify(_, 0L, 1, 2)), AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6"), - AssertOnQuery(verify(_)(1L, 2)), + AssertOnQuery(verify(_, 1L, 2, 2)), AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"), - AssertOnQuery(verify(_)(2L, 3)), + AssertOnQuery(verify(_, 2L, 3, 2)), StopStream, - StartStream(), - AssertOnQuery(verify(_)(2L, 3)), + StartStream(additionalConfs = updateConf), + AssertOnQuery(verify(_, 2L, 3, 2)), AddTextFileData("drop10\nkeep11", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"), - AssertOnQuery(verify(_)(3L, 4)), + AssertOnQuery(verify(_, 3L, 4, 2)), AddTextFileData("drop12\nkeep13", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"), - AssertOnQuery(verify(_)(4L, 5)) + AssertOnQuery(verify(_, 4L, 5, 2)) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 7428330651..a6b2d4b9ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -161,7 +161,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** Starts the stream, resuming if data has already been processed. It must not be running. */ case class StartStream( trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock) + triggerClock: Clock = new SystemClock, + additionalConfs: Map[String, String] = Map.empty) extends StreamAction /** Advance the trigger clock's time manually. 
*/ @@ -240,6 +241,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for val sink = new MemorySink(stream.schema, outputMode) + val resetConfValues = mutable.Map[String, Option[String]]() @volatile var streamDeathCause: Throwable = null @@ -330,7 +332,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { startedTest.foreach { action => logInfo(s"Processing test stream action: $action") action match { - case StartStream(trigger, triggerClock) => + case StartStream(trigger, triggerClock, additionalConfs) => verify(currentStream == null, "stream already running") verify(triggerClock.isInstanceOf[SystemClock] || triggerClock.isInstanceOf[StreamManualClock], @@ -338,6 +340,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { if (triggerClock.isInstanceOf[StreamManualClock]) { manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis() } + + additionalConfs.foreach(pair => { + val value = + if (spark.conf.contains(pair._1)) Some(spark.conf.get(pair._1)) else None + resetConfValues(pair._1) = value + spark.conf.set(pair._1, pair._2) + }) + lastStream = currentStream currentStream = spark @@ -519,6 +529,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { currentStream.stop() } spark.streams.removeListener(statusCollector) + + // Rollback prev configuration values + resetConfValues.foreach { + case (key, Some(value)) => spark.conf.set(key, value) + case (key, None) => spark.conf.unset(key) + } } } -- cgit v1.2.3
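As a quick reference for the interval derivation in this patch, the following self-contained Scala sketch mirrors the deriveCompactInterval logic added to CompactibleFileStreamLog and replays the 11.compact example from the code comment. The object name and main method below are illustrative only and are not part of the patch.

object DeriveCompactIntervalSketch {

  // Pick an interval that divides (latestCompactBatchId + 1), so the existing
  // '.compact' file stays on a compaction boundary under the new interval.
  def deriveCompactInterval(defaultInterval: Int, latestCompactBatchId: Int): Int = {
    if (latestCompactBatchId + 1 <= defaultInterval) {
      // The log has not yet gone past the default interval; align with the compact file.
      latestCompactBatchId + 1
    } else if (defaultInterval < (latestCompactBatchId + 1) / 2) {
      // Smallest divisor of (latestCompactBatchId + 1) that is >= defaultInterval.
      def properDivisors(min: Int, n: Int) =
        (min to n / 2).view.filter(i => n % i == 0) :+ n
      properDivisors(defaultInterval, latestCompactBatchId + 1).head
    } else {
      // The default is at least half of (latestCompactBatchId + 1); use the whole span.
      latestCompactBatchId + 1
    }
  }

  def main(args: Array[String]): Unit = {
    // Latest compact file is 11.compact and the user changes the default interval to 5:
    // the derived interval is 6, since (11 + 1) % 6 == 0 and 6 is the smallest divisor
    // of 12 that is >= 5.
    assert(deriveCompactInterval(5, 11) == 6)
    // Latest compact file is 4.compact with a default of 5: keep 5, matching the new
    // CompactibleFileStreamLogSuite test.
    assert(deriveCompactInterval(5, 4) == 5)
  }
}

Choosing a divisor of (latestCompactBatchId + 1) keeps the existing compact file on a compaction boundary, so checks such as isCompactionBatch(11, 6) still hold after the default interval is changed.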