aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-09-06 19:34:11 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-09-06 19:34:11 -0700
commiteb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc (patch)
treec8a3a83ddc538b2a1e83c551441b98e9ad2c2099 /sql
parentd6eede9a36766e2d2294951b054d7557008a5662 (diff)
downloadspark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.tar.gz
spark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.tar.bz2
spark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.zip
[SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource
## What changes were proposed in this pull request? When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where a LinkedHashSet.values.toSeq returns Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir. Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires *recursively* going through linked list, thus resulting in StackOverflowError. In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true. - file stream defined on a partitioned directory - directory has 10k+ files The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways. - Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq - Added a `require` in HDFSMetadataLog such that it is never used with type Seq ## How was this patch tested? Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14987 from tdas/SPARK-17372.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala42
6 files changed, 65 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 4254df44c9..7520163522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
+ extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
import FileStreamSinkLog._
@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
}
}
- override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+ override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
(VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
}
- override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+ override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
val lines = new String(bytes, UTF_8).split("\n")
if (lines.length == 0) {
throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
if (version != VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+ lines.slice(1, lines.length).map(read[SinkFileStatus](_))
}
- override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+ override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs))) {
+ if (super.add(batchId, compactLogs(allLogs).toArray)) {
if (isDeletingExpiredLog) {
deleteExpiredLog(batchId)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8b969b5e0..42fb454c2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contains glob patterns
}
- private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+ private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -98,7 +98,7 @@ class FileStreamSource(
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles)
+ metadataLog.add(maxBatchId, batchFiles.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 127ece9ab0..39a0f33413 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {
+ // Avoid serializing generic sequences, see SPARK-17372
+ require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
+ "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
+
import HDFSMetadataLog._
val metadataPath = new Path(path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4d05af0b60..5e1e5eeb50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
awaitBatchLock.lock()
try {
awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
} finally {
awaitBatchLock.unlock()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 39fd1f0cd3..26f8b98cb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
test("serialize") {
withFileStreamSinkLog { sinkLog =>
- val logs = Seq(
+ val logs = Array(
SinkFileStatus(
path = "/a/b/x",
size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))
- assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+ assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}
@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
for (batchId <- 0 to 10) {
sinkLog.add(
batchId,
- Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+ Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
val expectedFiles = (0 to batchId).map {
id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
}
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}.toSet
}
- sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
- sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0", "1") === listBatchFiles())
- sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact") === listBatchFiles())
- sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3") === listBatchFiles())
- sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3", "4") === listBatchFiles())
- sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
assert(Set("5.compact") === listBatchFiles())
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 03222b4a49..886f7be59d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.streaming
import java.io.File
-import java.util.UUID
+
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
import testImplicits._
+ override val streamingTimeout = 20.seconds
+
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}
+
+ test("SPARK-17372 - write file names to WAL as Array[String]") {
+ // Note: If this test takes longer than the timeout, then its likely that this is actually
+ // running a Spark job with 10000 tasks. This test tries to avoid that by
+ // 1. Setting the threshold for parallel file listing to very high
+ // 2. Using a query that should use constant folding to eliminate reading of the files
+
+ val numFiles = 10000
+
+ // This is to avoid running a spark job to list of files in parallel
+ // by the ListingFileCatalog.
+ spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+
+ withTempDirs { case (root, tmp) =>
+ val src = new File(root, "a=1")
+ src.mkdirs()
+
+ (1 to numFiles).map { _.toString }.foreach { i =>
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ stringToFile(finalFile, i)
+ }
+ assert(src.listFiles().size === numFiles)
+
+ val files = spark.readStream.text(root.getCanonicalPath).as[String]
+
+ // Note this query will use constant folding to eliminate the file scan.
+ // This is to avoid actually running a Spark job with 10000 tasks
+ val df = files.filter("1 == 0").groupBy().count()
+
+ testStream(df, InternalOutputModes.Complete)(
+ AddTextFileData("0", src, tmp),
+ CheckAnswer(0)
+ )
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {