author     jerryshao <sshao@hortonworks.com>        2016-09-20 10:24:12 -0700
committer  Shixiong Zhu <shixiong@databricks.com>   2016-09-20 10:24:12 -0700
commit     a6aade0042d9c065669f46d2dac40ec6ce361e63 (patch)
tree       01d6a34c34b222a6d1e406aa8de821f696cfcc67 /sql/core/src/test/scala
parent     eb004c66200da7df36dd0a9a11999fc352197916 (diff)
[SPARK-15698][SQL][STREAMING] Add the ability to remove the old MetadataLog in FileStreamSource
## What changes were proposed in this pull request?

The current `metadataLog` in `FileStreamSource` adds a checkpoint file for each batch but has no way to remove or compact them, which leads to a large number of small files when a query runs for a long time. This change compacts the old logs into one file. The approach is quite similar to `FileStreamSinkLog`, but simpler.

## How was this patch tested?

Unit test added.

Author: jerryshao <sshao@hortonworks.com>

Closes #13513 from jerryshao/SPARK-15698.
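A minimal sketch of the compaction scheme described above, inferred from the tests below (the names mirror `CompactibleFileStreamLog`, but the exact batch-to-compaction mapping shown here is an assumption; in the test the compact interval is 2 and compacted files carry a `.compact` suffix):

// Hypothetical sketch, not the actual CompactibleFileStreamLog implementation.
object MetadataLogCompactionSketch {
  // Assumed rule: every compactInterval-th batch is a compaction batch; it is
  // written as a single "<batchId>.compact" file folding in all earlier entries,
  // so the older per-batch files can later be cleaned up.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  def batchFileName(batchId: Long, compactInterval: Int): String =
    if (isCompactionBatch(batchId, compactInterval)) s"$batchId.compact" else batchId.toString

  def main(args: Array[String]): Unit = {
    // With a compact interval of 2, as in the test below, batches 1 and 3 compact:
    // 0 -> "0", 1 -> "1.compact", 2 -> "2", 3 -> "3.compact", 4 -> "4"
    (0L to 4L).foreach(b => println(s"$b -> ${batchFileName(b, 2)}"))
  }
}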
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala   35
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala              99
2 files changed, 116 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 26f8b98cb3..41a8cc2400 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -25,13 +25,14 @@ import org.apache.spark.sql.test.SharedSQLContext
class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
+ import CompactibleFileStreamLog._
import FileStreamSinkLog._
test("getBatchIdFromFileName") {
assert(1234L === getBatchIdFromFileName("1234"))
assert(1234L === getBatchIdFromFileName("1234.compact"))
intercept[NumberFormatException] {
- FileStreamSinkLog.getBatchIdFromFileName("1234a")
+ getBatchIdFromFileName("1234a")
}
}
@@ -83,17 +84,19 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
test("compactLogs") {
- val logs = Seq(
- newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
- assert(logs === compactLogs(logs))
+ withFileStreamSinkLog { sinkLog =>
+ val logs = Seq(
+ newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
+ assert(logs === sinkLog.compactLogs(logs))
- val logs2 = Seq(
- newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
- assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ logs2))
+ val logs2 = Seq(
+ newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
+ assert(logs.dropRight(1) ++ logs2.dropRight(1) === sinkLog.compactLogs(logs ++ logs2))
+ }
}
test("serialize") {
@@ -125,21 +128,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
action = FileStreamSinkLog.ADD_ACTION))
// scalastyle:off
- val expected = s"""${FileStreamSinkLog.VERSION}
+ val expected = s"""$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))
- assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
+ assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}
test("deserialize") {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
- val logs = s"""${FileStreamSinkLog.VERSION}
+ val logs = s"""$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -173,7 +176,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
- assert(Nil === sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8)))
+ assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
}
}
@@ -263,7 +266,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
withTempDir { file =>
- val sinkLog = new FileStreamSinkLog(spark, file.getCanonicalPath)
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
f(sinkLog)
}
}
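For context on what the rewritten `compactLogs` test above asserts, here is a hypothetical sketch of the add/delete folding it exercises; the real method lives on `FileStreamSinkLog`, and the simplified status type and signature used here are assumptions:

// Hypothetical sketch of compactLogs semantics: an entry whose path later gets a
// "delete" action is dropped from the compacted result, as are the delete entries.
object CompactLogsSketch {
  case class SinkFileStatusSketch(path: String, action: String)

  def compactLogs(logs: Seq[SinkFileStatusSketch]): Seq[SinkFileStatusSketch] = {
    val deletedPaths = logs.filter(_.action == "delete").map(_.path).toSet
    logs.filter(s => s.action == "add" && !deletedPaths.contains(s.path))
  }

  def main(args: Array[String]): Unit = {
    val logs = Seq(
      SinkFileStatusSketch("/a/b/x", "add"),
      SinkFileStatusSketch("/a/b/y", "add"),
      SinkFileStatusSketch("/a/b/z", "add"),
      SinkFileStatusSketch("/a/b/m", "add"),
      SinkFileStatusSketch("/a/b/n", "add"),
      SinkFileStatusSketch("/a/b/z", "delete"))
    // Prints the entries for x, y, m, n; z was deleted, matching the test's assertion.
    compactLogs(logs).foreach(println)
  }
}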
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a02a36c004..55c95ae285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
import java.io.File
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class FileStreamSourceTest extends StreamTest with SharedSQLContext {
+class FileStreamSourceTest extends StreamTest with SharedSQLContext with PrivateMethodTester {
import testImplicits._
@@ -804,6 +804,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("compacat metadata log") {
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
+
+ def verify(execution: StreamExecution)
+ (batchId: Long, expectedBatches: Int): Boolean = {
+ import CompactibleFileStreamLog._
+
+ val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+ val metadataLog = fileSource invokePrivate _metadataLog()
+
+ if (isCompactionBatch(batchId, 2)) {
+ val path = metadataLog.batchIdToPath(batchId)
+
+ // Assert path name should be ended with compact suffix.
+ assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+
+ // Compacted batch should include all entries from start.
+ val entries = metadataLog.get(batchId)
+ assert(entries.isDefined)
+ assert(entries.get.length === metadataLog.allFiles().length)
+ assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length)
+ }
+
+ assert(metadataLog.allFiles().sortBy(_.batchId) ===
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches
+ }
+
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ AssertOnQuery(verify(_)(0L, 1)),
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AssertOnQuery(verify(_)(1L, 2)),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+ AssertOnQuery(verify(_)(2L, 3)),
+ StopStream,
+ StartStream(),
+ AssertOnQuery(verify(_)(2L, 3)),
+ AddTextFileData("drop10\nkeep11", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
+ AssertOnQuery(verify(_)(3L, 4)),
+ AddTextFileData("drop12\nkeep13", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
+ AssertOnQuery(verify(_)(4L, 5))
+ )
+ }
+ }
+ }
+
+ test("get arbitrary batch from FileStreamSource") {
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deleting the old logs
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("keep1", src, tmp),
+ CheckAnswer("keep1"),
+ AddTextFileData("keep2", src, tmp),
+ CheckAnswer("keep1", "keep2"),
+ AddTextFileData("keep3", src, tmp),
+ CheckAnswer("keep1", "keep2", "keep3"),
+ AssertOnQuery("check getBatch") { execution: StreamExecution =>
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val fileSource =
+ (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+ assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
+ List("keep1", "keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
+ List("keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
+ List("keep3"))
+ true
+ }
+ )
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
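The new tests in FileStreamSourceSuite reach the private `sources` and `metadataLog` members of `StreamExecution` and `FileStreamSource` through ScalaTest's `PrivateMethodTester`, mixed into `FileStreamSourceTest` above. A minimal, self-contained usage sketch follows; the `Box` class is hypothetical, and only the ScalaTest calls mirror what the tests do:

import org.scalatest.PrivateMethodTester

object PrivateMethodDemo extends PrivateMethodTester {
  class Box { private def secret(x: Int): Int = x * 2 }

  def main(args: Array[String]): Unit = {
    // Describe the private method by name and result type, then invoke it reflectively.
    val secret = PrivateMethod[Int]('secret)
    val result = new Box invokePrivate secret(21)
    println(result) // 42
  }
}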