Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala  132
1 file changed, 132 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
new file mode 100644
index 0000000000..8103309aff
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+ metadataLogVersion: String,
+ sparkSession: SparkSession,
+ path: String)
+ extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+ import CompactibleFileStreamLog._
+
+ // Configurations for metadata log compaction
+ protected override val compactInterval =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+ require(compactInterval > 0,
+ s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+ s"positive value.")
+
+ protected override val fileCleanupDelayMs =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+ protected override val isDeletingExpiredLog =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ // A fixed-size cache for the file entries that belong to a compaction batch. It is used to
+ // avoid scanning the compacted log file to retrieve that batch's own data.
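+ // (java.util.LinkedHashMap invokes removeEldestEntry after each put; returning true once
+ // size() exceeds cacheSize evicts the insertion-order-eldest entry, bounding the cache.)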
+ private val cacheSize = compactInterval
+ private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+ override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = {
+ size() > cacheSize
+ }
+ }
+
+ protected override def serializeData(data: FileEntry): String = {
+ Serialization.write(data)
+ }
+
+ protected override def deserializeData(encodedString: String): FileEntry = {
+ Serialization.read[FileEntry](encodedString)
+ }
+
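+ // A pass-through: for the file source log, compaction merely merges all entries from earlier
+ // batches into one file; no entry is dropped or rewritten.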
+ def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+ logs
+ }
+
+ override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+ if (super.add(batchId, logs)) {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ fileEntryCache.put(batchId, logs)
+ }
+ true
+ } else {
+ false
+ }
+ }
+
+ override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
+ val startBatchId = startId.getOrElse(0L)
+ val endBatchId = getLatest().map(_._1).getOrElse(0L)
+
+ val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
+ if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
+ (id, Some(fileEntryCache.get(id)))
+ } else {
+ val logs = super.get(id).map(_.filter(_.batchId == id))
+ (id, logs)
+ }
+ }.partition(_._2.isDefined)
+
+ // The code below is only reached when an original metadata log file has been removed, in which
+ // case the batch has to be recovered from the latest compacted log file. This is quite
+ // time-consuming and should not happen in the current FileStreamSource code path, since we only
+ // fetch the latest metadata log file.
+ val searchKeys = removedBatches.map(_._1)
+ val retrievedBatches = if (searchKeys.nonEmpty) {
+ logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
+ val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+ if (latestBatchId < 0) {
+ Map.empty[Long, Option[Array[FileEntry]]]
+ } else {
+ val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
+ val allLogs = new mutable.HashMap[Long, mutable.ArrayBuffer[FileEntry]]
+
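+ // Rebuild a per-batch view from the compacted file: each entry records the batchId it was
+ // originally added under, so bucketing entries by batchId recovers the removed batches.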
+ super.get(latestCompactedBatchId).foreach { entries =>
+ entries.foreach { e =>
+ allLogs.put(e.batchId, allLogs.getOrElse(e.batchId, mutable.ArrayBuffer()) += e)
+ }
+ }
+
+ searchKeys.map(id => id -> allLogs.get(id).map(_.toArray)).filter(_._2.isDefined).toMap
+ }
+ } else {
+ Map.empty[Long, Option[Array[FileEntry]]]
+ }
+
+ (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+ }
+}
+
+object FileStreamSourceLog {
+ val VERSION = "v1"
+}
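
For reference, here is a minimal, self-contained sketch (not part of the patch) of how the
compaction cadence and the fixed-size entry cache above interact. It assumes the convention in
CompactibleFileStreamLog that a batch is a compaction batch when
(batchId + 1) % compactInterval == 0; the object and values below are illustrative only.

    import java.util.{LinkedHashMap => JLinkedHashMap}
    import java.util.Map.Entry

    object CompactionCadenceSketch {
      def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
        (batchId + 1) % compactInterval == 0

      def main(args: Array[String]): Unit = {
        val compactInterval = 10
        // Compaction batches in [0, 30] with an interval of 10: 9, 19, 29.
        println((0L to 30L).filter(isCompactionBatch(_, compactInterval)).mkString(", "))

        // The fixed-size cache idiom from the patch: LinkedHashMap evicts the
        // insertion-order-eldest entry once size() exceeds the cap.
        val cacheSize = 2
        val fileEntryCache = new JLinkedHashMap[Long, String] {
          override def removeEldestEntry(eldest: Entry[Long, String]): Boolean =
            size() > cacheSize
        }
        Seq(9L, 19L, 29L).foreach(id => fileEntryCache.put(id, s"entries of batch $id"))
        println(fileEntryCache.keySet()) // [19, 29] -- batch 9 was evicted
      }
    }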
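
Likewise, a sketch of the json4s round trip behind serializeData and deserializeData. DemoEntry
is a hypothetical stand-in for FileStreamSource.FileEntry, whose fields are not shown in this
file; the field values are made up for the example.

    import org.json4s.NoTypeHints
    import org.json4s.jackson.Serialization

    object SerializationSketch {
      // Hypothetical entry type; FileStreamSource.FileEntry is the real one.
      case class DemoEntry(path: String, timestamp: Long, batchId: Long)

      def main(args: Array[String]): Unit = {
        implicit val formats = Serialization.formats(NoTypeHints)
        val entry = DemoEntry("/data/part-00000", 1471478400000L, 0L)
        val json = Serialization.write(entry)
        println(json) // {"path":"/data/part-00000","timestamp":1471478400000,"batchId":0}
        assert(Serialization.read[DemoEntry](json) == entry)
      }
    }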