aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-08-26 11:30:23 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-08-26 11:30:23 -0700
commit9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88 (patch)
treea4c5a9776e39309391d1a01b264fa718a7d3926e
parent261c55dd8808502fb7f3384eb537d26a4a8123d7 (diff)
downloadspark-9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88.tar.gz
spark-9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88.tar.bz2
spark-9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88.zip
[SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely
## What changes were proposed in this pull request? Before this change, FileStreamSource uses an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set. This patch introduces a new user-defined option called "maxFileAge", default to 24 hours. If a file is older than this age, FileStreamSource will purge it from the in-memory map that was used to track the list of files that have been processed. ## How was this patch tested? Added unit tests for the underlying utility, and also added an end-to-end test to validate the purge in FileStreamSourceSuite. Also verified the new test cases would fail when the timeout was set to a very large number. Author: petermaxlee <petermaxlee@gmail.com> Closes #14728 from petermaxlee/SPARK-17165.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala54
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala149
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala76
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala40
5 files changed, 285 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
new file mode 100644
index 0000000000..3efc20c1d6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
+import org.apache.spark.util.Utils
+
+/**
+ * User specified options for file streams.
+ */
+class FileStreamOptions(parameters: Map[String, String]) extends Logging {
+
+ val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
+ Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+ throw new IllegalArgumentException(
+ s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
+ }
+ }
+
+ /**
+ * Maximum age of a file that can be found in this directory, before it is deleted.
+ *
+ * The max age is specified with respect to the timestamp of the latest file, and not the
+ * timestamp of the current system. That this means if the last file has timestamp 1000, and the
+ * current system time is 2000, and max age is 200, the system will purge files older than
+ * 800 (rather than 1800) from the internal state.
+ *
+ * Default to a week.
+ */
+ val maxFileAgeMs: Long =
+ Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))
+
+ /** Options as specified by the user, in a case-insensitive map, without "path" set. */
+ val optionMapWithoutPath: Map[String, String] =
+ new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0cfad659dc..e8b969b5e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,21 +17,20 @@
package org.apache.spark.sql.execution.streaming
-import scala.util.Try
+import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.collection.OpenHashSet
/**
- * A very simple source that reads text files from the given directory as they appear.
+ * A very simple source that reads files from the given directory as they appear.
*
- * TODO Clean up the metadata files periodically
+ * TODO: Clean up the metadata log files periodically.
*/
class FileStreamSource(
sparkSession: SparkSession,
@@ -41,19 +40,34 @@ class FileStreamSource(
metadataPath: String,
options: Map[String, String]) extends Source with Logging {
- private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
- private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns
- private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
+ import FileStreamSource._
+
+ private val sourceOptions = new FileStreamOptions(options)
+
+ private val qualifiedBasePath: Path = {
+ val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+ fs.makeQualified(new Path(path)) // can contains glob patterns
+ }
+
+ private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
/** Maximum number of new files to be considered in each batch */
- private val maxFilesPerBatch = getMaxFilesPerBatch()
+ private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
+
+ /** A mapping from a file that we have processed to some timestamp it was last modified. */
+ // Visible for testing and debugging in production.
+ val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
- private val seenFiles = new OpenHashSet[String]
- metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
- files.foreach(seenFiles.add)
+ metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
+ entry.foreach(seenFiles.add)
+ // TODO: move purge call out of the loop once we truncate logs.
+ seenFiles.purge()
}
+ logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+
/**
* Returns the maximum offset that can be retrieved from the source.
*
@@ -61,16 +75,27 @@ class FileStreamSource(
* there is no race here, so the cost of `synchronized` should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
- val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
+ // All the new files found - ignore aged files and files that we have seen.
+ val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+
+ // Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
+
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}
- logTrace(s"Number of new files = ${newFiles.size})")
- logTrace(s"Number of files selected for batch = ${batchFiles.size}")
- logTrace(s"Number of seen files = ${seenFiles.size}")
+ val numPurged = seenFiles.purge()
+
+ logTrace(
+ s"""
+ |Number of new files = ${newFiles.size}
+ |Number of files selected for batch = ${batchFiles.size}
+ |Number of seen files = ${seenFiles.size}
+ |Number of files purged from tracking map = $numPurged
+ """.stripMargin)
+
if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +129,26 @@ class FileStreamSource(
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
- val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
sparkSession,
- paths = files,
+ paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
- options = newOptions)
+ options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}
- private def fetchAllFiles(): Seq[String] = {
+ /**
+ * Returns a list of files found, sorted by their timestamp.
+ */
+ private def fetchAllFiles(): Seq[FileEntry] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
- val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
+ val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+ FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+ }
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
@@ -132,20 +161,76 @@ class FileStreamSource(
files
}
- private def getMaxFilesPerBatch(): Option[Int] = {
- new CaseInsensitiveMap(options)
- .get("maxFilesPerTrigger")
- .map { str =>
- Try(str.toInt).toOption.filter(_ > 0).getOrElse {
- throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
- }
- }
- }
-
override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
override def stop() {}
}
+
+
+object FileStreamSource {
+
+ /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
+ type Timestamp = Long
+
+ case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+
+ /**
+ * A custom hash map used to track the list of files seen. This map is not thread-safe.
+ *
+ * To prevent the hash map from growing indefinitely, a purge function is available to
+ * remove files "maxAgeMs" older than the latest file.
+ */
+ class SeenFilesMap(maxAgeMs: Long) {
+ require(maxAgeMs >= 0)
+
+ /** Mapping from file to its timestamp. */
+ private val map = new java.util.HashMap[String, Timestamp]
+
+ /** Timestamp of the latest file. */
+ private var latestTimestamp: Timestamp = 0L
+
+ /** Timestamp for the last purge operation. */
+ private var lastPurgeTimestamp: Timestamp = 0L
+
+ /** Add a new file to the map. */
+ def add(file: FileEntry): Unit = {
+ map.put(file.path, file.timestamp)
+ if (file.timestamp > latestTimestamp) {
+ latestTimestamp = file.timestamp
+ }
+ }
+
+ /**
+ * Returns true if we should consider this file a new file. The file is only considered "new"
+ * if it is new enough that we are still tracking, and we have not seen it before.
+ */
+ def isNewFile(file: FileEntry): Boolean = {
+ // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
+ // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
+ file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+ }
+
+ /** Removes aged entries and returns the number of files removed. */
+ def purge(): Int = {
+ lastPurgeTimestamp = latestTimestamp - maxAgeMs
+ val iter = map.entrySet().iterator()
+ var count = 0
+ while (iter.hasNext) {
+ val entry = iter.next()
+ if (entry.getValue < lastPurgeTimestamp) {
+ count += 1
+ iter.remove()
+ }
+ }
+ count
+ }
+
+ def size: Int = map.size()
+
+ def allEntries: Seq[FileEntry] = {
+ map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 698f07b0a1..2b6f76ca28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -180,7 +180,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
private def isFileAlreadyExistsException(e: IOException): Boolean = {
e.isInstanceOf[FileAlreadyExistsException] ||
// Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
- // HADOOP-9361, we still need to support old Hadoop versions.
+ // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
new file mode 100644
index 0000000000..c6db2fd3f9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class FileStreamSourceSuite extends SparkFunSuite {
+
+ import FileStreamSource._
+
+ test("SeenFilesMap") {
+ val map = new SeenFilesMap(maxAgeMs = 10)
+
+ map.add(FileEntry("a", 5))
+ assert(map.size == 1)
+ map.purge()
+ assert(map.size == 1)
+
+ // Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
+ map.add(FileEntry("b", 15))
+ assert(map.size == 2)
+ map.purge()
+ assert(map.size == 2)
+
+ // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
+ map.add(FileEntry("c", 16))
+ assert(map.size == 3)
+ map.purge()
+ assert(map.size == 2)
+
+ // Override existing entry shouldn't change the size
+ map.add(FileEntry("c", 25))
+ assert(map.size == 2)
+
+ // Not a new file because we have seen c before
+ assert(!map.isNewFile(FileEntry("c", 20)))
+
+ // Not a new file because timestamp is too old
+ assert(!map.isNewFile(FileEntry("d", 5)))
+
+ // Finally a new file: never seen and not too old
+ assert(map.isNewFile(FileEntry("e", 20)))
+ }
+
+ test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
+ val map = new SeenFilesMap(maxAgeMs = 10)
+
+ map.add(FileEntry("a", 20))
+ assert(map.size == 1)
+
+ // Timestamp 5 should still considered a new file because purge time should be 0
+ assert(map.isNewFile(FileEntry("b", 9)))
+ assert(map.isNewFile(FileEntry("b", 10)))
+
+ // Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
+ map.purge()
+ assert(!map.isNewFile(FileEntry("b", 9)))
+ assert(map.isNewFile(FileEntry("b", 10)))
+ }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 47260a23c7..03222b4a49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -104,12 +104,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
def createFileStream(
format: String,
path: String,
- schema: Option[StructType] = None): DataFrame = {
+ schema: Option[StructType] = None,
+ options: Map[String, String] = Map.empty): DataFrame = {
val reader =
if (schema.isDefined) {
- spark.readStream.format(format).schema(schema.get)
+ spark.readStream.format(format).schema(schema.get).options(options)
} else {
- spark.readStream.format(format)
+ spark.readStream.format(format).options(options)
}
reader.load(path)
}
@@ -331,6 +332,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("SPARK-17165 should not track the list of seen files indefinitely") {
+ // This test works by:
+ // 1. Create a file
+ // 2. Get it processed
+ // 3. Sleeps for a very short amount of time (larger than maxFileAge
+ // 4. Add another file (at this point the original file should have been purged
+ // 5. Test the size of the seenFiles internal data structure
+
+ // Note that if we change maxFileAge to a very large number, the last step should fail.
+ withTempDirs { case (src, tmp) =>
+ val textStream: DataFrame =
+ createFileStream("text", src.getCanonicalPath, options = Map("maxFileAge" -> "5ms"))
+
+ testStream(textStream)(
+ AddTextFileData("a\nb", src, tmp),
+ CheckAnswer("a", "b"),
+
+ // SLeeps longer than 5ms (maxFileAge)
+ AssertOnQuery { _ => Thread.sleep(10); true },
+
+ AddTextFileData("c\nd", src, tmp),
+ CheckAnswer("a", "b", "c", "d"),
+
+ AssertOnQuery("seen files should contain only one entry") { streamExecution =>
+ val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
+ e.source.asInstanceOf[FileStreamSource]
+ }.head
+ source.seenFiles.size == 1
+ }
+ )
+ }
+ }
+
// =============== JSON file stream tests ================
test("read from json files") {