author    Liang-Chi Hsieh <simonh@tw.ibm.com>          2016-09-26 13:07:11 -0700
committer Michael Armbrust <michael@databricks.com>    2016-09-26 13:07:11 -0700
commit    8135e0e5ebdb9c7f5ac41c675dc8979a5127a31a (patch)
tree      937ab0fdd62bc085dcca32811f5f0a01d50c8c56 /sql/core/src/test/scala
parent    bde85f8b70138a51052b613664facbc981378c38 (diff)
[SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing
## What changes were proposed in this pull request?

When reading a file stream with a non-globbing path, the results return data with all `null`s for the partitioned columns. E.g.,

    case class A(id: Int, value: Int)
    val data = spark.createDataset(Seq(
      A(1, 1),
      A(2, 2),
      A(2, 3))
    )
    val url = "/tmp/test"
    data.write.partitionBy("id").parquet(url)
    spark.read.parquet(url).show

    +-----+---+
    |value| id|
    +-----+---+
    |    2|  2|
    |    3|  2|
    |    1|  1|
    +-----+---+

    val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
    s.writeStream.queryName("test").format("memory").start()
    sql("SELECT * FROM test").show

    +-----+----+
    |value|  id|
    +-----+----+
    |    2|null|
    |    3|null|
    |    1|null|
    +-----+----+

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #14803 from viirya/filestreamsource-option.
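(Editor's note: for reference, a minimal sketch of the behaviour this patch aims for, reusing the names from the example above. Once partition data is read, the streamed result should agree with the batch read; row order may differ.)

```scala
// Same setup as in the example above; with partition data read correctly, the
// partition column `id` is populated in the streamed result instead of being null.
val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
s.writeStream.queryName("test").format("memory").start()
sql("SELECT * FROM test").show
// +-----+---+
// |value| id|
// +-----+---+
// |    2|  2|
// |    3|  2|
// |    1|  1|
// +-----+---+
```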
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 83
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala            |  8
2 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55c95ae285..3157afe5a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -102,6 +102,12 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private
}
}
+  case class DeleteFile(file: File) extends ExternalAction {
+    def runAction(): Unit = {
+      Utils.deleteRecursively(file)
+    }
+  }
+
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
def createFileStream(
format: String,
@@ -608,6 +614,81 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// =============== other tests ================
+ test("read new files in partitioned table without globbing, should read partition data") {
+ withTempDirs { case (dir, tmp) =>
+ val partitionFooSubDir = new File(dir, "partition=foo")
+ val partitionBarSubDir = new File(dir, "partition=bar")
+
+ val schema = new StructType().add("value", StringType).add("partition", StringType)
+ val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema))
+ val filtered = fileStream.filter($"value" contains "keep")
+ testStream(filtered)(
+ // Create new partition=foo sub dir and write to it
+ AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", "foo")),
+
+ // Append to same partition=foo sub dir
+ AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
+
+ // Create new partition sub dir and write to it
+ AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
+
+ // Append to same partition=bar sub dir
+ AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
+ )
+ }
+ }
+
+ test("when schema inference is turned on, should read partition data") {
+ def createFile(content: String, src: File, tmp: File): Unit = {
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ src.mkdirs()
+ require(stringToFile(tempFile, content).renameTo(finalFile))
+ }
+
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ withTempDirs { case (dir, tmp) =>
+ val partitionFooSubDir = new File(dir, "partition=foo")
+ val partitionBarSubDir = new File(dir, "partition=bar")
+
+ // Create file in partition, so we can infer the schema.
+ createFile("{'value': 'drop0'}", partitionFooSubDir, tmp)
+
+ val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
+ val filtered = fileStream.filter($"value" contains "keep")
+ testStream(filtered)(
+ // Append to same partition=foo sub dir
+ AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", "foo")),
+
+ // Append to same partition=foo sub dir
+ AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
+
+ // Create new partition sub dir and write to it
+ AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
+
+ // Append to same partition=bar sub dir
+ AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")),
+
+ // Delete the two partition dirs
+ DeleteFile(partitionFooSubDir),
+ DeleteFile(partitionBarSubDir),
+
+ AddTextFileData("{'value': 'keep6'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"),
+ ("keep6", "bar"))
+ )
+ }
+ }
+ }
+
test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)
@@ -792,7 +873,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
assert(src.listFiles().size === numFiles)
- val files = spark.readStream.text(root.getCanonicalPath).as[String]
+ val files = spark.readStream.text(root.getCanonicalPath).as[(String, Int)]
// Note this query will use constant folding to eliminate the file scan.
// This is to avoid actually running a Spark job with 10000 tasks
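
(Editor's note: as a rough user-level counterpart to what these suite changes exercise — not part of the patch, and the path and value names below are made up — the scenario is a JSON file stream over a non-globbed directory whose subdirectories encode a partition column, with that column declared in the user-supplied schema, roughly what createFileStream("json", dir, Some(schema)) builds above.)

```scala
import org.apache.spark.sql.types.{StringType, StructType}
import spark.implicits._

// Hypothetical input directory containing partition=foo/, partition=bar/, ... subdirectories.
val inputDir = "/tmp/partitioned-input"

// Declare the partition column in the schema so rows from newly arriving files carry it.
val schema = new StructType().add("value", StringType).add("partition", StringType)

val filtered = spark.readStream
  .schema(schema)
  .json(inputDir)
  .filter($"value" contains "keep")
```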
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 6c5b170d9c..aa6515bc7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -95,6 +95,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def addData(query: Option[StreamExecution]): (Source, Offset)
}
+  /** A trait that can be extended when testing a source. */
+  trait ExternalAction extends StreamAction {
+    def runAction(): Unit
+  }
+
case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
override def toString: String = s"AddData to $source: ${data.mkString(",")}"
@@ -429,6 +434,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
failTest("Error adding data", e)
}
+ case e: ExternalAction =>
+ e.runAction()
+
case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
verify(currentStream != null, "stream not running")
// Get the map of source index to the current source objects
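
(Editor's note: a sketch of how another suite might use the ExternalAction hook added above. RenameFile and the surrounding names — filtered, srcDir, tmp — are hypothetical, not part of this patch; the test runner simply invokes runAction() between the other stream actions, exactly as it does for DeleteFile.)

```scala
import java.io.File

// Hypothetical external side effect wrapped as an ExternalAction, in the spirit of
// DeleteFile above: the runner calls runAction() when this step is reached.
case class RenameFile(from: File, to: File) extends ExternalAction {
  def runAction(): Unit = require(from.renameTo(to))
}

// Usage inside a testStream(...) sequence (filtered, srcDir and tmp are assumed to
// exist in the enclosing test, as in the partition tests above):
testStream(filtered)(
  AddTextFileData("{'value': 'keep1'}", srcDir, tmp),
  CheckAnswer(("keep1", "foo")),
  RenameFile(new File(srcDir, "a.json"), new File(srcDir, "b.json")),
  AddTextFileData("{'value': 'keep2'}", srcDir, tmp),
  CheckAnswer(("keep1", "foo"), ("keep2", "foo"))
)
```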