author:    hyukjinkwon <gurwls223@gmail.com>  2016-04-02 23:12:04 -0700
committer: Davies Liu <davies.liu@gmail.com>  2016-04-02 23:12:04 -0700
commit:    2262a93358c2f6d4cfb73645c4ebc963c5640ec8 (patch)
tree:      6c18cee7dfc269e1b6cf7e213aa7043afbc98469 /sql
parent:    7be46205083fc688249ee619ac7758904f7aa55d (diff)
[SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-14231
Currently, the JSON data source can infer `DecimalType` for big numbers, and its `floatAsBigDecimal` option reads floating-point values as `DecimalType`.
However, Spark's `DecimalType` has two restrictions:
1. The precision cannot be greater than 38.
2. The scale cannot be greater than the precision.
Currently, neither restriction is handled during schema inference.
This PR handles both cases by inferring such values as `DoubleType` instead, as sketched below. Also, the option was renamed from `floatAsBigDecimal` to `prefersDecimal`, as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579).
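As a rough sketch of the fixed decision rule (standalone Scala; `MaxPrecision` and `inferNumberType` are illustrative names for this note, not Spark's API):

```scala
import java.math.BigDecimal

val MaxPrecision = 38 // mirrors DecimalType.MAX_PRECISION

// Jackson parses "0.01" with precision 1 and scale 2 (unscaled value 1),
// which trips restriction 2; a 39-digit integer trips restriction 1.
// Widening the precision to max(precision, scale) repairs the former,
// and anything wider than 38 digits falls back to a double.
def inferNumberType(v: BigDecimal): String = {
  val precision = math.max(v.precision, v.scale)
  if (precision <= MaxPrecision) s"decimal($precision, ${v.scale})" else "double"
}

inferNumberType(new BigDecimal("0.01"))         // decimal(2, 2)
inferNumberType(new BigDecimal("1" + "0" * 38)) // double (39 digits)
```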
With these changes, the code below:
```scala
// Both records hold a 39-digit integer in field `a`, which exceeds
// DecimalType's maximum precision of 38.
def doubleRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    s"""{"a": 1${"0" * 38}, "b": 0.01}""" ::
    s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil)

val jsonDF = sqlContext.read
  .option("prefersDecimal", "true")
  .json(doubleRecords)
jsonDF.printSchema()
```
produces the following:
- **Before**
```scala
org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).;
at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108)
  at ...
```
- **After**
```scala
root
|-- a: double (nullable = true)
|-- b: double (nullable = true)
```
## How was this patch tested?
Unit tests were used, and `./dev/run-tests` was run for coding style checks.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #12030 from HyukjinKwon/SPARK-14231.
Diffstat (limited to 'sql')
5 files changed, 69 insertions, 12 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 704535adaa..a5a6e01e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -315,8 +315,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    *
    * You can set the following JSON-specific options to deal with non-standard JSON files:
    * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
-   * <li>`floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal
-   * type</li>
+   * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+   * type. If the values do not fit in decimal, then it infers them as doubles.</li>
    * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
    * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
    * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 945ed2c211..4a34f365e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-
 private[sql] object InferSchema {
 
   /**
@@ -135,14 +134,20 @@ private[sql] object InferSchema {
           // when we see a Java BigInteger, we use DecimalType.
           case BIG_INTEGER | BIG_DECIMAL =>
             val v = parser.getDecimalValue
-            DecimalType(v.precision(), v.scale())
-          case FLOAT | DOUBLE =>
-            if (configOptions.floatAsBigDecimal) {
-              val v = parser.getDecimalValue
-              DecimalType(v.precision(), v.scale())
+            if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+              DecimalType(Math.max(v.precision(), v.scale()), v.scale())
+            } else {
+              DoubleType
+            }
+          case FLOAT | DOUBLE if configOptions.prefersDecimal =>
+            val v = parser.getDecimalValue
+            if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+              DecimalType(Math.max(v.precision(), v.scale()), v.scale())
             } else {
               DoubleType
             }
+          case FLOAT | DOUBLE =>
+            DoubleType
         }
 
       case VALUE_TRUE | VALUE_FALSE => BooleanType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index c0ad9efcb7..66f1126fb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -35,8 +35,8 @@ private[sql] class JSONOptions(
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
   val primitivesAsString =
     parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
-  val floatAsBigDecimal =
-    parameters.get("floatAsBigDecimal").map(_.toBoolean).getOrElse(false)
+  val prefersDecimal =
+    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
   val allowComments =
     parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
   val allowUnquotedFieldNames =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index c108d81b18..421862c394 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -745,8 +745,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
-  test("Loading a JSON dataset floatAsBigDecimal returns schema with float types as BigDecimal") {
-    val jsonDF = sqlContext.read.option("floatAsBigDecimal", "true").json(primitiveFieldAndType)
+  test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
+    val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -773,6 +773,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
+  test("Infer big integers correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .json(bigIntegerRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `92233720368547758070`.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(20, 0), true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
+  }
+
+  test("Infer floating-point values correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `0.01` by having a precision equal to the scale.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(2, 2), true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+
+    val mergedJsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords ++ bigIntegerRecords)
+
+    val expectedMergedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(22, 2), true) :: Nil)
+
+    assert(expectedMergedSchema === mergedJsonDF.schema)
+    checkAnswer(
+      mergedJsonDF,
+      Row(1.0E-39D, BigDecimal(0.01)) ::
+      Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
+    )
+  }
+
   test("Loading a JSON dataset from a text file with SQL") {
     val dir = Utils.createTempDir()
     dir.delete()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index b2eff816ee..2873c6a881 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -214,6 +214,14 @@ private[json] trait TestJsonData {
       """{"a": {"b": 1}}""" ::
       """{"a": []}""" :: Nil)
 
+  def floatingValueRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+
+  def bigIntegerRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
```
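A non-obvious detail in the merged-schema test above is how `DecimalType(2, 2)` and `DecimalType(20, 0)` combine into `DecimalType(22, 2)`: the merged type keeps the larger integral range and the larger scale so that neither side loses digits. A minimal sketch of that arithmetic (`mergeDecimal` is an illustrative helper, not Spark's API):

```scala
// Widen two decimal types (p = precision, s = scale) so neither loses digits.
def mergeDecimal(p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = {
  val range = math.max(p1 - s1, p2 - s2) // digits left of the decimal point
  val scale = math.max(s1, s2)           // digits right of the decimal point
  (range + scale, scale)
}

// decimal(2, 2) merged with decimal(20, 0) gives decimal(22, 2):
// range = max(0, 20) = 20, scale = max(2, 0) = 2, precision = 20 + 2 = 22.
assert(mergeDecimal(2, 2, 20, 0) == (22, 2))
```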