aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorwangfei <wangfei1@huawei.com>2014-11-07 12:55:11 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-07 12:55:11 -0800
commitd6e55524437026c0c76addeba8f99249a8316716 (patch)
treeee0e39397f50ea2fb82e2b2cb1faeadfb0d5104c /sql
parentac70c972a51952f801fd02dd5962c0a0c1aba8f8 (diff)
downloadspark-d6e55524437026c0c76addeba8f99249a8316716.tar.gz
spark-d6e55524437026c0c76addeba8f99249a8316716.tar.bz2
spark-d6e55524437026c0c76addeba8f99249a8316716.zip
[SPARK-4292][SQL] Result set iterator bug in JDBC/ODBC
select * from src, get the wrong result set as follows: ``` ... | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 309 | val_309 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | | 97 | val_97 | ... ``` Author: wangfei <wangfei1@huawei.com> Closes #3149 from scwf/SPARK-4292 and squashes the following commits: 1574a43 [wangfei] using result.collect 8b2d845 [wangfei] adding test f64eddf [wangfei] result set iter bug
Diffstat (limited to 'sql')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala23
-rw-r--r--sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala5
-rw-r--r--sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala5
3 files changed, 27 insertions, 6 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 65d910a0c3..bba29b2bdc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -267,4 +267,27 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
}
}
+
+ test("SPARK-4292 regression: result set iterator issue") {
+ withJdbcStatement() { statement =>
+ val dataFilePath =
+ Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_4292",
+ "CREATE TABLE test_4292(key INT, val STRING)",
+ s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+
+ queries.foreach(statement.execute)
+
+ val resultSet = statement.executeQuery("SELECT key FROM test_4292")
+
+ Seq(238, 86, 311, 27, 165).foreach { key =>
+ resultSet.next()
+ assert(resultSet.getInt(1) == key)
+ }
+
+ statement.executeQuery("DROP TABLE IF EXISTS test_4292")
+ }
+ }
}
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 8077d0ec46..e3ba9914c6 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -202,13 +202,12 @@ private[hive] class SparkExecuteStatementOperation(
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
- val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
- resultRdd.toLocalIterator
+ result.toLocalIterator
} else {
- resultRdd.collect().iterator
+ result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 2c1983de1d..f2ceba8282 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -87,13 +87,12 @@ private[hive] class SparkExecuteStatementOperation(
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = {
- val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
- resultRdd.toLocalIterator
+ result.toLocalIterator
} else {
- resultRdd.collect().iterator
+ result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray