 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala |  8 +++++++-
 sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                          | 38 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index ee6373d03e..9e336422d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -44,6 +44,12 @@ private[sql] object JDBCRelation {
* exactly once. The parameters minValue and maxValue are advisory in that
* incorrect values may cause the partitioning to be poor, but no data
* will fail to be represented.
+ *
+ * A null-value predicate is added to the first partition's WHERE clause so
+ * that rows whose partition column is null are included.
+ *
+ * @param partitioning partition information used to generate the WHERE clause for each partition
+ * @return an array of partitions, each carrying its WHERE clause
*/
def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
@@ -66,7 +72,7 @@ private[sql] object JDBCRelation {
if (upperBound == null) {
lowerBound
} else if (lowerBound == null) {
- upperBound
+ s"$upperBound OR $column IS NULL"
} else {
s"$lowerBound AND $upperBound"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f8a9a95c87..30a5e2ea4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -171,6 +171,27 @@ class JDBCSuite extends SparkFunSuite
|OPTIONS (url '$url', dbtable 'TEST.NULLTYPES', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
+ conn.prepareStatement(
+ "create table test.emp(name TEXT(32) NOT NULL," +
+ " theid INTEGER, \"Dept\" INTEGER)").executeUpdate()
+ conn.prepareStatement(
+ "insert into test.emp values ('fred', 1, 10)").executeUpdate()
+ conn.prepareStatement(
+ "insert into test.emp values ('mary', 2, null)").executeUpdate()
+ conn.prepareStatement(
+ "insert into test.emp values ('joe ''foo'' \"bar\"', 3, 30)").executeUpdate()
+ conn.prepareStatement(
+ "insert into test.emp values ('kathy', null, null)").executeUpdate()
+ conn.commit()
+
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE nullparts
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', dbtable 'TEST.EMP', user 'testUser', password 'testPass',
+ |partitionColumn '"Dept"', lowerBound '1', upperBound '4', numPartitions '4')
+ """.stripMargin.replaceAll("\n", " "))
+
// Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
}
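Note that the fixture deliberately gives two of the four rows ('mary' and 'kathy') a NULL "Dept" and uses a quoted, mixed-case column name, so the tests exercise both the null predicate and identifier quoting. A quick sanity check one might run over the same H2 connection (hypothetical, mirroring the prepared statements above):

```scala
// Hypothetical sanity check on the fixture: two rows carry a NULL "Dept".
val rs = conn.prepareStatement(
  "select count(*) from test.emp where \"Dept\" is null").executeQuery()
rs.next()
assert(rs.getInt(1) == 2)
```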
@@ -338,6 +359,23 @@ class JDBCSuite extends SparkFunSuite
.collect().length === 3)
}
+ test("Partitioning on column that might have null values.") {
+ assert(
+ sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties)
+ .collect().length === 4)
+ assert(
+ sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties)
+ .collect().length === 4)
+ // partitioning on a nullable quoted column
+ assert(
+ sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties)
+ .collect().length === 4)
+ }
+
+ test("SELECT * on partitioned table with a nullable partition column") {
+ assert(sql("SELECT * FROM nullparts").collect().length === 4)
+ }
+
test("H2 integral types") {
val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect()
assert(rows.length === 1)
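For readers unfamiliar with the seven-argument read.jdbc overload the new tests use, here is the same call with its positional parameters spelled out. As the columnPartition doc comment notes, the bounds only shape the partition strides and never filter rows, which is why NULL and out-of-range values must still land in some partition:

```scala
import java.util.Properties

// The test's call, annotated. This is a usage sketch written as if inside the
// suite, so urlWithUserAndPass and sqlContext come from the surrounding fixture.
val df = sqlContext.read.jdbc(
  urlWithUserAndPass, // JDBC url with user/password embedded
  "TEST.EMP",         // table to read
  """"Dept"""",       // partition column; Scala """...""" preserves the SQL quotes
  0,                  // lowerBound, feeds the stride computation only
  4,                  // upperBound, feeds the stride computation only
  3,                  // numPartitions
  new Properties)     // extra JDBC connection properties
assert(df.collect().length === 4) // all rows survive, including those with NULL "Dept"
```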