author     Yin Huai <huai@cse.ohio-state.edu>          2014-10-09 14:57:27 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-10-09 14:57:27 -0700
commit     1c7f0ab302de9f82b1bd6da852d133823bc67c66 (patch)
tree       b59cbdd05cd7fdc532da7c24333c6917a57a361b /sql/core/src/main
parent     1faa1135a3fc0acd89f934f01a4a2edefcb93d33 (diff)
[SPARK-3339][SQL] Support for skipping json lines that fail to parse
This PR provides a way to skip or query corrupt JSON records. To do so, we introduce an internal column that holds corrupt records (the default name is `_corrupt_record`; it can be changed by setting `spark.sql.columnNameOfCorruptRecord`). When a line fails to parse, the corrupt record is stored, in its unparsed form, in this internal column. Users can then skip or query the column through SQL.

* To query the corrupt records
```
-- For Hive parser
SELECT `_corrupt_record` FROM jsonTable WHERE `_corrupt_record` IS NOT NULL
-- For our SQL parser
SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL
```
* To skip corrupt records and query only the regular records
```
-- For Hive parser
SELECT field1, field2 FROM jsonTable WHERE `_corrupt_record` IS NULL
-- For our SQL parser
SELECT field1, field2 FROM jsonTable WHERE _corrupt_record IS NULL
```
Generally, changing the name of the internal column is not recommended. If the name must be changed to avoid a name conflict, use `sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, <new column name>)` or `sqlContext.sql(SET spark.sql.columnNameOfCorruptRecord=<new column name>)`.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2680 from yhuai/corruptJsonRecord and squashes the following commits:

4c9828e [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
309616a [Yin Huai] Change the default name of corrupt record to "_corrupt_record".
b4a3632 [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
9375ae9 [Yin Huai] Set the column name of corrupt json record back to the default one after the unit test.
ee584c0 [Yin Huai] Provide a way to query corrupt json records as unparsed strings.
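For reference, a minimal usage sketch in Scala (not part of this patch; it assumes a `SparkContext` named `sc` and a `SQLContext` named `sqlContext`, with a small sample dataset made up here for illustration):
```scala
// Illustrative only: a tiny JSON dataset with one malformed line.
val json = sc.parallelize(Seq(
  """{"field1": 1, "field2": "a"}""",
  """{"field1": 2, "field2": """,          // malformed line
  """{"field1": 3, "field2": "c"}"""))

// Corrupt lines are kept, unparsed, under the configured column
// (default: `_corrupt_record`).
val jsonTable = sqlContext.jsonRDD(json)
jsonTable.registerTempTable("jsonTable")

// Query only the corrupt records, as unparsed strings.
sqlContext.sql(
  "SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL").collect()

// Skip corrupt records and query the regular ones.
sqlContext.sql(
  "SELECT field1, field2 FROM jsonTable WHERE _corrupt_record IS NULL").collect()
```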
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala               | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala  | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala             | 30
4 files changed, 46 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f6f4cf3b80..07e6e2eccd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -35,6 +35,7 @@ private[spark] object SQLConf {
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+ val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -131,6 +132,9 @@ private[sql] trait SQLConf {
private[spark] def inMemoryPartitionPruning: Boolean =
getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
+ private[spark] def columnNameOfCorruptRecord: String =
+ getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 35561cac3e..014e1e2826 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -195,9 +195,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+ val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
val appliedSchema =
- Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
- val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+ Option(schema).getOrElse(
+ JsonRDD.nullTypeToStringType(
+ JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+ val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
applySchema(rowRDD, appliedSchema)
}
@@ -206,8 +209,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
- val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
- val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+ val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+ val appliedSchema =
+ JsonRDD.nullTypeToStringType(
+ JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+ val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
applySchema(rowRDD, appliedSchema)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index c006c4330f..f8171c3be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -148,8 +148,12 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
* It goes through the entire dataset once to determine the schema.
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
- val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
- val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+ val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+ val appliedScalaSchema =
+ JsonRDD.nullTypeToStringType(
+ JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
+ val scalaRowRDD =
+ JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
val logicalPlan =
LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
new JavaSchemaRDD(sqlContext, logicalPlan)
@@ -162,10 +166,14 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
*/
@Experimental
def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
+ val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
val appliedScalaSchema =
Option(asScalaDataType(schema)).getOrElse(
- JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[SStructType]
- val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+ JsonRDD.nullTypeToStringType(
+ JsonRDD.inferSchema(
+ json.rdd, 1.0, columnNameOfCorruptJsonRecord))).asInstanceOf[SStructType]
+ val scalaRowRDD = JsonRDD.jsonStringToRow(
+ json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
val logicalPlan =
LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
new JavaSchemaRDD(sqlContext, logicalPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fbc2965e61..61ee960aad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -22,6 +22,7 @@ import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
import scala.math.BigDecimal
import java.sql.Timestamp
+import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.rdd.RDD
@@ -35,16 +36,19 @@ private[sql] object JsonRDD extends Logging {
private[sql] def jsonStringToRow(
json: RDD[String],
- schema: StructType): RDD[Row] = {
- parseJson(json).map(parsed => asRow(parsed, schema))
+ schema: StructType,
+ columnNameOfCorruptRecords: String): RDD[Row] = {
+ parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
}
private[sql] def inferSchema(
json: RDD[String],
- samplingRatio: Double = 1.0): StructType = {
+ samplingRatio: Double = 1.0,
+ columnNameOfCorruptRecords: String): StructType = {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
- val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+ val allKeys =
+ parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
createSchema(allKeys)
}
@@ -274,7 +278,9 @@ private[sql] object JsonRDD extends Logging {
case atom => atom
}
- private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+ private def parseJson(
+ json: RDD[String],
+ columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = {
// According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
// ObjectMapper will not return BigDecimal when
// "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
@@ -289,12 +295,16 @@ private[sql] object JsonRDD extends Logging {
// For example: for {"key": 1, "key":2}, we will get "key"->2.
val mapper = new ObjectMapper()
iter.flatMap { record =>
- val parsed = mapper.readValue(record, classOf[Object]) match {
- case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
- case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
- }
+ try {
+ val parsed = mapper.readValue(record, classOf[Object]) match {
+ case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
+ case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
+ }
- parsed
+ parsed
+ } catch {
+ case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
+ }
}
})
}
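The core of the change is the `try`/`catch` added to `parseJson` above: instead of failing the job on a malformed line, the raw line is kept under the corrupt-record column name. A standalone sketch of that pattern follows (names are illustrative and not the actual `JsonRDD` code, which additionally converts the parsed Java map/list into Scala collections):
```scala
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper

// Parse one JSON line; on failure, keep the raw text under `corruptColumn`.
def parseOrKeepRaw(record: String, corruptColumn: String): Map[String, Any] = {
  val mapper = new ObjectMapper()
  try {
    val parsed = mapper.readValue(record, classOf[Object])
    Map("value" -> parsed)                       // stand-in for the real conversion
  } catch {
    case _: JsonProcessingException => Map(corruptColumn -> record)
  }
}

// parseOrKeepRaw("""{"a": 1}""", "_corrupt_record")  // Map("value" -> ...)
// parseOrKeepRaw("""{"a": 1""",  "_corrupt_record")  // Map("_corrupt_record" -> raw line)
```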