 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala |  11
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                 |   7
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala  | 239
 3 files changed, 166 insertions(+), 91 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5dd4d81d6..d0853f67b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
@@ -186,6 +187,16 @@ case class DataSource(
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
+ val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+ val isTextSource = providingClass == classOf[text.DefaultSource]
+ // If the schema inference is disabled, only text sources require schema to be specified
+ if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
+ throw new IllegalArgumentException(
+ "Schema must be specified when creating a streaming source DataFrame. " +
+ "If some files already exist in the directory, then depending on the file format " +
+ "you may be able to create a static DataFrame on that directory with " +
+ "'spark.read.load(directory)' and infer schema from it.")
+ }
SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
case _ =>
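
For context (not part of the commit), a minimal sketch of the user-facing effect of the check added above, assuming a Spark 2.x session named `spark`, the structured streaming reader API (`spark.readStream`), and a hypothetical JSON source directory `dir`:

    // Illustration only; `spark` and `dir` are assumed, not defined by this commit.
    import org.apache.spark.sql.types.{StringType, StructType}

    // With spark.sql.streaming.schemaInference at its default (false), a non-text
    // streaming source without a user-specified schema now fails:
    //   spark.readStream.format("json").load(dir)   // throws IllegalArgumentException

    // Supplying the schema up front works:
    val userSchema = new StructType().add("c", StringType)
    val streamDf = spark.readStream.format("json").schema(userSchema).load(dir)

    // As the error message suggests, the schema can also be taken from a static
    // read over files that already exist in the directory:
    val inferredSchema = spark.read.format("json").load(dir).schema
    val streamDf2 = spark.readStream.format("json").schema(inferredSchema).load(dir)
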
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f3064eb6ac..b91518acce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -516,6 +516,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(60 * 1000L) // 10 minutes
+ val STREAMING_SCHEMA_INFERENCE =
+ SQLConfigBuilder("spark.sql.streaming.schemaInference")
+ .internal()
+ .doc("Whether file-based streaming sources will infer its own schema")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
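
A hedged usage note on the flag introduced above: the key string matches the SQLConfigBuilder entry in the diff, and the `spark` session variable is assumed.

    // Re-enable schema inference for file-based streaming sources at runtime.
    // Default is false, so json/parquet streaming sources otherwise require an
    // explicit schema (see the DataSource.scala change above).
    spark.conf.set("spark.sql.streaming.schemaInference", "true")
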
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c97304c0ec..1d784f1f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -165,19 +166,36 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
.collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
}
+ // ============= Basic parameter exists tests ================
+
test("FileStreamSource schema: no path") {
- val e = intercept[IllegalArgumentException] {
- createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+ def testError(): Unit = {
+ val e = intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+ }
+ assert(e.getMessage.contains("path")) // reason is path, not schema
}
- assert("'path' is not specified" === e.getMessage)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { testError() }
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
}
- test("FileStreamSource schema: path doesn't exist") {
- intercept[AnalysisException] {
+ test("FileStreamSource schema: path doesn't exist, no schema") {
+ val e = intercept[IllegalArgumentException] {
createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
}
+ assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
+ }
+
+ test("FileStreamSource schema: path doesn't exist, with schema") {
+ val userSchema = new StructType().add(new StructField("value", IntegerType))
+ val schema = createFileStreamSourceAndGetSchema(
+ format = None, path = Some("/a/b/c"), schema = Some(userSchema))
+ assert(schema === userSchema)
}
+
+ // =============== Text file stream schema tests ================
+
test("FileStreamSource schema: text, no existing files, no schema") {
withTempDir { src =>
val schema = createFileStreamSourceAndGetSchema(
@@ -205,13 +223,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== Parquet file stream schema tests ================
+
test("FileStreamSource schema: parquet, no existing files, no schema") {
withTempDir { src =>
- val e = intercept[AnalysisException] {
- createFileStreamSourceAndGetSchema(
- format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema = None)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val e = intercept[AnalysisException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("parquet"),
+ path = Some(new File(src, "1").getCanonicalPath),
+ schema = None)
+ }
+ assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
- assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
}
@@ -220,9 +244,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Seq("a", "b", "c").toDS().as("userColumn").toDF().write
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.parquet(src.getCanonicalPath)
- val schema = createFileStreamSourceAndGetSchema(
- format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
- assert(schema === new StructType().add("value", StringType))
+
+ // Without schema inference, should throw error
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+ intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ }
+
+ // With schema inference, should infer correct schema
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+ assert(schema === new StructType().add("value", StringType))
+ }
}
}
@@ -237,22 +273,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== JSON file stream schema tests ================
+
test("FileStreamSource schema: json, no existing files, no schema") {
withTempDir { src =>
- val e = intercept[AnalysisException] {
- createFileStreamSourceAndGetSchema(
- format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+ val e = intercept[AnalysisException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
- assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
}
test("FileStreamSource schema: json, existing files, no schema") {
withTempDir { src =>
- stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
- val schema = createFileStreamSourceAndGetSchema(
- format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
- assert(schema === new StructType().add("c", StringType))
+
+ // Without schema inference, should throw error
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+ intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ }
+
+ // With schema inference, should infer correct schema
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ assert(schema === new StructType().add("c", StringType))
+ }
}
}
@@ -266,6 +319,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== Text file stream tests ================
+
test("read from text files") {
withTempDirs { case (src, tmp) =>
val textStream = createFileStream("text", src.getCanonicalPath)
@@ -284,6 +339,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== JSON file stream tests ================
+
test("read from json files") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
@@ -313,74 +370,82 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
test("read from json files with inferring schema") {
withTempDirs { case (src, tmp) =>
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
- assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
+ val fileStream = createFileStream("json", src.getCanonicalPath)
+ assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
- // FileStreamSource should infer the column "c"
- val filtered = fileStream.filter($"c" contains "keep")
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- testStream(filtered)(
- AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
}
test("reading from json files inside partitioned directory") {
withTempDirs { case (baseSrc, tmp) =>
- val src = new File(baseSrc, "type=X")
- src.mkdirs()
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val src = new File(baseSrc, "type=X")
+ src.mkdirs()
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
- // FileStreamSource should infer the column "c"
- val filtered = fileStream.filter($"c" contains "keep")
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- testStream(filtered)(
- AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
}
test("reading from json files with changing schema") {
withTempDirs { case (src, tmp) =>
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'k': 'value0'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'k': 'value0'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
- // FileStreamSource should infer the column "k"
- assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+ // FileStreamSource should infer the column "k"
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
- // After creating DF and before starting stream, add data with different schema
- // Should not affect the inferred schema any more
- stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
+ // After creating DF and before starting stream, add data with different schema
+ // Should not affect the inferred schema any more
+ stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
- testStream(fileStream)(
+ testStream(fileStream)(
- // Should not pick up column v in the file added before start
- AddTextFileData("{'k': 'value2'}", src, tmp),
- CheckAnswer("value0", "value1", "value2"),
+ // Should not pick up column v in the file added before start
+ AddTextFileData("{'k': 'value2'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2"),
- // Should read data in column k, and ignore v
- AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3"),
+ // Should read data in column k, and ignore v
+ AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3"),
- // Should ignore rows that do not have the necessary k column
- AddTextFileData("{'v': 'value4'}", src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3", null))
+ // Should ignore rows that do not have the necessary k column
+ AddTextFileData("{'v': 'value4'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3", null))
+ }
}
}
+ // =============== Parquet file stream tests ================
+
test("read from parquet files") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
@@ -402,49 +467,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
test("read from parquet files with changing schema") {
withTempDirs { case (src, tmp) =>
- // Add a file so that we can infer its schema
- AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
-
- val fileStream = createFileStream("parquet", src.getCanonicalPath)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // FileStreamSource should infer the column "k"
- assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+ // Add a file so that we can infer its schema
+ AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
- // After creating DF and before starting stream, add data with different schema
- // Should not affect the inferred schema any more
- AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+ val fileStream = createFileStream("parquet", src.getCanonicalPath)
- testStream(fileStream)(
- // Should not pick up column v in the file added before start
- AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
- CheckAnswer("value0", "value1", "value2"),
+ // FileStreamSource should infer the column "k"
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
- // Should read data in column k, and ignore v
- AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3"),
+ // After creating DF and before starting stream, add data with different schema
+ // Should not affect the inferred schema any more
+ AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
- // Should ignore rows that do not have the necessary k column
- AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3", null)
- )
- }
- }
+ testStream(fileStream)(
+ // Should not pick up column v in the file added before start
+ AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
+ CheckAnswer("value0", "value1", "value2"),
- test("file stream source without schema") {
- withTempDir { src =>
- // Only "text" doesn't need a schema
- createFileStream("text", src.getCanonicalPath)
+ // Should read data in column k, and ignore v
+ AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3"),
- // Both "json" and "parquet" require a schema if no existing file to infer
- intercept[AnalysisException] {
- createFileStream("json", src.getCanonicalPath)
- }
- intercept[AnalysisException] {
- createFileStream("parquet", src.getCanonicalPath)
+ // Should ignore rows that do not have the necessary k column
+ AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3", null)
+ )
}
}
}
+ // =============== file stream globbing tests ================
+
test("read new files in nested directories with globbing") {
withTempDirs { case (dir, tmp) =>
@@ -518,6 +573,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== other tests ================
+
test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)