author     petermaxlee <petermaxlee@gmail.com>    2016-08-26 11:30:23 -0700
committer  Shixiong Zhu <shixiong@databricks.com> 2016-08-26 11:30:23 -0700
commit     9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88 (patch)
tree       a4c5a9776e39309391d1a01b264fa718a7d3926e /sql/core/src/test/scala
parent     261c55dd8808502fb7f3384eb537d26a4a8123d7 (diff)
[SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely
## What changes were proposed in this pull request?

Before this change, FileStreamSource used an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set.

This patch introduces a new user-defined option called "maxFileAge", defaulting to 24 hours. If a file is older than this age, FileStreamSource purges it from the in-memory map used to track the list of files that have been processed.

## How was this patch tested?

Added unit tests for the underlying utility, and an end-to-end test to validate the purge in FileStreamSourceSuite. Also verified that the new test cases fail when the timeout is set to a very large number.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14728 from petermaxlee/SPARK-17165.
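For context, here is a minimal sketch of how a user might set the new option on a streaming file source. Only the `"maxFileAge"` option key comes from this patch; the application name, input path, and the `"1h"` value format are illustrative assumptions (the end-to-end test below passes `"5ms"`).

```scala
// Illustrative sketch only: capping how long processed files are remembered
// on a text file stream. The "maxFileAge" key is the option introduced by this
// patch; path, app name, and the "1h" value format are assumptions.
import org.apache.spark.sql.SparkSession

object MaxFileAgeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("max-file-age-example").getOrCreate()

    val lines = spark.readStream
      .format("text")
      .option("maxFileAge", "1h") // files older than this are dropped from the seen-files map
      .load("/tmp/streaming-input")

    val query = lines.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```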
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala |  76
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala           |  40
2 files changed, 113 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
new file mode 100644
index 0000000000..c6db2fd3f9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class FileStreamSourceSuite extends SparkFunSuite {
+
+ import FileStreamSource._
+
+ test("SeenFilesMap") {
+ val map = new SeenFilesMap(maxAgeMs = 10)
+
+ map.add(FileEntry("a", 5))
+ assert(map.size == 1)
+ map.purge()
+ assert(map.size == 1)
+
+ // Add a new entry; purge should be a no-op, since the gap is exactly 10 ms.
+ map.add(FileEntry("b", 15))
+ assert(map.size == 2)
+ map.purge()
+ assert(map.size == 2)
+
+ // Add a new entry that's more than 10 ms newer than the first entry. We should be able to purge now.
+ map.add(FileEntry("c", 16))
+ assert(map.size == 3)
+ map.purge()
+ assert(map.size == 2)
+
+ // Overwriting an existing entry shouldn't change the size
+ map.add(FileEntry("c", 25))
+ assert(map.size == 2)
+
+ // Not a new file because we have seen c before
+ assert(!map.isNewFile(FileEntry("c", 20)))
+
+ // Not a new file because timestamp is too old
+ assert(!map.isNewFile(FileEntry("d", 5)))
+
+ // Finally a new file: never seen and not too old
+ assert(map.isNewFile(FileEntry("e", 20)))
+ }
+
+ test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
+ val map = new SeenFilesMap(maxAgeMs = 10)
+
+ map.add(FileEntry("a", 20))
+ assert(map.size == 1)
+
+ // Timestamps 9 and 10 should still be considered new files because the purge time should be 0
+ assert(map.isNewFile(FileEntry("b", 9)))
+ assert(map.isNewFile(FileEntry("b", 10)))
+
+ // Once purged, the purge time should be 10, and b is then an old file if its timestamp is less than 10.
+ map.purge()
+ assert(!map.isNewFile(FileEntry("b", 9)))
+ assert(map.isNewFile(FileEntry("b", 10)))
+ }
+
+}
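The assertions above pin down the expected SeenFilesMap behaviour: `add` records a file with its timestamp, `purge` drops entries older than `maxAgeMs` relative to the newest timestamp seen, and `isNewFile` rejects files that were already seen or whose timestamp falls before the last purge boundary. A minimal sketch consistent with those assertions is shown below; it is not the implementation added by this patch (that lives in FileStreamSource.scala, outside this diff), and the internal field names are assumptions.

```scala
// Minimal sketch of a SeenFilesMap that satisfies the tests above.
// NOT the code added by the patch; names and internals are assumptions.
import scala.collection.mutable

case class FileEntry(path: String, timestamp: Long)

class SeenFilesMap(maxAgeMs: Long) {
  // path -> timestamp at which the file was last seen
  private val map = new mutable.HashMap[String, Long]
  // newest timestamp observed so far
  private var latestTimestamp: Long = 0L
  // files with a timestamp strictly below this boundary are considered too old
  private var lastPurgeTimestamp: Long = 0L

  def add(file: FileEntry): Unit = {
    map(file.path) = file.timestamp
    if (file.timestamp > latestTimestamp) latestTimestamp = file.timestamp
  }

  // Drop entries older than maxAgeMs relative to the newest timestamp seen,
  // and remember that boundary so such files are never treated as new again.
  def purge(): Unit = {
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val expired = map.filter { case (_, ts) => ts < lastPurgeTimestamp }.keys.toList
    expired.foreach(map.remove)
  }

  // A file is new only if it has not been seen and is not older than the purge boundary.
  def isNewFile(file: FileEntry): Boolean =
    file.timestamp >= lastPurgeTimestamp && !map.contains(file.path)

  def size: Int = map.size
}
```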
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 47260a23c7..03222b4a49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -104,12 +104,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
def createFileStream(
format: String,
path: String,
- schema: Option[StructType] = None): DataFrame = {
+ schema: Option[StructType] = None,
+ options: Map[String, String] = Map.empty): DataFrame = {
val reader =
if (schema.isDefined) {
- spark.readStream.format(format).schema(schema.get)
+ spark.readStream.format(format).schema(schema.get).options(options)
} else {
- spark.readStream.format(format)
+ spark.readStream.format(format).options(options)
}
reader.load(path)
}
@@ -331,6 +332,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("SPARK-17165 should not track the list of seen files indefinitely") {
+ // This test works by:
+ // 1. Create a file
+ // 2. Get it processed
+ // 3. Sleep for a very short amount of time (larger than maxFileAge)
+ // 4. Add another file (at this point the original file should have been purged)
+ // 5. Test the size of the seenFiles internal data structure
+
+ // Note that if we change maxFileAge to a very large number, the last step should fail.
+ withTempDirs { case (src, tmp) =>
+ val textStream: DataFrame =
+ createFileStream("text", src.getCanonicalPath, options = Map("maxFileAge" -> "5ms"))
+
+ testStream(textStream)(
+ AddTextFileData("a\nb", src, tmp),
+ CheckAnswer("a", "b"),
+
+ // Sleep longer than 5 ms (maxFileAge)
+ AssertOnQuery { _ => Thread.sleep(10); true },
+
+ AddTextFileData("c\nd", src, tmp),
+ CheckAnswer("a", "b", "c", "d"),
+
+ AssertOnQuery("seen files should contain only one entry") { streamExecution =>
+ val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
+ e.source.asInstanceOf[FileStreamSource]
+ }.head
+ source.seenFiles.size == 1
+ }
+ )
+ }
+ }
+
// =============== JSON file stream tests ================
test("read from json files") {