author    gatorsmile <gatorsmile@gmail.com>  2016-06-20 21:49:33 -0700
committer Reynold Xin <rxin@databricks.com>  2016-06-20 21:49:33 -0700
commit    d9a3a2a0bec504d17d3b94104d449ee3bd850120
tree      40b29a5c6c3221d2e75b16e7dddd19f9514b5aa6
parent    c775bf09e0c3540f76de3f15d3fd35112a4912c1
[SPARK-16056][SPARK-16057][SPARK-16058][SQL] Fix Multiple Bugs in Column Partitioning in JDBC Source
#### What changes were proposed in this pull request?

This PR fixes the following bugs:

**Issue 1: Wrong results when lowerBound is larger than upperBound in column partitioning**

```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 4,
  upperBound = 0,
  numPartitions = 3,
  connectionProperties = new Properties)
```

**Before code changes:** The returned results are wrong because the generated partitions are wrong:

```
Part 0: id < 3 or id is null
Part 1: id >= 3 AND id < 2
Part 2: id >= 2
```

**After code changes:** An `IllegalArgumentException` is thrown:

```
Operation not allowed: the lower bound of partitioning column is larger than the upper bound. Lower bound: 4; Upper bound: 0
```

**Issue 2: numPartitions is more than the number of key values between the upper and lower bounds**

```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 1,
  upperBound = 5,
  numPartitions = 10,
  connectionProperties = new Properties)
```

**Before code changes:** The results are correct, but the generated partitions are very inefficient:

```
Partition 0: id < 1 or id is null
Partition 1: id >= 1 AND id < 1
Partition 2: id >= 1 AND id < 1
Partition 3: id >= 1 AND id < 1
Partition 4: id >= 1 AND id < 1
Partition 5: id >= 1 AND id < 1
Partition 6: id >= 1 AND id < 1
Partition 7: id >= 1 AND id < 1
Partition 8: id >= 1 AND id < 1
Partition 9: id >= 1
```

**After code changes:** `numPartitions` is adjusted downward and the correct answers are returned:

```
Partition 0: id < 2 or id is null
Partition 1: id >= 2 AND id < 3
Partition 2: id >= 3 AND id < 4
Partition 3: id >= 4
```

**Issue 3: java.lang.ArithmeticException when numPartitions is zero**

```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 0,
  upperBound = 4,
  numPartitions = 0,
  connectionProperties = new Properties)
```

**Before code changes:** The call fails with the following exception:

```
java.lang.ArithmeticException: / by zero
```

**After code changes:** The correct answer is returned, because column partitioning is disabled when numPartitions is equal to or less than zero.

#### How was this patch tested?

Added test cases to verify the results.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13773 from gatorsmile/jdbcPartitioning.
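For readers who want to trace the new behavior without building Spark, below is a minimal, self-contained sketch of the patched `columnPartition` logic. `Part` is a hypothetical stand-in for Spark's `JDBCPartition`, and the flattened signature replaces `JDBCPartitioningInfo`; the partition math mirrors the hunk in the diff below.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's JDBCPartition.
case class Part(whereClause: String, idx: Int)

def columnPartition(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    requested: Int): Array[Part] = {
  // Issue 3: numPartitions <= 1 (including zero) disables column partitioning.
  if (requested <= 1 || lowerBound == upperBound) return Array(Part(null, 0))
  // Issue 1: reject inverted bounds instead of generating nonsense predicates.
  require(lowerBound <= upperBound,
    "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
      s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
  // Issue 2: clamp numPartitions so every partition spans at least one key value.
  val numPartitions: Long =
    if (upperBound - lowerBound >= requested) requested else upperBound - lowerBound
  // Divide each bound first; subtracting first could overflow Long.
  val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

  val ans = new ArrayBuffer[Part]()
  var currentValue = lowerBound
  var i = 0
  while (i < numPartitions) {
    val lBound = if (i != 0) s"$column >= $currentValue" else null
    currentValue += stride
    val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
    val whereClause =
      if (uBound == null) lBound
      else if (lBound == null) s"$uBound or $column is null"
      else s"$lBound AND $uBound"
    ans += Part(whereClause, i)
    i += 1
  }
  ans.toArray
}
```

`columnPartition("id", 1, 5, 10)` reproduces the Issue 2 output above: `id < 2 or id is null`, `id >= 2 AND id < 3`, `id >= 3 AND id < 4`, `id >= 4`.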
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala | 48
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 65
2 files changed, 98 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 233b7891d6..11613dd912 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.internal.Logging
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
@@ -36,7 +37,7 @@ private[sql] case class JDBCPartitioningInfo(
upperBound: Long,
numPartitions: Int)
-private[sql] object JDBCRelation {
+private[sql] object JDBCRelation extends Logging {
/**
* Given a partitioning schematic (a column of integral type, a number of
* partitions, and upper and lower bounds on the column's value), generate
@@ -52,29 +53,46 @@ private[sql] object JDBCRelation {
* @return an array of partitions with where clause for each partition
*/
def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
- if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
+ if (partitioning == null || partitioning.numPartitions <= 1 ||
+ partitioning.lowerBound == partitioning.upperBound) {
+ return Array[Partition](JDBCPartition(null, 0))
+ }
- val numPartitions = partitioning.numPartitions
- val column = partitioning.column
- if (numPartitions == 1) return Array[Partition](JDBCPartition(null, 0))
+ val lowerBound = partitioning.lowerBound
+ val upperBound = partitioning.upperBound
+ require (lowerBound <= upperBound,
+ "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
+ s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
+
+ val numPartitions =
+ if ((upperBound - lowerBound) >= partitioning.numPartitions) {
+ partitioning.numPartitions
+ } else {
+ logWarning("The number of partitions is reduced because the specified number of " +
+ "partitions is less than the difference between upper bound and lower bound. " +
+ s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " +
+ s"partitions: ${partitioning.numPartitions}; Lower bound: $lowerBound; " +
+ s"Upper bound: $upperBound.")
+ upperBound - lowerBound
+ }
// Overflow and silliness can happen if you subtract then divide.
// Here we get a little roundoff, but that's (hopefully) OK.
- val stride: Long = (partitioning.upperBound / numPartitions
- - partitioning.lowerBound / numPartitions)
+ val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+ val column = partitioning.column
var i: Int = 0
- var currentValue: Long = partitioning.lowerBound
+ var currentValue: Long = lowerBound
var ans = new ArrayBuffer[Partition]()
while (i < numPartitions) {
- val lowerBound = if (i != 0) s"$column >= $currentValue" else null
+ val lBound = if (i != 0) s"$column >= $currentValue" else null
currentValue += stride
- val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
+ val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
val whereClause =
- if (upperBound == null) {
- lowerBound
- } else if (lowerBound == null) {
- s"$upperBound or $column is null"
+ if (uBound == null) {
+ lBound
+ } else if (lBound == null) {
+ s"$uBound or $column is null"
} else {
- s"$lowerBound AND $upperBound"
+ s"$lBound AND $uBound"
}
ans += JDBCPartition(whereClause, i)
i = i + 1
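The comment retained in this hunk ("Overflow and silliness can happen if you subtract then divide") is easy to demonstrate in isolation. A small, hypothetical example with bounds near the Long extremes:

```scala
val lower = Long.MinValue / 2
val upper = Long.MaxValue / 2 + 2
val n = 4L

// Subtract-then-divide: (upper - lower) exceeds Long.MaxValue and wraps,
// so the stride comes out negative.
val naive = (upper - lower) / n   // -2305843009213693951

// Divide-then-subtract, as the code does: stays in range, with only
// a little roundoff from the two truncating divisions.
val safe = upper / n - lower / n  // 2305843009213693952
```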
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d6ec40c18b..fd6671a39b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -184,6 +184,16 @@ class JDBCSuite extends SparkFunSuite
"insert into test.emp values ('kathy', null, null)").executeUpdate()
conn.commit()
+ conn.prepareStatement(
+ "create table test.seq(id INTEGER)").executeUpdate()
+ (0 to 6).foreach { value =>
+ conn.prepareStatement(
+ s"insert into test.seq values ($value)").executeUpdate()
+ }
+ conn.prepareStatement(
+ "insert into test.seq values (null)").executeUpdate()
+ conn.commit()
+
sql(
s"""
|CREATE TEMPORARY TABLE nullparts
@@ -373,6 +383,61 @@ class JDBCSuite extends SparkFunSuite
.collect().length === 4)
}
+ test("Partitioning on column where numPartitions is zero") {
+ val res = spark.read.jdbc(
+ url = urlWithUserAndPass,
+ table = "TEST.seq",
+ columnName = "id",
+ lowerBound = 0,
+ upperBound = 4,
+ numPartitions = 0,
+ connectionProperties = new Properties
+ )
+ assert(res.count() === 8)
+ }
+
+ test("Partitioning on column where numPartitions are more than the number of total rows") {
+ val res = spark.read.jdbc(
+ url = urlWithUserAndPass,
+ table = "TEST.seq",
+ columnName = "id",
+ lowerBound = 1,
+ upperBound = 5,
+ numPartitions = 10,
+ connectionProperties = new Properties
+ )
+ assert(res.count() === 8)
+ }
+
+ test("Partitioning on column where lowerBound is equal to upperBound") {
+ val res = spark.read.jdbc(
+ url = urlWithUserAndPass,
+ table = "TEST.seq",
+ columnName = "id",
+ lowerBound = 5,
+ upperBound = 5,
+ numPartitions = 4,
+ connectionProperties = new Properties
+ )
+ assert(res.count() === 8)
+ }
+
+ test("Partitioning on column where lowerBound is larger than upperBound") {
+ val e = intercept[IllegalArgumentException] {
+ spark.read.jdbc(
+ url = urlWithUserAndPass,
+ table = "TEST.seq",
+ columnName = "id",
+ lowerBound = 5,
+ upperBound = 1,
+ numPartitions = 3,
+ connectionProperties = new Properties
+ )
+ }.getMessage
+ assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
+ "is larger than the upper bound. Lower bound: 5; Upper bound: 1"))
+ }
+
test("SELECT * on partitioned table with a nullable partition column") {
assert(sql("SELECT * FROM nullparts").collect().size == 4)
}
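One note on the last new test, in case the `contains` check looks loose: Scala's `require` throws `IllegalArgumentException` with the message prefixed by `requirement failed: `, so the test matches on a substring rather than the full message. A quick sketch (the helper name is hypothetical):

```scala
// Minimal reproduction of the bounds check added in JDBCRelation.
def checkBounds(lowerBound: Long, upperBound: Long): Unit =
  require(lowerBound <= upperBound,
    "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
      s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")

try checkBounds(5, 1) catch {
  case e: IllegalArgumentException =>
    // Prints: requirement failed: Operation not allowed: ... Lower bound: 5; Upper bound: 1
    println(e.getMessage)
}
```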