aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Yu <ehotou@gmail.com>2015-02-21 20:40:21 +0000
committerSean Owen <sowen@cloudera.com>2015-02-21 20:40:21 +0000
commit7683982faf920b8ac6cf46b79842450e7d46c5cc (patch)
treeb14d7bffc1dd2e76f3a2bf3d41e7fad018a9c2b8
parent7138816abe1060a1e967c4c77c72d5752586d557 (diff)
downloadspark-7683982faf920b8ac6cf46b79842450e7d46c5cc.tar.gz
spark-7683982faf920b8ac6cf46b79842450e7d46c5cc.tar.bz2
spark-7683982faf920b8ac6cf46b79842450e7d46c5cc.zip
[SPARK-5860][CORE] JdbcRDD: overflow on large range with high number of partitions
Fix an overflow bug in JdbcRDD when calculating partitions for large BIGINT ids Author: Evan Yu <ehotou@gmail.com> Closes #4701 from hotou/SPARK-5860 and squashes the following commits: 9e038d1 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level 7883ad9 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level c88755a [Evan Yu] [SPARK-5860][CORE] switch to BigInt instead of BigDecimal 4e9ff4f [Evan Yu] [SPARK-5860][CORE] JdbcRDD overflow on large range with high number of partitions
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala60
2 files changed, 50 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 4fe7622bda..e2267861e7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
// bounds are inclusive, hence the + 1 here and - 1 on end
- val length = 1 + upperBound - lowerBound
+ val length = BigInt(1) + upperBound - lowerBound
(0 until numPartitions).map(i => {
- val start = lowerBound + ((i * length) / numPartitions).toLong
- val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
- new JdbcPartition(i, start, end)
+ val start = lowerBound + ((i * length) / numPartitions)
+ val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
+ new JdbcPartition(i, start.toLong, end.toLong)
}).toArray
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 6138d0bbd5..0dc59888f7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
try {
- val create = conn.createStatement
- create.execute("""
- CREATE TABLE FOO(
- ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
- DATA INTEGER
- )""")
- create.close()
- val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
- (1 to 100).foreach { i =>
- insert.setInt(1, i * 2)
- insert.executeUpdate
+
+ try {
+ val create = conn.createStatement
+ create.execute("""
+ CREATE TABLE FOO(
+ ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+ DATA INTEGER
+ )""")
+ create.close()
+ val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+ (1 to 100).foreach { i =>
+ insert.setInt(1, i * 2)
+ insert.executeUpdate
+ }
+ insert.close()
+ } catch {
+ case e: SQLException if e.getSQLState == "X0Y32" =>
+ // table exists
}
- insert.close()
- } catch {
- case e: SQLException if e.getSQLState == "X0Y32" =>
+
+ try {
+ val create = conn.createStatement
+ create.execute("CREATE TABLE BIGINT_TEST(ID BIGINT NOT NULL, DATA INTEGER)")
+ create.close()
+ val insert = conn.prepareStatement("INSERT INTO BIGINT_TEST VALUES(?,?)")
+ (1 to 100).foreach { i =>
+ insert.setLong(1, 100000000000000000L + 4000000000000000L * i)
+ insert.setInt(2, i)
+ insert.executeUpdate
+ }
+ insert.close()
+ } catch {
+ case e: SQLException if e.getSQLState == "X0Y32" =>
// table exists
+ }
+
} finally {
conn.close()
}
@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
assert(rdd.count === 100)
assert(rdd.reduce(_+_) === 10100)
}
+
+ test("large id overflow") {
+ sc = new SparkContext("local", "test")
+ val rdd = new JdbcRDD(
+ sc,
+ () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+ "SELECT DATA FROM BIGINT_TEST WHERE ? <= ID AND ID <= ?",
+ 1131544775L, 567279358897692673L, 20,
+ (r: ResultSet) => { r.getInt(1) } ).cache()
+ assert(rdd.count === 100)
+ assert(rdd.reduce(_+_) === 5050)
+ }
after {
try {