From b5e3bd87f5cfa3dc59e5b68d032756feee6b4e25 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 14 Mar 2016 19:28:13 -0700
Subject: [SPARK-13791][SQL] Add MetadataLog and HDFSMetadataLog

## What changes were proposed in this pull request?

- Add a MetadataLog interface for reliable metadata storage.
- Add HDFSMetadataLog as a MetadataLog implementation based on HDFS.
- Update FileStreamSource to use HDFSMetadataLog instead of managing metadata by itself.

## How was this patch tested?

Unit tests.

Author: Shixiong Zhu

Closes #11625 from zsxwing/metadata-log.
---
 .../sql/execution/streaming/FileStreamSource.scala | 125 +-------------
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 193 +++++++++++++++++++++
 .../sql/execution/streaming/MetadataLog.scala      |  51 ++++++
 .../execution/streaming/HDFSMetadataLogSuite.scala | 103 +++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala      |  58 +------
 5 files changed, 357 insertions(+), 173 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
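At a high level, the new API treats each batch's metadata as an append-once record keyed by batch id. A minimal usage sketch of the interface introduced below (the path is a placeholder and `sqlContext` comes from the surrounding application; the first writer of a batch wins):

    // A log whose payload is the list of files belonging to each batch.
    val log = new HDFSMetadataLog[Seq[String]](sqlContext, "/tmp/demo-metadata")

    log.add(0L, Seq("/data/a", "/data/b"))   // true: batch 0 committed
    log.add(0L, Seq("/data/other"))          // false: batch 0 already exists

    log.get(0L)         // Some(Seq("/data/a", "/data/b"))
    log.get(None, 0L)   // Array((0, Seq("/data/a", "/data/b")))
    log.getLatest()     // Some((0, Seq("/data/a", "/data/b")))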
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index cce4b74ff2..25c8a69b1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,13 +17,9 @@
 package org.apache.spark.sql.execution.streaming
 
-import java.io._
-import java.nio.charset.StandardCharsets
+import scala.collection.mutable.ArrayBuffer
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.io.Codec
-
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,33 +40,12 @@ class FileStreamSource(
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
   private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
-  private var maxBatchId = -1
-  private val seenFiles = new OpenHashSet[String]
+  private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
+  private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
-  /** Map of batch id to files. This map is also stored in `metadataPath`. */
-  private val batchToMetadata = new HashMap[Long, Seq[String]]
-
-  {
-    // Restore file paths from the metadata files
-    val existingBatchFiles = fetchAllBatchFiles()
-    if (existingBatchFiles.nonEmpty) {
-      val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt)
-      maxBatchId = existingBatchIds.max
-      // Recover "batchToMetadata" and "seenFiles" from existing metadata files.
-      existingBatchIds.sorted.foreach { batchId =>
-        val files = readBatch(batchId)
-        if (files.isEmpty) {
-          // Assert that the corrupted file must be the latest metadata file.
-          if (batchId != maxBatchId) {
-            throw new IllegalStateException("Invalid metadata files")
-          }
-          maxBatchId = maxBatchId - 1
-        } else {
-          batchToMetadata(batchId) = files
-          files.foreach(seenFiles.add)
-        }
-      }
-    }
+  private val seenFiles = new OpenHashSet[String]
+  metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+    files.foreach(seenFiles.add)
   }
 
   /** Returns the schema of the data from this source */
@@ -112,7 +87,7 @@ class FileStreamSource(
 
     if (newFiles.nonEmpty) {
       maxBatchId += 1
-      writeBatch(maxBatchId, newFiles)
+      metadataLog.add(maxBatchId, newFiles)
     }
 
     new LongOffset(maxBatchId)
@@ -140,9 +115,7 @@ class FileStreamSource(
     val endId = end.offset
 
     if (startId + 1 <= endId) {
-      val files = (startId + 1 to endId).filter(_ >= 0).flatMap { batchId =>
-        batchToMetadata.getOrElse(batchId, Nil)
-      }.toArray
+      val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
       logDebug(s"Return files from batches ${startId + 1}:$endId")
       logDebug(s"Streaming ${files.mkString(", ")}")
       Some(new Batch(end, dataFrameBuilder(files)))
@@ -152,89 +125,9 @@ class FileStreamSource(
     }
   }
 
-  private def fetchAllBatchFiles(): Seq[FileStatus] = {
-    try fs.listStatus(new Path(metadataPath)) catch {
-      case _: java.io.FileNotFoundException =>
-        fs.mkdirs(new Path(metadataPath))
-        Seq.empty
-    }
-  }
-
   private def fetchAllFiles(): Seq[String] = {
     fs.listStatus(new Path(path))
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
   }
-
-  /**
-   * Write the metadata of a batch to disk. The file format is as follows:
-   *
-   * {{{
-   *   <FileStreamSource.VERSION>
-   *   START
-   *   -/a/b/c
-   *   -/d/e/f
-   *   ...
-   *   END
-   * }}}
-   *
-   * Note: <FileStreamSource.VERSION> means the value of `FileStreamSource.VERSION`. Every file
-   * path starts with "-" so that we can easily tell whether a line is a file path.
-   */
-  private def writeBatch(id: Int, files: Seq[String]): Unit = {
-    assert(files.nonEmpty, "create a new batch without any file")
-    val output = fs.create(new Path(metadataPath + "/" + id), true)
-    val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))
-    try {
-      // scalastyle:off println
-      writer.println(FileStreamSource.VERSION)
-      writer.println(FileStreamSource.START_TAG)
-      files.foreach(file => writer.println(FileStreamSource.PATH_PREFIX + file))
-      writer.println(FileStreamSource.END_TAG)
-      // scalastyle:on println
-    } finally {
-      writer.close()
-    }
-    batchToMetadata(id) = files
-  }
-
-  /** Read the file names of the specified batch id from the metadata file */
-  private def readBatch(id: Int): Seq[String] = {
-    val input = fs.open(new Path(metadataPath + "/" + id))
-    try {
-      FileStreamSource.readBatch(input)
-    } finally {
-      input.close()
-    }
-  }
-}
-
-object FileStreamSource {
-
-  private val START_TAG = "START"
-  private val END_TAG = "END"
-  private val PATH_PREFIX = "-"
-  val VERSION = "FILESTREAM_V1"
-
-  /**
-   * Parse a metadata file and return its content. If the metadata file is corrupted, return an
-   * empty `Seq`.
-   */
-  def readBatch(input: InputStream): Seq[String] = {
-    val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
-    if (lines.length < 4) {
-      // version + start tag + end tag + at least one file path
-      return Nil
-    }
-    if (lines.head != VERSION) {
-      return Nil
-    }
-    if (lines(1) != START_TAG) {
-      return Nil
-    }
-    if (lines.last != END_TAG) {
-      return Nil
-    }
-    lines.slice(2, lines.length - 1).map(_.stripPrefix(PATH_PREFIX)) // Drop leading "-"
-  }
 }
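The hand-rolled recovery block removed above collapses into two calls against the log. A sketch of the equivalent restart path, using the names from this patch (with `mutable.HashSet` standing in for Spark's internal `OpenHashSet` so the sketch is self-contained):

    import scala.collection.mutable

    val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)

    // Resume from whatever the log already contains; -1 means "no batch yet".
    val maxBatchId: Long = metadataLog.getLatest().map(_._1).getOrElse(-1L)

    // Rebuild the set of files already assigned to some batch.
    val seenFiles = mutable.HashSet.empty[String]
    metadataLog.get(None, maxBatchId).foreach { case (_, files) =>
      files.foreach(seenFiles.add)
    }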
- */ - def readBatch(input: InputStream): Seq[String] = { - val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray - if (lines.length < 4) { - // version + start tag + end tag + at least one file path - return Nil - } - if (lines.head != VERSION) { - return Nil - } - if (lines(1) != START_TAG) { - return Nil - } - if (lines.last != END_TAG) { - return Nil - } - lines.slice(2, lines.length - 1).map(_.stripPrefix(PATH_PREFIX)) // Drop character "-" - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala new file mode 100644 index 0000000000..ac2842b6d5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -0,0 +1,193 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.streaming + +import java.io.{FileNotFoundException, IOException} +import java.nio.ByteBuffer +import java.util.{ConcurrentModificationException, EnumSet} + +import scala.reflect.ClassTag + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.sql.SQLContext + +/** + * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path` + * as the metadata storage. + * + * When writing a new batch, [[HDFSMetadataLog]] will firstly write to a temp file and then rename + * it to the final batch file. If the rename step fails, there must be multiple writers and only + * one of them will succeed and the others will fail. + * + * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing + * files in a directory always shows the latest files. 
+ */ +class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] { + + private val metadataPath = new Path(path) + + private val fc = + if (metadataPath.toUri.getScheme == null) { + FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration) + } else { + FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration) + } + + if (!fc.util().exists(metadataPath)) { + fc.mkdir(metadataPath, FsPermission.getDirDefault, true) + } + + /** + * A `PathFilter` to filter only batch files + */ + private val batchFilesFilter = new PathFilter { + override def accept(path: Path): Boolean = try { + path.getName.toLong + true + } catch { + case _: NumberFormatException => false + } + } + + private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance() + + private def batchFile(batchId: Long): Path = { + new Path(metadataPath, batchId.toString) + } + + override def add(batchId: Long, metadata: T): Boolean = { + get(batchId).map(_ => false).getOrElse { + // Only write metadata when the batch has not yet been written. + val buffer = serializer.serialize(metadata) + try { + writeBatch(batchId, JavaUtils.bufferToArray(buffer)) + true + } catch { + case e: IOException if "java.lang.InterruptedException" == e.getMessage => + // create may convert InterruptedException to IOException. Let's convert it back to + // InterruptedException so that this failure won't crash StreamExecution + throw new InterruptedException("Creating file is interrupted") + } + } + } + + /** + * Write a batch to a temp file then rename it to the batch file. + * + * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a + * valid behavior, we still need to prevent it from destroying the files. + */ + private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = { + // Use nextId to create a temp file + var nextId = 0 + while (true) { + val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp") + fc.deleteOnExit(tempPath) + try { + val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE)) + try { + output.write(bytes) + } finally { + output.close() + } + try { + // Try to commit the batch + // It will fail if there is an existing file (someone has committed the batch) + fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE) + return + } catch { + case e: IOException if isFileAlreadyExistsException(e) => + // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch. + // So throw an exception to tell the user this is not a valid behavior. + throw new ConcurrentModificationException( + s"Multiple HDFSMetadataLog are using $path", e) + case e: FileNotFoundException => + // Sometimes, "create" will succeed when multiple writers are calling it at the same + // time. However, only one writer can call "rename" successfully, others will get + // FileNotFoundException because the first writer has removed it. + throw new ConcurrentModificationException( + s"Multiple HDFSMetadataLog are using $path", e) + } + } catch { + case e: IOException if isFileAlreadyExistsException(e) => + // Failed to create "tempPath". There are two cases: + // 1. Someone is creating "tempPath" too. + // 2. This is a restart. "tempPath" has already been created but not moved to the final + // batch file (not committed). + // + // For both cases, the batch has not yet been committed. So we can retry it. 
+ // + // Note: there is a potential risk here: if HDFSMetadataLog A is running, people can use + // the same metadata path to create "HDFSMetadataLog" and fail A. However, this is not a + // big problem because it requires the attacker must have the permission to write the + // metadata path. In addition, the old Streaming also have this issue, people can create + // malicious checkpoint files to crash a Streaming application too. + nextId += 1 + } + } + } + + private def isFileAlreadyExistsException(e: IOException): Boolean = { + e.isInstanceOf[FileAlreadyExistsException] || + // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in + // HADOOP-9361, we still need to support old Hadoop versions. + (e.getMessage != null && e.getMessage.startsWith("File already exists: ")) + } + + override def get(batchId: Long): Option[T] = { + val batchMetadataFile = batchFile(batchId) + if (fc.util().exists(batchMetadataFile)) { + val input = fc.open(batchMetadataFile) + val bytes = IOUtils.toByteArray(input) + Some(serializer.deserialize[T](ByteBuffer.wrap(bytes))) + } else { + None + } + } + + override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = { + val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter) + .map(_.getPath.getName.toLong) + .filter { batchId => + batchId <= endId && (startId.isEmpty || batchId >= startId.get) + } + batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map { + case (batchId, metadataOption) => + (batchId, metadataOption.get) + } + } + + override def getLatest(): Option[(Long, T)] = { + val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter) + .map(_.getPath.getName.toLong) + .sorted + .reverse + for (batchId <- batchIds) { + val batch = get(batchId) + if (batch.isDefined) { + return Some((batchId, batch.get)) + } + } + None + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala new file mode 100644 index 0000000000..3f9896d23c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.streaming + +/** + * A general MetadataLog that supports the following features: + * + * - Allow the user to store a metadata object for each batch. + * - Allow the user to query the latest batch id. + * - Allow the user to query the metadata object of a specified batch id. + * - Allow the user to query metadata objects in a range of batch ids. + */ +trait MetadataLog[T] { + + /** + * Store the metadata for the specified batchId and return `true` if successful. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
new file mode 100644
index 0000000000..4ddc218455
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.ConcurrentModificationException + +import org.scalatest.concurrent.AsyncAssertions._ +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext + +class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { + + test("basic") { + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.getLatest() === Some(0 -> "batch0")) + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.getLatest() === Some(0 -> "batch0")) + assert(metadataLog.get(None, 0) === Array(0 -> "batch0")) + + assert(metadataLog.add(1, "batch1")) + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(1) === Some("batch1")) + assert(metadataLog.getLatest() === Some(1 -> "batch1")) + assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1")) + + // Adding the same batch does nothing + metadataLog.add(1, "batch1-duplicated") + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(1) === Some("batch1")) + assert(metadataLog.getLatest() === Some(1 -> "batch1")) + assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1")) + } + } + + test("restart") { + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.add(1, "batch1")) + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(1) === Some("batch1")) + assert(metadataLog.getLatest() === Some(1 -> "batch1")) + assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1")) + + val metadataLog2 = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + assert(metadataLog2.get(0) === Some("batch0")) + assert(metadataLog2.get(1) === Some("batch1")) + assert(metadataLog2.getLatest() === Some(1 -> "batch1")) + assert(metadataLog2.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1")) + } + } + + test("metadata directory collision") { + withTempDir { temp => + val waiter = new Waiter + val maxBatchId = 100 + for (id <- 0 until 10) { + new Thread() { + override def run(): Unit = waiter { + val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + try { + var nextBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + nextBatchId += 1 + while (nextBatchId <= maxBatchId) { + metadataLog.add(nextBatchId, nextBatchId.toString) + nextBatchId += 1 + } + } catch { + case e: ConcurrentModificationException => + // This is expected since there are multiple writers + } finally { + waiter.dismiss() + } + } + }.start() + } + + waiter.await(timeout(10.seconds), dismissals(10)) + val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + assert(metadataLog.getLatest() === Some(maxBatchId -> maxBatchId.toString)) + assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString))) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e6889bcc78..4c18e38db8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -17,13 +17,11 @@ package org.apache.spark.sql.streaming -import 
-import java.nio.charset.StandardCharsets
+import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, StreamTest}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.FileStreamSource._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -359,60 +357,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(tmp)
   }
 
-  test("fault tolerance with corrupted metadata file") {
-    val src = Utils.createTempDir("streaming.src")
-    assert(new File(src, "_metadata").mkdirs())
-    stringToFile(
-      new File(src, "_metadata/0"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
-    stringToFile(new File(src, "_metadata/1"), s"${FileStreamSource.VERSION}\nSTART\n-")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    // the metadata file of batch 1 is corrupted, so currentOffset should be 0
-    assert(textSource.currentOffset === LongOffset(0))
-
-    Utils.deleteRecursively(src)
-  }
-
-  test("fault tolerance with normal metadata file") {
-    val src = Utils.createTempDir("streaming.src")
-    assert(new File(src, "_metadata").mkdirs())
-    stringToFile(
-      new File(src, "_metadata/0"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
-    stringToFile(
-      new File(src, "_metadata/1"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/x/y/z\nEND\n")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    assert(textSource.currentOffset === LongOffset(1))
-
-    Utils.deleteRecursively(src)
-  }
-
-  test("readBatch") {
-    def stringToStream(str: String): InputStream =
-      new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
-
-    // Invalid metadata
-    assert(readBatch(stringToStream("")) === Nil)
-    assert(readBatch(stringToStream(FileStreamSource.VERSION)) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\n")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEN")) === Nil)
-
-    // Valid metadata
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
-      === Seq("/a/b/c", "/e/f/g"))
-  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
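Taken together, the offset arithmetic in FileStreamSource now maps directly onto the log's inclusive range query. A worked example with hypothetical values, mirroring the `getBatch` code in this patch (`metadataLog` as constructed earlier):

    val startId = 1L   // offset already delivered to the sink
    val endId = 3L     // offset to deliver up to

    // Both bounds are inclusive, so this fetches exactly batches 2 and 3.
    val files: Array[String] =
      metadataLog.get(Some(startId + 1), endId).map(_._2).flatten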