aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org
diff options
context:
space:
mode:
authorTakeshi Yamamuro <yamamuro@apache.org>2017-02-22 21:39:20 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-22 21:39:20 -0800
commit769aa0f1d22d3c6d4c7871468344d82c8dc36260 (patch)
tree0f90fe73597bf9821fcde910cd17f51aea566849 /sql/core/src/main/scala/org
parent66c4b79afd3aad4c1f9446b641e28d5513cbdf5f (diff)
downloadspark-769aa0f1d22d3c6d4c7871468344d82c8dc36260.tar.gz
spark-769aa0f1d22d3c6d4c7871468344d82c8dc36260.tar.bz2
spark-769aa0f1d22d3c6d4c7871468344d82c8dc36260.zip
[SPARK-19695][SQL] Throw an exception if a `columnNameOfCorruptRecord` field violates requirements in json formats
## What changes were proposed in this pull request? This pr comes from #16928 and fixed a json behaviour along with the CSV one. ## How was this patch tested? Added tests in `JsonSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17023 from maropu/SPARK-19695.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala13
2 files changed, 21 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4c1341ed5d..2be22761e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.JsonInferSchema
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -365,6 +365,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
createParser)
}
+ // Check a field requirement for corrupt records here to throw an exception in a driver side
+ schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
+ val f = schema(corruptFieldIndex)
+ if (f.dataType != StringType || !f.nullable) {
+ throw new AnalysisException(
+ "The field for corrupt records must be string type and nullable")
+ }
+ }
+
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val parser = new JacksonParser(schema, parsedOptions)
iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 2cbf4ea7be..902fee5a7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -22,13 +22,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
@@ -102,6 +102,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ // Check a field requirement for corrupt records here to throw an exception in a driver side
+ dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
+ val f = dataSchema(corruptFieldIndex)
+ if (f.dataType != StringType || !f.nullable) {
+ throw new AnalysisException(
+ "The field for corrupt records must be string type and nullable")
+ }
+ }
+
(file: PartitionedFile) => {
val parser = new JacksonParser(requiredSchema, parsedOptions)
JsonDataSource(parsedOptions).readFile(