aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-04-02 23:12:04 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-02 23:12:04 -0700
commit2262a93358c2f6d4cfb73645c4ebc963c5640ec8 (patch)
tree6c18cee7dfc269e1b6cf7e213aa7043afbc98469 /sql
parent7be46205083fc688249ee619ac7758904f7aa55d (diff)
downloadspark-2262a93358c2f6d4cfb73645c4ebc963c5640ec8.tar.gz
spark-2262a93358c2f6d4cfb73645c4ebc963c5640ec8.tar.bz2
spark-2262a93358c2f6d4cfb73645c4ebc963c5640ec8.zip
[SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal
## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-14231 Currently, JSON data source supports to infer `DecimalType` for big numbers and `floatAsBigDecimal` option which reads floating-point values as `DecimalType`. But there are few restrictions in Spark `DecimalType` below: 1. The precision cannot be bigger than 38. 2. scale cannot be bigger than precision. Currently, both restrictions are not being handled. This PR handles the cases by inferring them as `DoubleType`. Also, the option name was changed from `floatAsBigDecimal` to `prefersDecimal` as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579). So, the codes below: ```scala def doubleRecords: RDD[String] = sqlContext.sparkContext.parallelize( s"""{"a": 1${"0" * 38}, "b": 0.01}""" :: s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil) val jsonDF = sqlContext.read .option("prefersDecimal", "true") .json(doubleRecords) jsonDF.printSchema() ``` produces below: - **Before** ```scala org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).; at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44) at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144) at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108) at ... ``` - **After** ```scala root |-- a: double (nullable = true) |-- b: double (nullable = true) ``` ## How was this patch tested? Unit tests were used and `./dev/run_tests` for coding style tests. Author: hyukjinkwon <gurwls223@gmail.com> Closes #12030 from HyukjinKwon/SPARK-14231.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala48
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala8
5 files changed, 69 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 704535adaa..a5a6e01e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -315,8 +315,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
*
* You can set the following JSON-specific options to deal with non-standard JSON files:
* <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
- * <li>`floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal
- * type</li>
+ * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+ * type. If the values do not fit in decimal, then it infers them as doubles.</li>
* <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
* <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
* <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 945ed2c211..4a34f365e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-
private[sql] object InferSchema {
/**
@@ -135,14 +134,20 @@ private[sql] object InferSchema {
// when we see a Java BigInteger, we use DecimalType.
case BIG_INTEGER | BIG_DECIMAL =>
val v = parser.getDecimalValue
- DecimalType(v.precision(), v.scale())
- case FLOAT | DOUBLE =>
- if (configOptions.floatAsBigDecimal) {
- val v = parser.getDecimalValue
- DecimalType(v.precision(), v.scale())
+ if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+ DecimalType(Math.max(v.precision(), v.scale()), v.scale())
+ } else {
+ DoubleType
+ }
+ case FLOAT | DOUBLE if configOptions.prefersDecimal =>
+ val v = parser.getDecimalValue
+ if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+ DecimalType(Math.max(v.precision(), v.scale()), v.scale())
} else {
DoubleType
}
+ case FLOAT | DOUBLE =>
+ DoubleType
}
case VALUE_TRUE | VALUE_FALSE => BooleanType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index c0ad9efcb7..66f1126fb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -35,8 +35,8 @@ private[sql] class JSONOptions(
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
val primitivesAsString =
parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
- val floatAsBigDecimal =
- parameters.get("floatAsBigDecimal").map(_.toBoolean).getOrElse(false)
+ val prefersDecimal =
+ parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
val allowComments =
parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
val allowUnquotedFieldNames =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index c108d81b18..421862c394 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -745,8 +745,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
- test("Loading a JSON dataset floatAsBigDecimal returns schema with float types as BigDecimal") {
- val jsonDF = sqlContext.read.option("floatAsBigDecimal", "true").json(primitiveFieldAndType)
+ test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
+ val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -773,6 +773,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
+ test("Infer big integers correctly even when it does not fit in decimal") {
+ val jsonDF = sqlContext.read
+ .json(bigIntegerRecords)
+
+ // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+ // it will be a decimal as `92233720368547758070`.
+ val expectedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(20, 0), true) :: Nil)
+
+ assert(expectedSchema === jsonDF.schema)
+ checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
+ }
+
+ test("Infer floating-point values correctly even when it does not fit in decimal") {
+ val jsonDF = sqlContext.read
+ .option("prefersDecimal", "true")
+ .json(floatingValueRecords)
+
+ // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+ // it will be a decimal as `0.01` by having a precision equal to the scale.
+ val expectedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(2, 2), true):: Nil)
+
+ assert(expectedSchema === jsonDF.schema)
+ checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+
+ val mergedJsonDF = sqlContext.read
+ .option("prefersDecimal", "true")
+ .json(floatingValueRecords ++ bigIntegerRecords)
+
+ val expectedMergedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(22, 2), true):: Nil)
+
+ assert(expectedMergedSchema === mergedJsonDF.schema)
+ checkAnswer(
+ mergedJsonDF,
+ Row(1.0E-39D, BigDecimal(0.01)) ::
+ Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
+ )
+ }
+
test("Loading a JSON dataset from a text file with SQL") {
val dir = Utils.createTempDir()
dir.delete()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index b2eff816ee..2873c6a881 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -214,6 +214,14 @@ private[json] trait TestJsonData {
"""{"a": {"b": 1}}""" ::
"""{"a": []}""" :: Nil)
+ def floatingValueRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+
+ def bigIntegerRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+
lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())