 project/MimaExcludes.scala                                                 | 19
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala             |  2
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala  | 44
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala         | 32
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala       | 44
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala       | 190
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 41
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala       | 48
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala      | 45
 sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala    | 54
 10 files changed, 332 insertions(+), 187 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 40f5c9fec8..dacef911e3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -116,7 +116,24 @@ object MimaExcludes {
"org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
) ++ Seq(
// SPARK-11485
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df")
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),
+ // SPARK-11541 mark various JDBC dialects as private
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productElement"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productArity"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.canEqual"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productIterator"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productPrefix"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.toString"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.hashCode"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.PostgresDialect$"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productElement"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productArity"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.canEqual"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productIterator"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productPrefix"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.toString"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.hashCode"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$")
)
case v if v.startsWith("1.5") =>
Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 7cf66b65c8..f9eab5c2e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.NumericType
class GroupedData protected[sql](
df: DataFrame,
groupingExprs: Seq[Expression],
- private val groupType: GroupedData.GroupType) {
+ groupType: GroupedData.GroupType) {
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
new file mode 100644
index 0000000000..467d8d62d1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types.{DataType, MetadataBuilder}
+
+/**
+ * AggregatedDialect can unify multiple dialects into one virtual Dialect.
+ * Dialects are tried in order, and the first dialect that does not return a
+ * neutral element wins.
+ *
+ * @param dialects List of dialects.
+ */
+private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
+
+ require(dialects.nonEmpty)
+
+ override def canHandle(url: String): Boolean =
+ dialects.map(_.canHandle(url)).reduce(_ && _)
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
+ }
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = {
+ dialects.flatMap(_.getJDBCType(dt)).headOption
+ }
+}
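The doc comment above describes a first-non-None resolution rule. Since AggregatedDialect is private to the jdbc package after this change, the rule can only be illustrated from outside; below is a minimal sketch of the same flatMap/headOption shape, using two hypothetical stand-in resolvers rather than real dialects:

```scala
import org.apache.spark.sql.types.{DataType, StringType}

// Two stand-in resolvers: the first always abstains (None), the second
// answers for strings.
val abstains: DataType => Option[String] = _ => None
val answers: DataType => Option[String] = {
  case StringType => Some("TEXT")
  case _ => None
}

// Same shape as AggregatedDialect.getJDBCType: consult in order, keep the
// first non-None result.
val resolved = List(abstains, answers).flatMap(_(StringType)).headOption
// resolved == Some("TEXT")
```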
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
new file mode 100644
index 0000000000..b1cb0e5502
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types.{BooleanType, StringType, DataType}
+
+
+private object DB2Dialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
+ case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+ case _ => None
+ }
+}
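DB2 rejects the generic write defaults (TEXT for strings, BIT(1) for booleans), which is why this dialect substitutes CLOB and CHAR(1). The substitutions stay observable through the public lookup; a small sketch, where the host and database name in the URL are placeholders:

```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{BooleanType, StringType}

// JdbcDialects.get resolves a dialect from the URL prefix; the DB2 write
// mappings can then be inspected through the public JdbcDialect interface.
val dialect = JdbcDialects.get("jdbc:db2://localhost:50000/sample")
dialect.getJDBCType(StringType).map(_.databaseTypeDefinition)  // Some(CLOB)
dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition) // Some(CHAR(1))
```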
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
new file mode 100644
index 0000000000..84f68e779c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private object DerbyDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ if (sqlType == Types.REAL) Option(FloatType) else None
+ }
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
+ case ByteType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+ case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+ case BooleanType => Option(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+ // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
+ case t: DecimalType if t.precision > 31 =>
+ Option(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
+ case _ => None
+ }
+}
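The final case above clamps wide decimals because 31 is Derby's maximum DECIMAL precision. A sketch isolating that rule (the helper name is hypothetical; in the dialect itself, narrower decimals return None and fall through to the default mapping):

```scala
import org.apache.spark.sql.types.DecimalType

// Clamp decimals wider than Derby's 31-digit limit to DECIMAL(31,5);
// narrower decimals keep their declared precision and scale.
def derbyDecimalSql(t: DecimalType): String =
  if (t.precision > 31) "DECIMAL(31,5)"
  else s"DECIMAL(${t.precision},${t.scale})"

derbyDecimalSql(DecimalType(38, 18)) // DECIMAL(31,5) -- clamped
derbyDecimalSql(DecimalType(10, 2))  // DECIMAL(10,2) -- unchanged
```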
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index f9a6a09b62..14bfea4e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.jdbc
-import java.sql.Types
-
import org.apache.spark.sql.types._
import org.apache.spark.annotation.DeveloperApi
@@ -115,11 +113,10 @@ abstract class JdbcDialect {
@DeveloperApi
object JdbcDialects {
- private var dialects = List[JdbcDialect]()
-
/**
* Register a dialect for use on all new matching jdbc [[org.apache.spark.sql.DataFrame]].
* Re-adding an existing dialect moves it to the front.
+ *
* @param dialect The new dialect.
*/
def registerDialect(dialect: JdbcDialect) : Unit = {
@@ -128,12 +125,15 @@ object JdbcDialects {
/**
* Unregister a dialect. Does nothing if the dialect is not registered.
+ *
* @param dialect The jdbc dialect.
*/
def unregisterDialect(dialect : JdbcDialect) : Unit = {
dialects = dialects.filterNot(_ == dialect)
}
+ private[this] var dialects = List[JdbcDialect]()
+
registerDialect(MySQLDialect)
registerDialect(PostgresDialect)
registerDialect(DB2Dialect)
@@ -141,7 +141,6 @@ object JdbcDialects {
registerDialect(DerbyDialect)
registerDialect(OracleDialect)
-
/**
* Fetch the JdbcDialect class corresponding to a given database url.
*/
@@ -156,187 +155,8 @@ object JdbcDialects {
}
/**
- * :: DeveloperApi ::
- * AggregatedDialect can unify multiple dialects into one virtual Dialect.
- * Dialects are tried in order, and the first dialect that does not return a
- * neutral element will will.
- * @param dialects List of dialects.
- */
-@DeveloperApi
-class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
-
- require(dialects.nonEmpty)
-
- override def canHandle(url : String): Boolean =
- dialects.map(_.canHandle(url)).reduce(_ && _)
-
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
- }
-
- override def getJDBCType(dt: DataType): Option[JdbcType] = {
- dialects.flatMap(_.getJDBCType(dt)).headOption
- }
-}
-
-/**
- * :: DeveloperApi ::
* NOOP dialect object, always returning the neutral element.
*/
-@DeveloperApi
-case object NoopDialect extends JdbcDialect {
+private object NoopDialect extends JdbcDialect {
override def canHandle(url : String): Boolean = true
}
-
-/**
- * :: DeveloperApi ::
- * Default postgres dialect, mapping bit/cidr/inet on read and string/binary/boolean on write.
- */
-@DeveloperApi
-case object PostgresDialect extends JdbcDialect {
- override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
- Option(BinaryType)
- } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
- Option(StringType)
- } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
- Option(StringType)
- } else if (sqlType == Types.OTHER && typeName.equals("json")) {
- Option(StringType)
- } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
- Option(StringType)
- } else None
- }
-
- override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
- case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
- case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
- case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
- case _ => None
- }
-
- override def getTableExistsQuery(table: String): String = {
- s"SELECT 1 FROM $table LIMIT 1"
- }
-
-}
-
-/**
- * :: DeveloperApi ::
- * Default mysql dialect to read bit/bitsets correctly.
- */
-@DeveloperApi
-case object MySQLDialect extends JdbcDialect {
- override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
- // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
- // byte arrays instead of longs.
- md.putLong("binarylong", 1)
- Option(LongType)
- } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
- Option(BooleanType)
- } else None
- }
-
- override def quoteIdentifier(colName: String): String = {
- s"`$colName`"
- }
-
- override def getTableExistsQuery(table: String): String = {
- s"SELECT 1 FROM $table LIMIT 1"
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
- * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1).
- */
-@DeveloperApi
-case object DB2Dialect extends JdbcDialect {
-
- override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
-
- override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
- case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
- case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
- case _ => None
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Microsoft SQL Server dialect, mapping the datetimeoffset types to a String on read.
- */
-@DeveloperApi
-case object MsSqlServerDialect extends JdbcDialect {
- override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- if (typeName.contains("datetimeoffset")) {
- // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
- Option(StringType)
- } else None
- }
-
- override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
- case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
- case _ => None
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Apache Derby dialect, mapping real on read
- * and string/byte/short/boolean/decimal on write.
- */
-@DeveloperApi
-case object DerbyDialect extends JdbcDialect {
- override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- if (sqlType == Types.REAL) Option(FloatType) else None
- }
-
- override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
- case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
- case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
- case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
- case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
- // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
- case (t: DecimalType) if (t.precision > 31) =>
- Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
- case _ => None
- }
-
-}
-
-/**
- * :: DeveloperApi ::
- * Default Oracle dialect, mapping a nonspecific numeric type to a general decimal type.
- */
-@DeveloperApi
-case object OracleDialect extends JdbcDialect {
- override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
- override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
- // Handle NUMBER fields that have no precision/scale in special way
- // because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
- // For more details, please see
- // https://github.com/apache/spark/pull/8780#issuecomment-145598968
- // and
- // https://github.com/apache/spark/pull/8780#issuecomment-144541760
- if (sqlType == Types.NUMERIC && size == 0) {
- // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
- // in Oracle is allowed to have different precision/scale for each value.
- Some(DecimalType(DecimalType.MAX_PRECISION, 10))
- } else {
- None
- }
- }
-}
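With the concrete dialects made private, the supported extension point is the JdbcDialect abstract class together with JdbcDialects.registerDialect, both still @DeveloperApi. A minimal custom dialect might look like the sketch below; the jdbc:h2 prefix and the CLOB mapping are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical dialect for an H2-like database.
object MyH2Dialect extends JdbcDialect {
  // Match on the JDBC URL prefix, as the built-in dialects do.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")

  // Override only the write-side mapping for strings; returning None for
  // everything else falls back to the defaults.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
    case _ => None
  }
}

// Registration prepends to the lookup list, so a custom dialect can shadow
// a built-in one that matches the same URL.
JdbcDialects.registerDialect(MyH2Dialect)
```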
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
new file mode 100644
index 0000000000..3eb722b070
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.types._
+
+
+private object MsSqlServerDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ if (typeName.contains("datetimeoffset")) {
+ // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
+ Option(StringType)
+ } else {
+ None
+ }
+ }
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
+ case _ => None
+ }
+}
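The two overrides above handle SQL Server's calendar types in both directions. A sketch of the rules in isolation (both function names are hypothetical):

```scala
import org.apache.spark.sql.types.{DataType, StringType, TimestampType}

// Read side: any column whose type name contains "datetimeoffset" is read
// as a string, preserving the offset that TimestampType would drop.
def readType(typeName: String): Option[DataType] =
  if (typeName.contains("datetimeoffset")) Some(StringType) else None

// Write side: SQL Server's TIMESTAMP is a row-version marker, not a time
// type, so TimestampType is written as DATETIME instead.
def writeType(dt: DataType): Option[String] = dt match {
  case TimestampType => Some("DATETIME")
  case _ => None
}
```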
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
new file mode 100644
index 0000000000..da413ed1f0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types.{BooleanType, LongType, DataType, MetadataBuilder}
+
+
+private case object MySQLDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
+ // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
+ // byte arrays instead of longs.
+ md.putLong("binarylong", 1)
+ Option(LongType)
+ } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
+ Option(BooleanType)
+ } else None
+ }
+
+ override def quoteIdentifier(colName: String): String = {
+ s"`$colName`"
+ }
+
+ override def getTableExistsQuery(table: String): String = {
+ s"SELECT 1 FROM $table LIMIT 1"
+ }
+}
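Backtick quoting matters because MySQL does not accept ANSI double-quoted identifiers unless ANSI_QUOTES is enabled, while the JdbcDialect default quotes with double quotes. A small illustration (the helper and column names are hypothetical):

```scala
// MySQLDialect.quoteIdentifier vs. the ANSI default used by JdbcDialect.
val mysqlQuote: String => String = col => s"`$col`"
val ansiQuote: String => String = col => "\"" + col + "\""

def select(quote: String => String, col: String, table: String): String =
  s"SELECT ${quote(col)} FROM $table"

select(mysqlQuote, "order", "orders") // SELECT `order` FROM orders
select(ansiQuote, "order", "orders")  // SELECT "order" FROM orders
                                      // -- MySQL rejects this by default
```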
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
new file mode 100644
index 0000000000..4165c38268
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private case object OracleDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ // Handle NUMBER fields that have no precision/scale in a special way
+ // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
+ // For more details, please see
+ // https://github.com/apache/spark/pull/8780#issuecomment-145598968
+ // and
+ // https://github.com/apache/spark/pull/8780#issuecomment-144541760
+ if (sqlType == Types.NUMERIC && size == 0) {
+ // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
+ // in Oracle is allowed to have different precision/scale for each value.
+ Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+ } else {
+ None
+ }
+ }
+}
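The NUMERIC/size-0 branch exists because Oracle reports an unconstrained NUMBER through ResultSetMetaData as precision 0 and scale -127, which Catalyst cannot represent directly. A sketch of the rule in isolation (the function name is hypothetical):

```scala
import java.sql.Types
import org.apache.spark.sql.types.{DataType, DecimalType}

// An unconstrained Oracle NUMBER arrives as NUMERIC with reported size 0;
// substitute the widest decimal Catalyst supports, with a fixed scale of 10.
def oracleNumber(sqlType: Int, size: Int): Option[DataType] =
  if (sqlType == Types.NUMERIC && size == 0) {
    Some(DecimalType(DecimalType.MAX_PRECISION, 10)) // DecimalType(38, 10)
  } else {
    None
  }
```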
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
new file mode 100644
index 0000000000..e701a7fcd9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private object PostgresDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
+
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
+ Option(BinaryType)
+ } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
+ Option(StringType)
+ } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
+ Option(StringType)
+ } else if (sqlType == Types.OTHER && typeName.equals("json")) {
+ Option(StringType)
+ } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
+ Option(StringType)
+ } else None
+ }
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
+ case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
+ case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+ case _ => None
+ }
+
+ override def getTableExistsQuery(table: String): String = {
+ s"SELECT 1 FROM $table LIMIT 1"
+ }
+}
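getTableExistsQuery swaps the inherited probe (SELECT * FROM $table WHERE 1=0) for a LIMIT 1 variant that Postgres executes cheaply. A sketch of how a caller might use such a probe over plain JDBC; the connection handling here is illustrative, not Spark's actual write path:

```scala
import java.sql.Connection

// Probe with the dialect's existence query; a missing table surfaces as an
// SQLException rather than an empty result set.
def tableExists(conn: Connection, table: String): Boolean =
  try {
    val stmt = conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1")
    try { stmt.executeQuery(); true } finally { stmt.close() }
  } catch {
    case _: java.sql.SQLException => false
  }
```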