aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala40
1 files changed, 21 insertions, 19 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 89de15acf5..73d1b1b1d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
+ val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
val reader =
if (schema.isDefined) {
sqlContext.read.format(format).schema(schema.get)
@@ -71,8 +72,10 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
}
reader.stream(path)
.queryExecution.analyzed
- .collect { case StreamingRelation(s: FileStreamSource, _) => s }
- .head
+ .collect { case StreamingRelation(dataSource, _, _) =>
+ // There is only one source in our tests so just set sourceId to 0
+ dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
+ }.head
}
val valueSchema = new StructType().add("value", StringType)
@@ -96,9 +99,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
reader.stream()
}
df.queryExecution.analyzed
- .collect { case StreamingRelation(s: FileStreamSource, _) => s }
- .head
- .schema
+ .collect { case StreamingRelation(dataSource, _, _) =>
+ dataSource.sourceSchema()
+ }.head._2
}
test("FileStreamSource schema: no path") {
@@ -202,8 +205,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from text files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -224,8 +227,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -258,8 +261,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files with inferring schema") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
@@ -279,8 +282,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from parquet files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileSource.toDF().filter($"value" contains "keep")
@@ -301,7 +304,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("file stream source without schema") {
- val src = Utils.createTempDir("streaming.src")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
@@ -318,8 +321,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("fault tolerance") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -338,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
}
-
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
@@ -346,8 +348,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
import testImplicits._
test("file source stress test") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val ds = textSource.toDS[String]().map(_.toInt + 1)