diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-08-25 23:22:40 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-08-25 23:22:40 +0200 |
commit | a133057ce5817f834babe9f25023092aec3c321d (patch) | |
tree | 02ac9d496d357a5ae7f450fd00b0e8c9cc53cd19 /sql | |
parent | 9958ac0ce2b9e451d400604767bef2fe12a3399d (diff) | |
download | spark-a133057ce5817f834babe9f25023092aec3c321d.tar.gz spark-a133057ce5817f834babe9f25023092aec3c321d.tar.bz2 spark-a133057ce5817f834babe9f25023092aec3c321d.zip |
[SPARK-17229][SQL] PostgresDialect shouldn't widen float and short types during reads
## What changes were proposed in this pull request?
When reading float4 and smallint columns from PostgreSQL, Spark's `PostgresDialect` widens these types to Decimal and Integer rather than using the narrower Float and Short types. According to https://www.postgresql.org/docs/7.1/static/datatype.html#DATATYPE-TABLE, Postgres maps the `smallint` type to a signed two-byte integer and the `real` / `float4` types to single precision floating point numbers.
This patch fixes this by adding more special-cases to `getCatalystType`, similar to what was done for the Derby JDBC dialect. I also fixed a similar problem in the write path which causes Spark to create integer columns in Postgres for what should have been ShortType columns.
## How was this patch tested?
New test cases in `PostgresIntegrationSuite` (which I ran manually because Jenkins can't run it right now).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14796 from JoshRosen/postgres-jdbc-type-fixes.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 7 |
2 files changed, 10 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 6dad8cbef7..8d9048ab82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -390,6 +390,10 @@ private[jdbc] class JDBCRDD( (rs: ResultSet, row: MutableRow, pos: Int) => row.setLong(pos, rs.getLong(pos + 1)) + case ShortType => + (rs: ResultSet, row: MutableRow, pos: Int) => + row.setShort(pos, rs.getShort(pos + 1)) + case StringType => (rs: ResultSet, row: MutableRow, pos: Int) => // TODO(davies): use getBytes for better performance, if the encoding is UTF-8 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index fb959d881e..3f540d6258 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -29,7 +29,11 @@ private object PostgresDialect extends JdbcDialect { override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { - if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) { + if (sqlType == Types.REAL) { + Some(FloatType) + } else if (sqlType == Types.SMALLINT) { + Some(ShortType) + } else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) { Some(BinaryType) } else if (sqlType == Types.OTHER) { Some(StringType) @@ -66,6 +70,7 @@ private object PostgresDialect extends JdbcDialect { case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN)) case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT)) case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE)) + case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT)) case t: DecimalType => Some( JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC)) case ArrayType(et, _) if et.isInstanceOf[AtomicType] => |