aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-24 14:27:39 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-05-24 14:27:39 -0700
commite631b819fe348729aab062207a452b8f1d1511bd (patch)
tree714a70671ec2616dada8ba189eb026ab466599ef /sql
parent20900e5feced76e87f0a12823d0e3f07e082105f (diff)
downloadspark-e631b819fe348729aab062207a452b8f1d1511bd.tar.gz
spark-e631b819fe348729aab062207a452b8f1d1511bd.tar.bz2
spark-e631b819fe348729aab062207a452b8f1d1511bd.zip
[SPARK-15458][SQL][STREAMING] Disable schema inference for streaming datasets on file streams
## What changes were proposed in this pull request? If the user relies on the schema to be inferred in file streams can break easily for multiple reasons - accidentally running on a directory which has no data - schema changing underneath - on restart, the query will infer schema again, and may unexpectedly infer incorrect schema, as the file in the directory may be different at the time of the restart. To avoid these complicated scenarios, for Spark 2.0, we are going to disable schema inferencing by default with a config, so that user is forced to consider explicitly what is the schema it wants, rather than the system trying to infer it and run into weird corner cases. In this PR, I introduce a SQLConf that determines whether schema inference for file streams is allowed or not. It is disabled by default. ## How was this patch tested? Updated unit tests that test error behavior with and without schema inference enabled. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13238 from tdas/SPARK-15458.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala239
3 files changed, 166 insertions, 91 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5dd4d81d6..d0853f67b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
@@ -186,6 +187,16 @@ case class DataSource(
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
+ val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+ val isTextSource = providingClass == classOf[text.DefaultSource]
+ // If the schema inference is disabled, only text sources require schema to be specified
+ if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
+ throw new IllegalArgumentException(
+ "Schema must be specified when creating a streaming source DataFrame. " +
+ "If some files already exist in the directory, then depending on the file format " +
+ "you may be able to create a static DataFrame on that directory with " +
+ "'spark.read.load(directory)' and infer schema from it.")
+ }
SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f3064eb6ac..b91518acce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -516,6 +516,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(60 * 1000L) // 10 minutes
+ val STREAMING_SCHEMA_INFERENCE =
+ SQLConfigBuilder("spark.sql.streaming.schemaInference")
+ .internal()
+ .doc("Whether file-based streaming sources will infer its own schema")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c97304c0ec..1d784f1f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -165,19 +166,36 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
.collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
}
+ // ============= Basic parameter exists tests ================
+
test("FileStreamSource schema: no path") {
- val e = intercept[IllegalArgumentException] {
- createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+ def testError(): Unit = {
+ val e = intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+ }
+ assert(e.getMessage.contains("path")) // reason is path, not schema
}
- assert("'path' is not specified" === e.getMessage)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { testError() }
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
}
- test("FileStreamSource schema: path doesn't exist") {
- intercept[AnalysisException] {
+ test("FileStreamSource schema: path doesn't exist, no schema") {
+ val e = intercept[IllegalArgumentException] {
createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
}
+ assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
+ }
+
+ test("FileStreamSource schema: path doesn't exist, with schema") {
+ val userSchema = new StructType().add(new StructField("value", IntegerType))
+ val schema = createFileStreamSourceAndGetSchema(
+ format = None, path = Some("/a/b/c"), schema = Some(userSchema))
+ assert(schema === userSchema)
}
+
+ // =============== Text file stream schema tests ================
+
test("FileStreamSource schema: text, no existing files, no schema") {
withTempDir { src =>
val schema = createFileStreamSourceAndGetSchema(
@@ -205,13 +223,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== Parquet file stream schema tests ================
+
test("FileStreamSource schema: parquet, no existing files, no schema") {
withTempDir { src =>
- val e = intercept[AnalysisException] {
- createFileStreamSourceAndGetSchema(
- format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema = None)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val e = intercept[AnalysisException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("parquet"),
+ path = Some(new File(src, "1").getCanonicalPath),
+ schema = None)
+ }
+ assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
- assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
}
@@ -220,9 +244,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Seq("a", "b", "c").toDS().as("userColumn").toDF().write
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.parquet(src.getCanonicalPath)
- val schema = createFileStreamSourceAndGetSchema(
- format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
- assert(schema === new StructType().add("value", StringType))
+
+ // Without schema inference, should throw error
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+ intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ }
+
+ // With schema inference, should infer correct schema
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+ assert(schema === new StructType().add("value", StringType))
+ }
}
}
@@ -237,22 +273,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== JSON file stream schema tests ================
+
test("FileStreamSource schema: json, no existing files, no schema") {
withTempDir { src =>
- val e = intercept[AnalysisException] {
- createFileStreamSourceAndGetSchema(
- format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+ val e = intercept[AnalysisException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
- assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
}
}
test("FileStreamSource schema: json, existing files, no schema") {
withTempDir { src =>
- stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
- val schema = createFileStreamSourceAndGetSchema(
- format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
- assert(schema === new StructType().add("c", StringType))
+
+ // Without schema inference, should throw error
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+ intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ }
+
+ // With schema inference, should infer correct schema
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+ assert(schema === new StructType().add("c", StringType))
+ }
}
}
@@ -266,6 +319,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== Text file stream tests ================
+
test("read from text files") {
withTempDirs { case (src, tmp) =>
val textStream = createFileStream("text", src.getCanonicalPath)
@@ -284,6 +339,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== JSON file stream tests ================
+
test("read from json files") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
@@ -313,74 +370,82 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
test("read from json files with inferring schema") {
withTempDirs { case (src, tmp) =>
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
- assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
+ val fileStream = createFileStream("json", src.getCanonicalPath)
+ assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
- // FileStreamSource should infer the column "c"
- val filtered = fileStream.filter($"c" contains "keep")
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- testStream(filtered)(
- AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
}
test("reading from json files inside partitioned directory") {
withTempDirs { case (baseSrc, tmp) =>
- val src = new File(baseSrc, "type=X")
- src.mkdirs()
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val src = new File(baseSrc, "type=X")
+ src.mkdirs()
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
- // FileStreamSource should infer the column "c"
- val filtered = fileStream.filter($"c" contains "keep")
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- testStream(filtered)(
- AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
}
test("reading from json files with changing schema") {
withTempDirs { case (src, tmp) =>
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'k': 'value0'}")
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'k': 'value0'}")
- val fileStream = createFileStream("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
- // FileStreamSource should infer the column "k"
- assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+ // FileStreamSource should infer the column "k"
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
- // After creating DF and before starting stream, add data with different schema
- // Should not affect the inferred schema any more
- stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
+ // After creating DF and before starting stream, add data with different schema
+ // Should not affect the inferred schema any more
+ stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
- testStream(fileStream)(
+ testStream(fileStream)(
- // Should not pick up column v in the file added before start
- AddTextFileData("{'k': 'value2'}", src, tmp),
- CheckAnswer("value0", "value1", "value2"),
+ // Should not pick up column v in the file added before start
+ AddTextFileData("{'k': 'value2'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2"),
- // Should read data in column k, and ignore v
- AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3"),
+ // Should read data in column k, and ignore v
+ AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3"),
- // Should ignore rows that do not have the necessary k column
- AddTextFileData("{'v': 'value4'}", src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3", null))
+ // Should ignore rows that do not have the necessary k column
+ AddTextFileData("{'v': 'value4'}", src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3", null))
+ }
}
}
+ // =============== Parquet file stream tests ================
+
test("read from parquet files") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
@@ -402,49 +467,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
test("read from parquet files with changing schema") {
withTempDirs { case (src, tmp) =>
- // Add a file so that we can infer its schema
- AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
-
- val fileStream = createFileStream("parquet", src.getCanonicalPath)
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
- // FileStreamSource should infer the column "k"
- assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+ // Add a file so that we can infer its schema
+ AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
- // After creating DF and before starting stream, add data with different schema
- // Should not affect the inferred schema any more
- AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+ val fileStream = createFileStream("parquet", src.getCanonicalPath)
- testStream(fileStream)(
- // Should not pick up column v in the file added before start
- AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
- CheckAnswer("value0", "value1", "value2"),
+ // FileStreamSource should infer the column "k"
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
- // Should read data in column k, and ignore v
- AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3"),
+ // After creating DF and before starting stream, add data with different schema
+ // Should not affect the inferred schema any more
+ AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
- // Should ignore rows that do not have the necessary k column
- AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
- CheckAnswer("value0", "value1", "value2", "value3", null)
- )
- }
- }
+ testStream(fileStream)(
+ // Should not pick up column v in the file added before start
+ AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
+ CheckAnswer("value0", "value1", "value2"),
- test("file stream source without schema") {
- withTempDir { src =>
- // Only "text" doesn't need a schema
- createFileStream("text", src.getCanonicalPath)
+ // Should read data in column k, and ignore v
+ AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3"),
- // Both "json" and "parquet" require a schema if no existing file to infer
- intercept[AnalysisException] {
- createFileStream("json", src.getCanonicalPath)
- }
- intercept[AnalysisException] {
- createFileStream("parquet", src.getCanonicalPath)
+ // Should ignore rows that do not have the necessary k column
+ AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3", null)
+ )
}
}
}
+ // =============== file stream globbing tests ================
+
test("read new files in nested directories with globbing") {
withTempDirs { case (dir, tmp) =>
@@ -518,6 +573,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ // =============== other tests ================
+
test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)