diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-08-25 23:22:40 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-08-25 23:22:40 +0200 |
commit | a133057ce5817f834babe9f25023092aec3c321d (patch) | |
tree | 02ac9d496d357a5ae7f450fd00b0e8c9cc53cd19 /external/docker-integration-tests | |
parent | 9958ac0ce2b9e451d400604767bef2fe12a3399d (diff) | |
download | spark-a133057ce5817f834babe9f25023092aec3c321d.tar.gz spark-a133057ce5817f834babe9f25023092aec3c321d.tar.bz2 spark-a133057ce5817f834babe9f25023092aec3c321d.zip |
[SPARK-17229][SQL] PostgresDialect shouldn't widen float and short types during reads
## What changes were proposed in this pull request?
When reading float4 and smallint columns from PostgreSQL, Spark's `PostgresDialect` widens these types to Decimal and Integer rather than using the narrower Float and Short types. According to https://www.postgresql.org/docs/7.1/static/datatype.html#DATATYPE-TABLE, Postgres maps the `smallint` type to a signed two-byte integer and the `real` / `float4` types to single precision floating point numbers.
This patch fixes this by adding more special-cases to `getCatalystType`, similar to what was done for the Derby JDBC dialect. I also fixed a similar problem in the write path which causes Spark to create integer columns in Postgres for what should have been ShortType columns.
## How was this patch tested?
New test cases in `PostgresIntegrationSuite` (which I ran manually because Jenkins can't run it right now).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14796 from JoshRosen/postgres-jdbc-type-fixes.
Diffstat (limited to 'external/docker-integration-tests')
-rw-r--r-- | external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 79dd70116e..c9325dea0b 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -22,7 +22,7 @@ import java.util.Properties import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.types.{ArrayType, DecimalType} +import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType} import org.apache.spark.tags.DockerTest @DockerTest @@ -45,10 +45,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { conn.prepareStatement("CREATE TYPE enum_type AS ENUM ('d1', 'd2')").executeUpdate() conn.prepareStatement("CREATE TABLE bar (c0 text, c1 integer, c2 double precision, c3 bigint, " + "c4 bit(1), c5 bit(10), c6 bytea, c7 boolean, c8 inet, c9 cidr, " - + "c10 integer[], c11 text[], c12 real[], c13 numeric(2,2)[], c14 enum_type)").executeUpdate() + + "c10 integer[], c11 text[], c12 real[], c13 numeric(2,2)[], c14 enum_type, " + + "c15 float4, c16 smallint)").executeUpdate() conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', " + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', " - + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{0.11, 0.22}', 'd1')""").executeUpdate() + + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{0.11, 0.22}', 'd1', 1.01, 1)""" + ).executeUpdate() } test("Type mapping for various types") { @@ -56,7 +58,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass) - assert(types.length == 15) + assert(types.length == 17) assert(classOf[String].isAssignableFrom(types(0))) assert(classOf[java.lang.Integer].isAssignableFrom(types(1))) assert(classOf[java.lang.Double].isAssignableFrom(types(2))) @@ -72,6 +74,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(classOf[Seq[Double]].isAssignableFrom(types(12))) assert(classOf[Seq[BigDecimal]].isAssignableFrom(types(13))) assert(classOf[String].isAssignableFrom(types(14))) + assert(classOf[java.lang.Float].isAssignableFrom(types(15))) + assert(classOf[java.lang.Short].isAssignableFrom(types(16))) assert(rows(0).getString(0).equals("hello")) assert(rows(0).getInt(1) == 42) assert(rows(0).getDouble(2) == 1.25) @@ -90,6 +94,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(rows(0).getSeq(12).toSeq == Seq(0.11f, 0.22f)) assert(rows(0).getSeq(13) == Seq("0.11", "0.22").map(BigDecimal(_).bigDecimal)) assert(rows(0).getString(14) == "d1") + assert(rows(0).getFloat(15) == 1.01f) + assert(rows(0).getShort(16) == 1) } test("Basic write test") { @@ -104,4 +110,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { Column(Literal.create(null, a.dataType)).as(a.name) }: _*).write.jdbc(jdbcUrl, "public.barcopy2", new Properties) } + + test("Creating a table with shorts and floats") { + sqlContext.createDataFrame(Seq((1.0f, 1.toShort))) + .write.jdbc(jdbcUrl, "shortfloat", new Properties) + val schema = sqlContext.read.jdbc(jdbcUrl, "shortfloat", new Properties).schema + assert(schema(0).dataType == FloatType) + assert(schema(1).dataType == ShortType) + } } |