author     Josh Rosen <joshrosen@databricks.com>          2016-08-25 23:22:40 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>  2016-08-25 23:22:40 +0200
commit     a133057ce5817f834babe9f25023092aec3c321d (patch)
tree       02ac9d496d357a5ae7f450fd00b0e8c9cc53cd19 /sql
parent     9958ac0ce2b9e451d400604767bef2fe12a3399d (diff)
[SPARK-17229][SQL] PostgresDialect shouldn't widen float and short types during reads
## What changes were proposed in this pull request?

When reading float4 and smallint columns from PostgreSQL, Spark's `PostgresDialect` widens these types to Decimal and Integer rather than using the narrower Float and Short types. According to https://www.postgresql.org/docs/7.1/static/datatype.html#DATATYPE-TABLE, Postgres maps the `smallint` type to a signed two-byte integer and the `real` / `float4` types to single-precision floating-point numbers. This patch fixes the read path by adding more special cases to `getCatalystType`, similar to what was done for the Derby JDBC dialect. I also fixed a similar problem in the write path, which caused Spark to create integer columns in Postgres for what should have been ShortType columns.

## How was this patch tested?

New test cases in `PostgresIntegrationSuite` (which I ran manually, because Jenkins can't run it right now).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14796 from JoshRosen/postgres-jdbc-type-fixes.
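To make the mapping change concrete, here is a hedged sketch of inspecting the schema Spark infers for a Postgres table holding `real` and `smallint` columns; the connection URL, credentials, and table name are hypothetical illustrations, not part of the patch.

```scala
// Minimal sketch, assuming a reachable PostgreSQL instance and a table
// created with: CREATE TABLE type_test (s smallint, f real);
// The URL, credentials, and table name below are hypothetical.
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pg-read-check").getOrCreate()

val props = new Properties()
props.setProperty("user", "test") // hypothetical credentials

val df = spark.read.jdbc("jdbc:postgresql://localhost:5432/testdb", "type_test", props)

// Before this patch: s -> IntegerType, f -> DecimalType (widened).
// After this patch:  s -> ShortType,   f -> FloatType.
df.printSchema()
```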
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala                | 7
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 6dad8cbef7..8d9048ab82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -390,6 +390,10 @@ private[jdbc] class JDBCRDD(
       (rs: ResultSet, row: MutableRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
+    case ShortType =>
+      (rs: ResultSet, row: MutableRow, pos: Int) =>
+        row.setShort(pos, rs.getShort(pos + 1))
+
     case StringType =>
       (rs: ResultSet, row: MutableRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
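The new `ShortType` case above plugs into JDBCRDD's per-type dispatch: each Catalyst type resolves to a closure that copies one column of the current ResultSet row into Spark's mutable row, and a type with no case is a hard error rather than a silent widening. A simplified, self-contained sketch of that pattern (illustrative names, not Spark's internal API):

```scala
// Simplified sketch of JDBCRDD's dispatch (illustrative, not Spark's exact
// internals): each supported type maps to a reader closure; anything else
// fails fast. Before this patch ShortType had no case, so a dialect that
// returned it for smallint would have hit the error arm.
import java.sql.ResultSet

trait CatalystType
case object LongT extends CatalystType
case object ShortT extends CatalystType

def conversionFor(dt: CatalystType): (ResultSet, Int) => Any = dt match {
  case LongT  => (rs, pos) => rs.getLong(pos + 1)   // JDBC columns are 1-based
  case ShortT => (rs, pos) => rs.getShort(pos + 1)  // the case this patch adds
  case other  => throw new IllegalArgumentException(s"Unsupported type $other")
}
```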
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index fb959d881e..3f540d6258 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -29,7 +29,11 @@ private object PostgresDialect extends JdbcDialect {
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
+    if (sqlType == Types.REAL) {
+      Some(FloatType)
+    } else if (sqlType == Types.SMALLINT) {
+      Some(ShortType)
+    } else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
       Some(BinaryType)
     } else if (sqlType == Types.OTHER) {
       Some(StringType)
@@ -66,6 +70,7 @@ private object PostgresDialect extends JdbcDialect {
     case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
+    case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
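On the write path, the new `getJDBCType` case means a ShortType column is now created as SMALLINT instead of INTEGER (FloatType already mapped to FLOAT4, as the context lines show). A hedged round-trip sketch; the URL, credentials, and table name are hypothetical:

```scala
// Minimal sketch, assuming a reachable PostgreSQL instance; the URL,
// credentials, and table name are hypothetical.
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pg-write-check").getOrCreate()
import spark.implicits._

// A ShortType column ("s") and a FloatType column ("f").
val df = Seq((1.toShort, 1.0f), (2.toShort, 2.0f)).toDF("s", "f")

val props = new Properties()
props.setProperty("user", "test") // hypothetical credentials

// With this patch, "s" is created as SMALLINT rather than INTEGER;
// "f" was already created as FLOAT4.
df.write.jdbc("jdbc:postgresql://localhost:5432/testdb", "short_test", props)

// Reading back should preserve the narrow types rather than widening them.
spark.read.jdbc("jdbc:postgresql://localhost:5432/testdb", "short_test", props).printSchema()
```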