Diffstat (limited to 'sql'):
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala      | 12
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala       |  4
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala        |  4
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala        |  3
 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala | 18
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala            | 42
 6 files changed, 65 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 4254df44c9..7520163522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
+ extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
import FileStreamSinkLog._
@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
}
}
- override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+ override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
(VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
}
- override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+ override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
val lines = new String(bytes, UTF_8).split("\n")
if (lines.length == 0) {
throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
if (version != VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+ lines.slice(1, lines.length).map(read[SinkFileStatus](_))
}
- override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+ override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs))) {
+ if (super.add(batchId, compactLogs(allLogs).toArray)) {
if (isDeletingExpiredLog) {
deleteExpiredLog(batchId)
}
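
For reference, the on-disk format the sink log writes is unchanged by this patch: a version header followed by one serialized SinkFileStatus per line. Only the in-memory element type moves from Seq to Array so that HDFSMetadataLog never has to serialize a generic Scala sequence. Below is a minimal, self-contained sketch of that round trip, simplified to plain string records instead of the JSON encoding FileStreamSinkLog actually uses; it is an illustration, not Spark code.

    import java.nio.charset.StandardCharsets.UTF_8

    // Sketch only: version header + one record per line, carried as Array rather than Seq.
    object VersionedLineLogSketch {
      val VERSION = "v1"

      def serialize(logData: Array[String]): Array[Byte] =
        (VERSION +: logData).mkString("\n").getBytes(UTF_8)

      def deserialize(bytes: Array[Byte]): Array[String] = {
        val lines = new String(bytes, UTF_8).split("\n")
        if (lines.length == 0) throw new IllegalStateException("Incomplete log file")
        if (lines(0) != VERSION) throw new IllegalStateException(s"Unknown log version: ${lines(0)}")
        lines.slice(1, lines.length)
      }

      def main(args: Array[String]): Unit = {
        val entries = Array("/a/b/x", "/a/b/y")
        assert(deserialize(serialize(entries)).sameElements(entries))
      }
    }
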
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8b969b5e0..42fb454c2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}
- private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+ private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -98,7 +98,7 @@ class FileStreamSource(
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles)
+ metadataLog.add(maxBatchId, batchFiles.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 127ece9ab0..39a0f33413 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {
+ // Avoid serializing generic sequences, see SPARK-17372
+ require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
+ "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
+
import HDFSMetadataLog._
val metadataPath = new Path(path)
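
The require added above is the crux of the fix: because T's runtime class is captured through a ClassTag, the constructor can refuse a Seq element type up front instead of failing later during serialization. A hedged, standalone sketch of that guard follows; the class and object names are illustrative, not Spark's API.

    import scala.reflect.ClassTag

    // Sketch of the ClassTag-based guard: reject Seq at construction time, accept Array.
    class TypedLogSketch[T: ClassTag] {
      require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
        "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
    }

    object TypedLogSketchDemo {
      def main(args: Array[String]): Unit = {
        new TypedLogSketch[Array[String]]()   // accepted
        val rejected =
          try { new TypedLogSketch[Seq[String]](); false }
          catch { case _: IllegalArgumentException => true }  // require trips as intended
        assert(rejected)
      }
    }
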
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4d05af0b60..5e1e5eeb50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
awaitBatchLock.lock()
try {
awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
} finally {
awaitBatchLock.unlock()
}
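
The StreamExecution change follows a common wait-loop pattern: a thread blocked on the next-batch condition periodically wakes up and rethrows any failure the stream thread has recorded, so a caller waiting for the stream to reach an offset sees the stream's death immediately instead of spinning until its own timeout. A hedged sketch of the pattern, with illustrative names rather than Spark's:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    // Sketch: waiters re-check a recorded failure each time the condition wait returns.
    class BatchWaiterSketch {
      private val awaitBatchLock = new ReentrantLock()
      private val awaitBatchLockCondition = awaitBatchLock.newCondition()
      @volatile private var streamDeathCause: Throwable = null

      def recordFailure(t: Throwable): Unit = {
        streamDeathCause = t
        awaitBatchLock.lock()
        try awaitBatchLockCondition.signalAll() finally awaitBatchLock.unlock()
      }

      def awaitBatch(batchSeen: () => Boolean): Unit = {
        while (!batchSeen()) {
          awaitBatchLock.lock()
          try {
            awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
            if (streamDeathCause != null) {
              throw streamDeathCause   // surface the stream's failure to the waiter
            }
          } finally {
            awaitBatchLock.unlock()
          }
        }
      }
    }
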
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 39fd1f0cd3..26f8b98cb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
test("serialize") {
withFileStreamSinkLog { sinkLog =>
- val logs = Seq(
+ val logs = Array(
SinkFileStatus(
path = "/a/b/x",
size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))
- assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+ assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}
@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
for (batchId <- 0 to 10) {
sinkLog.add(
batchId,
- Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+ Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
val expectedFiles = (0 to batchId).map {
id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
}
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}.toSet
}
- sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
- sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0", "1") === listBatchFiles())
- sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact") === listBatchFiles())
- sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3") === listBatchFiles())
- sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3", "4") === listBatchFiles())
- sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
assert(Set("5.compact") === listBatchFiles())
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 03222b4a49..886f7be59d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.streaming
import java.io.File
-import java.util.UUID
+
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
import testImplicits._
+ override val streamingTimeout = 20.seconds
+
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}
+
+ test("SPARK-17372 - write file names to WAL as Array[String]") {
+ // Note: If this test takes longer than the timeout, then it's likely that this is actually
+ // running a Spark job with 10000 tasks. This test tries to avoid that by
+ // 1. Setting the threshold for parallel file listing to a very high value
+ // 2. Using a query that should use constant folding to eliminate reading of the files
+
+ val numFiles = 10000
+
+ // This is to avoid running a Spark job to list the files in parallel
+ // by the ListingFileCatalog.
+ spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+
+ withTempDirs { case (root, tmp) =>
+ val src = new File(root, "a=1")
+ src.mkdirs()
+
+ (1 to numFiles).map { _.toString }.foreach { i =>
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ stringToFile(finalFile, i)
+ }
+ assert(src.listFiles().size === numFiles)
+
+ val files = spark.readStream.text(root.getCanonicalPath).as[String]
+
+ // Note this query will use constant folding to eliminate the file scan.
+ // This is to avoid actually running a Spark job with 10000 tasks
+ val df = files.filter("1 == 0").groupBy().count()
+
+ testStream(df, InternalOutputModes.Complete)(
+ AddTextFileData("0", src, tmp),
+ CheckAnswer(0)
+ )
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
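
The new regression test leans on two tricks called out in its comments: raising PARALLEL_PARTITION_DISCOVERY_THRESHOLD so that ListingFileCatalog lists the 10000 files on the driver, and a provably false predicate so the optimizer never has to scan them. A hedged, standalone sketch of the second trick against a local SparkSession, where a toy DataFrame stands in for the test's file source:

    import org.apache.spark.sql.SparkSession

    // Sketch: a filter that is always false lets the optimizer avoid reading the input,
    // so the aggregate returns a single row with count 0 without a full scan.
    object ConstantFoldingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        val df = Seq("a", "b", "c").toDF("value").filter("1 == 0").groupBy().count()
        assert(df.collect().head.getLong(0) == 0L)
        println(df.queryExecution.optimizedPlan)   // the false predicate is folded away
        spark.stop()
      }
    }
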