author    Shixiong Zhu <shixiong@databricks.com>  2016-03-14 19:28:13 -0700
committer Michael Armbrust <michael@databricks.com>  2016-03-14 19:28:13 -0700
commit    b5e3bd87f5cfa3dc59e5b68d032756feee6b4e25 (patch)
tree      e0ac2228bd3732915e0946b4c222e99172f0d3d6 /sql/core/src
parent    8e0b030606927741f91317660cd14a8a5ed6e5f9 (diff)
[SPARK-13791][SQL] Add MetadataLog and HDFSMetadataLog
## What changes were proposed in this pull request?

- Add a MetadataLog interface for reliable metadata storage.
- Add HDFSMetadataLog as a MetadataLog implementation based on HDFS.
- Update FileStreamSource to use HDFSMetadataLog instead of managing metadata by itself.

## How was this patch tested?

Unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11625 from zsxwing/metadata-log.
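For context, a minimal sketch of the driver loop this patch enables. The names match the `MetadataLog` trait and `HDFSMetadataLog` class added below; the surrounding setup (`sqlContext`, `metadataPath`, the example file paths) is assumed, not part of the patch:

```scala
// Hedged sketch mirroring what FileStreamSource now does, assuming
// `sqlContext` and `metadataPath` are in scope.
val log = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)

// Recover the last committed batch id, or start from -1 on a fresh log.
var maxBatchId: Long = log.getLatest().map(_._1).getOrElse(-1L)

// Commit a new batch; `add` returns false if this batch id was already written.
val newFiles = Seq("/data/a.txt", "/data/b.txt")
if (newFiles.nonEmpty) {
  maxBatchId += 1
  log.add(maxBatchId, newFiles)
}
```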
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 125
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 193
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala | 51
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 103
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 58
5 files changed, 357 insertions(+), 173 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index cce4b74ff2..25c8a69b1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,13 +17,9 @@
package org.apache.spark.sql.execution.streaming
-import java.io._
-import java.nio.charset.StandardCharsets
+import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.io.Codec
-
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,33 +40,12 @@ class FileStreamSource(
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
- private var maxBatchId = -1
- private val seenFiles = new OpenHashSet[String]
+ private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
+ private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
- /** Map of batch id to files. This map is also stored in `metadataPath`. */
- private val batchToMetadata = new HashMap[Long, Seq[String]]
-
- {
- // Restore file paths from the metadata files
- val existingBatchFiles = fetchAllBatchFiles()
- if (existingBatchFiles.nonEmpty) {
- val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt)
- maxBatchId = existingBatchIds.max
- // Recover "batchToMetadata" and "seenFiles" from existing metadata files.
- existingBatchIds.sorted.foreach { batchId =>
- val files = readBatch(batchId)
- if (files.isEmpty) {
- // Assert that the corrupted file must be the latest metadata file.
- if (batchId != maxBatchId) {
- throw new IllegalStateException("Invalid metadata files")
- }
- maxBatchId = maxBatchId - 1
- } else {
- batchToMetadata(batchId) = files
- files.foreach(seenFiles.add)
- }
- }
- }
+ private val seenFiles = new OpenHashSet[String]
+ metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+ files.foreach(seenFiles.add)
}
/** Returns the schema of the data from this source */
@@ -112,7 +87,7 @@ class FileStreamSource(
if (newFiles.nonEmpty) {
maxBatchId += 1
- writeBatch(maxBatchId, newFiles)
+ metadataLog.add(maxBatchId, newFiles)
}
new LongOffset(maxBatchId)
@@ -140,9 +115,7 @@ class FileStreamSource(
val endId = end.offset
if (startId + 1 <= endId) {
- val files = (startId + 1 to endId).filter(_ >= 0).flatMap { batchId =>
- batchToMetadata.getOrElse(batchId, Nil)
- }.toArray
+ val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
logDebug(s"Return files from batches ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
Some(new Batch(end, dataFrameBuilder(files)))
@@ -152,89 +125,9 @@ class FileStreamSource(
}
}
- private def fetchAllBatchFiles(): Seq[FileStatus] = {
- try fs.listStatus(new Path(metadataPath)) catch {
- case _: java.io.FileNotFoundException =>
- fs.mkdirs(new Path(metadataPath))
- Seq.empty
- }
- }
-
private def fetchAllFiles(): Seq[String] = {
fs.listStatus(new Path(path))
.filterNot(_.getPath.getName.startsWith("_"))
.map(_.getPath.toUri.toString)
}
-
- /**
- * Write the metadata of a batch to disk. The file format is as follows:
- *
- * {{{
- * <FileStreamSource.VERSION>
- * START
- * -/a/b/c
- * -/d/e/f
- * ...
- * END
- * }}}
- *
- * Note: <FileStreamSource.VERSION> means the value of `FileStreamSource.VERSION`. Every file
- * path starts with "-" so that we can know if a line is a file path easily.
- */
- private def writeBatch(id: Int, files: Seq[String]): Unit = {
- assert(files.nonEmpty, "create a new batch without any file")
- val output = fs.create(new Path(metadataPath + "/" + id), true)
- val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))
- try {
- // scalastyle:off println
- writer.println(FileStreamSource.VERSION)
- writer.println(FileStreamSource.START_TAG)
- files.foreach(file => writer.println(FileStreamSource.PATH_PREFIX + file))
- writer.println(FileStreamSource.END_TAG)
- // scalastyle:on println
- } finally {
- writer.close()
- }
- batchToMetadata(id) = files
- }
-
- /** Read the file names of the specified batch id from the metadata file */
- private def readBatch(id: Int): Seq[String] = {
- val input = fs.open(new Path(metadataPath + "/" + id))
- try {
- FileStreamSource.readBatch(input)
- } finally {
- input.close()
- }
- }
-}
-
-object FileStreamSource {
-
- private val START_TAG = "START"
- private val END_TAG = "END"
- private val PATH_PREFIX = "-"
- val VERSION = "FILESTREAM_V1"
-
- /**
- * Parse a metadata file and return the content. If the metadata file is corrupted, it will return
- * an empty `Seq`.
- */
- def readBatch(input: InputStream): Seq[String] = {
- val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
- if (lines.length < 4) {
- // version + start tag + end tag + at least one file path
- return Nil
- }
- if (lines.head != VERSION) {
- return Nil
- }
- if (lines(1) != START_TAG) {
- return Nil
- }
- if (lines.last != END_TAG) {
- return Nil
- }
- lines.slice(2, lines.length - 1).map(_.stripPrefix(PATH_PREFIX)) // Drop character "-"
- }
}
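To make the recovery path in the hunk above concrete, here is a minimal sketch of what the new constructor logic amounts to. The names (`metadataLog`, `seenFiles`, `fetchAllFiles`) come from the diff; the final filtering line is an assumption about the unchanged scanning code, which this hunk does not show:

```scala
// Sketch of restart recovery: replay every committed batch from the log so
// that already-processed files are not ingested again.
val maxBatchId: Long = metadataLog.getLatest().map(_._1).getOrElse(-1L)
metadataLog.get(None, maxBatchId).foreach { case (_, files) =>
  files.foreach(seenFiles.add)
}

// On the next trigger, only files not yet seen become the new batch
// (assumed to match the unchanged scanning logic).
val newFiles = fetchAllFiles().filterNot(seenFiles.contains)
```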
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
new file mode 100644
index 0000000000..ac2842b6d5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException}
+import java.nio.ByteBuffer
+import java.util.{ConcurrentModificationException, EnumSet}
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.sql.SQLContext
+
+/**
+ * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
+ * as the metadata storage.
+ *
+ * When writing a new batch, [[HDFSMetadataLog]] first writes to a temp file and then renames it
+ * to the final batch file. If the rename fails, it means multiple writers are racing to commit
+ * the same batch: only one of them will succeed and the others will fail.
+ *
+ * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems, as they don't guarantee that
+ * listing files in a directory always shows the latest files.
+ */
+class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] {
+
+ private val metadataPath = new Path(path)
+
+ private val fc =
+ if (metadataPath.toUri.getScheme == null) {
+ FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
+ } else {
+ FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
+ }
+
+ if (!fc.util().exists(metadataPath)) {
+ fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
+ }
+
+ /**
+ * A `PathFilter` to filter only batch files
+ */
+ private val batchFilesFilter = new PathFilter {
+ override def accept(path: Path): Boolean = try {
+ path.getName.toLong
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }
+
+ private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()
+
+ private def batchFile(batchId: Long): Path = {
+ new Path(metadataPath, batchId.toString)
+ }
+
+ override def add(batchId: Long, metadata: T): Boolean = {
+ get(batchId).map(_ => false).getOrElse {
+ // Only write metadata when the batch has not yet been written.
+ val buffer = serializer.serialize(metadata)
+ try {
+ writeBatch(batchId, JavaUtils.bufferToArray(buffer))
+ true
+ } catch {
+ case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
+ // create may convert InterruptedException to IOException. Let's convert it back to
+ // InterruptedException so that this failure won't crash StreamExecution
+ throw new InterruptedException("Creating file is interrupted")
+ }
+ }
+ }
+
+ /**
+ * Write a batch to a temp file then rename it to the batch file.
+ *
+ * There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
+ * that is not valid usage, we still need to prevent such writers from destroying the files.
+ */
+ private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
+ // Use nextId to create a temp file
+ var nextId = 0
+ while (true) {
+ val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
+ fc.deleteOnExit(tempPath)
+ try {
+ val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
+ try {
+ output.write(bytes)
+ } finally {
+ output.close()
+ }
+ try {
+ // Try to commit the batch
+ // It will fail if there is an existing file (someone has committed the batch)
+ fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
+ return
+ } catch {
+ case e: IOException if isFileAlreadyExistsException(e) =>
+ // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+ // So throw an exception to tell the user this is not a valid behavior.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ case e: FileNotFoundException =>
+ // Sometimes, "create" will succeed when multiple writers are calling it at the same
+ // time. However, only one writer can call "rename" successfully, others will get
+ // FileNotFoundException because the first writer has removed it.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ }
+ } catch {
+ case e: IOException if isFileAlreadyExistsException(e) =>
+ // Failed to create "tempPath". There are two cases:
+ // 1. Someone is creating "tempPath" too.
+ // 2. This is a restart. "tempPath" has already been created but not moved to the final
+ // batch file (not committed).
+ //
+ // For both cases, the batch has not yet been committed. So we can retry it.
+ //
+ // Note: there is a potential risk here: if HDFSMetadataLog A is running, someone can
+ // point another "HDFSMetadataLog" at the same metadata path and make A fail. However,
+ // this is not a big problem because it requires that the attacker already have write
+ // permission on the metadata path. In addition, old Spark Streaming has the same
+ // issue: one could create malicious checkpoint files to crash a Streaming application too.
+ nextId += 1
+ }
+ }
+ }
+
+ private def isFileAlreadyExistsException(e: IOException): Boolean = {
+ e.isInstanceOf[FileAlreadyExistsException] ||
+ // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
+ // HADOOP-9361, we still need to support old Hadoop versions.
+ (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
+ }
+
+ override def get(batchId: Long): Option[T] = {
+ val batchMetadataFile = batchFile(batchId)
+ if (fc.util().exists(batchMetadataFile)) {
+ val input = fc.open(batchMetadataFile)
+ val bytes = IOUtils.toByteArray(input)
+ Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
+ } else {
+ None
+ }
+ }
+
+ override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
+ val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+ .map(_.getPath.getName.toLong)
+ .filter { batchId =>
+ batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+ }
+ batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
+ case (batchId, metadataOption) =>
+ (batchId, metadataOption.get)
+ }
+ }
+
+ override def getLatest(): Option[(Long, T)] = {
+ val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+ .map(_.getPath.getName.toLong)
+ .sorted
+ .reverse
+ for (batchId <- batchIds) {
+ val batch = get(batchId)
+ if (batch.isDefined) {
+ return Some((batchId, batch.get))
+ }
+ }
+ None
+ }
+}
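The commit protocol above boils down to "write a temp file, then atomically rename it onto the batch file." Below is a stripped-down sketch of that idea using java.nio instead of Hadoop's FileContext; the `commitBatch` helper is hypothetical and local-filesystem only, for illustration rather than part of the patch:

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Paths, StandardCopyOption}

// Hypothetical local analogue of HDFSMetadataLog.writeBatch: a batch is
// committed by renaming a fully written temp file onto the batch file, so
// readers never observe a partially written batch.
def commitBatch(dir: String, batchId: Long, bytes: Array[Byte]): Boolean = {
  val tempPath  = Paths.get(dir, s".$batchId.tmp")
  val batchPath = Paths.get(dir, batchId.toString)
  Files.write(tempPath, bytes)
  try {
    // Without REPLACE_EXISTING the move fails when `batchPath` already
    // exists, which is how a losing writer learns that someone else
    // committed the batch first (ATOMIC_MOVE semantics with an existing
    // target are platform dependent).
    Files.move(tempPath, batchPath, StandardCopyOption.ATOMIC_MOVE)
    true
  } catch {
    case _: FileAlreadyExistsException =>
      Files.deleteIfExists(tempPath)
      false
  }
}
```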
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
new file mode 100644
index 0000000000..3f9896d23c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * A general MetadataLog that supports the following features:
+ *
+ * - Allow the user to store a metadata object for each batch.
+ * - Allow the user to query the latest batch id.
+ * - Allow the user to query the metadata object of a specified batch id.
+ * - Allow the user to query metadata objects in a range of batch ids.
+ */
+trait MetadataLog[T] {
+
+ /**
+ * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
+ * metadata has already been stored, this method will return `false`.
+ */
+ def add(batchId: Long, metadata: T): Boolean
+
+ /**
+ * Return the metadata for the specified batchId if it's stored. Otherwise, return None.
+ */
+ def get(batchId: Long): Option[T]
+
+ /**
+ * Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId`
+ * is `None`, return all batches up to endId (inclusive).
+ */
+ def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+
+ /**
+ * Return the latest batch id and its metadata, if they exist.
+ */
+ def getLatest(): Option[(Long, T)]
+}
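The trait is small enough that a toy in-memory implementation makes the contract obvious. This `InMemoryMetadataLog` is a hypothetical sketch for illustration, not part of the patch:

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory MetadataLog; a sorted map keyed by batch id gives
// range queries and getLatest essentially for free.
class InMemoryMetadataLog[T] extends MetadataLog[T] {
  private var batches = TreeMap.empty[Long, T]

  override def add(batchId: Long, metadata: T): Boolean = synchronized {
    if (batches.contains(batchId)) {
      false // the contract: re-adding an existing batch id is rejected
    } else {
      batches += batchId -> metadata
      true
    }
  }

  override def get(batchId: Long): Option[T] = synchronized {
    batches.get(batchId)
  }

  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = synchronized {
    // `range` excludes the upper bound, hence endId + 1.
    batches.range(startId.getOrElse(Long.MinValue), endId + 1).toArray
  }

  override def getLatest(): Option[(Long, T)] = synchronized {
    batches.lastOption
  }
}
```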
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
new file mode 100644
index 0000000000..4ddc218455
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.ConcurrentModificationException
+
+import org.scalatest.concurrent.AsyncAssertions._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+
+ test("basic") {
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+ assert(metadataLog.add(1, "batch1"))
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.get(1) === Some("batch1"))
+ assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+ assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+
+ // Adding the same batch does nothing
+ metadataLog.add(1, "batch1-duplicated")
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.get(1) === Some("batch1"))
+ assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+ assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+ }
+ }
+
+ test("restart") {
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.add(1, "batch1"))
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.get(1) === Some("batch1"))
+ assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+ assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+
+ val metadataLog2 = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ assert(metadataLog2.get(0) === Some("batch0"))
+ assert(metadataLog2.get(1) === Some("batch1"))
+ assert(metadataLog2.getLatest() === Some(1 -> "batch1"))
+ assert(metadataLog2.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+ }
+ }
+
+ test("metadata directory collision") {
+ withTempDir { temp =>
+ val waiter = new Waiter
+ val maxBatchId = 100
+ for (id <- 0 until 10) {
+ new Thread() {
+ override def run(): Unit = waiter {
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ try {
+ var nextBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
+ nextBatchId += 1
+ while (nextBatchId <= maxBatchId) {
+ metadataLog.add(nextBatchId, nextBatchId.toString)
+ nextBatchId += 1
+ }
+ } catch {
+ case e: ConcurrentModificationException =>
+ // This is expected since there are multiple writers
+ } finally {
+ waiter.dismiss()
+ }
+ }
+ }.start()
+ }
+
+ waiter.await(timeout(10.seconds), dismissals(10))
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ assert(metadataLog.getLatest() === Some(maxBatchId -> maxBatchId.toString))
+ assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index e6889bcc78..4c18e38db8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -17,13 +17,11 @@
package org.apache.spark.sql.streaming
-import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream}
-import java.nio.charset.StandardCharsets
+import java.io.File
import org.apache.spark.sql.{AnalysisException, StreamTest}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.FileStreamSource._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils
@@ -359,60 +357,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(tmp)
}
- test("fault tolerance with corrupted metadata file") {
- val src = Utils.createTempDir("streaming.src")
- assert(new File(src, "_metadata").mkdirs())
- stringToFile(
- new File(src, "_metadata/0"),
- s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
- stringToFile(new File(src, "_metadata/1"), s"${FileStreamSource.VERSION}\nSTART\n-")
-
- val textSource = createFileStreamSource("text", src.getCanonicalPath)
- // the metadata file of batch is corrupted, so currentOffset should be 0
- assert(textSource.currentOffset === LongOffset(0))
-
- Utils.deleteRecursively(src)
- }
-
- test("fault tolerance with normal metadata file") {
- val src = Utils.createTempDir("streaming.src")
- assert(new File(src, "_metadata").mkdirs())
- stringToFile(
- new File(src, "_metadata/0"),
- s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
- stringToFile(
- new File(src, "_metadata/1"),
- s"${FileStreamSource.VERSION}\nSTART\n-/x/y/z\nEND\n")
-
- val textSource = createFileStreamSource("text", src.getCanonicalPath)
- assert(textSource.currentOffset === LongOffset(1))
-
- Utils.deleteRecursively(src)
- }
-
- test("readBatch") {
- def stringToStream(str: String): InputStream =
- new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
-
- // Invalid metadata
- assert(readBatch(stringToStream("")) === Nil)
- assert(readBatch(stringToStream(FileStreamSource.VERSION)) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\n")) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART")) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-")) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c")) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n")) === Nil)
- assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEN")) === Nil)
-
- // Valid metadata
- assert(readBatch(stringToStream(
- s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
- assert(readBatch(stringToStream(
- s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
- assert(readBatch(stringToStream(
- s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
- === Seq("/a/b/c", "/e/f/g"))
- }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {