author | wangfei <wangfei1@huawei.com> | 2014-11-07 12:55:11 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-07 12:55:11 -0800 |
commit | d6e55524437026c0c76addeba8f99249a8316716 (patch) | |
tree | ee0e39397f50ea2fb82e2b2cb1faeadfb0d5104c /sql | |
parent | ac70c972a51952f801fd02dd5962c0a0c1aba8f8 (diff) | |
[SPARK-4292][SQL] Result set iterator bug in JDBC/ODBC
Running `select * from src` through the JDBC/ODBC Thrift server returns a wrong result set, with rows duplicated as follows:
```
...
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 309 | val_309 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
| 97 | val_97 |
...
```
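The long runs of identical rows above are the symptom of buffering references to a single reused, mutable row object instead of copying each row before it is stored; the fix below swaps the raw `result.queryExecution.toRdd` for the `SchemaRDD` itself (`result.collect()` / `result.toLocalIterator`), which hands back properly materialized rows. The following is a minimal, self-contained sketch of that failure mode, not Spark code — `MutableRow` here is a hypothetical stand-in for a reused internal row:

```
// Hypothetical stand-in for a reused, mutable internal row.
final class MutableRow(var key: Int, var value: String) {
  override def toString: String = s"| $key | $value |"
}

object RowReuseDemo {
  def main(args: Array[String]): Unit = {
    val source = Seq(238 -> "val_238", 86 -> "val_86", 311 -> "val_311")
    val shared = new MutableRow(0, "")

    // Wrong: buffer the shared object itself -- every slot in the array ends up
    // pointing at the same row, so all printed lines show the last value written.
    val wrong = source.iterator.map { case (k, v) =>
      shared.key = k; shared.value = v; shared
    }.toArray
    println(wrong.mkString("\n"))

    // Right: copy each row before buffering it, analogous to collecting the
    // SchemaRDD instead of the raw internal RDD.
    val right = source.iterator.map { case (k, v) =>
      shared.key = k; shared.value = v
      new MutableRow(shared.key, shared.value)
    }.toArray
    println(right.mkString("\n"))
  }
}
```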
Author: wangfei <wangfei1@huawei.com>
Closes #3149 from scwf/SPARK-4292 and squashes the following commits:
1574a43 [wangfei] using result.collect
8b2d845 [wangfei] adding test
f64eddf [wangfei] result set iter bug
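The added regression test drives the fix through an in-process JDBC statement (see the diff below). For a manual check against a running HiveThriftServer2, a standalone Hive JDBC client along the following lines could be used — this is a sketch only, and the connection URL, user, and query are assumptions rather than values from this patch:

```
import java.sql.DriverManager

// Hedged sketch of a manual check against a running HiveThriftServer2.
// URL, user, and query below are placeholders, not taken from this patch.
object Spark4292ManualCheck {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT key, value FROM src")
      // Before the fix this printed long runs of the same row, as in the
      // output quoted above; after the fix each row comes back as stored.
      while (rs.next()) {
        println(s"| ${rs.getInt(1)} | ${rs.getString(2)} |")
      }
    } finally {
      conn.close()
    }
  }
}
```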
Diffstat (limited to 'sql')
3 files changed, 27 insertions, 6 deletions
```
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 65d910a0c3..bba29b2bdc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -267,4 +267,27 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
     }
   }
+
+  test("SPARK-4292 regression: result set iterator issue") {
+    withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_4292",
+        "CREATE TABLE test_4292(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+
+      queries.foreach(statement.execute)
+
+      val resultSet = statement.executeQuery("SELECT key FROM test_4292")
+
+      Seq(238, 86, 311, 27, 165).foreach { key =>
+        resultSet.next()
+        assert(resultSet.getInt(1) == key)
+      }
+
+      statement.executeQuery("DROP TABLE IF EXISTS test_4292")
+    }
+  }
 }
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 8077d0ec46..e3ba9914c6 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -202,13 +202,12 @@ private[hive] class SparkExecuteStatementOperation(
       hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
     iter = {
-      val resultRdd = result.queryExecution.toRdd
       val useIncrementalCollect =
         hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
       if (useIncrementalCollect) {
-        resultRdd.toLocalIterator
+        result.toLocalIterator
       } else {
-        resultRdd.collect().iterator
+        result.collect().iterator
       }
     }
     dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 2c1983de1d..f2ceba8282 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -87,13 +87,12 @@ private[hive] class SparkExecuteStatementOperation(
     val groupId = round(random * 1000000).toString
     hiveContext.sparkContext.setJobGroup(groupId, statement)
     iter = {
-      val resultRdd = result.queryExecution.toRdd
       val useIncrementalCollect =
         hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
       if (useIncrementalCollect) {
-        resultRdd.toLocalIterator
+        result.toLocalIterator
       } else {
-        resultRdd.collect().iterator
+        result.collect().iterator
       }
     }
     dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
```
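Both shims keep the `spark.sql.thriftServer.incrementalCollect` switch; the patch only changes which RDD is collected, not the collection strategy. As a reminder of the trade-off that flag selects between, here is a hedged, standalone sketch on a plain RDD (app name, data size, and partition count are arbitrary):

```
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the two collection strategies the incrementalCollect flag chooses between.
object CollectStrategies {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("collect-strategies").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100000, numSlices = 8)

    // collect(): materializes the whole result on the driver at once -- simple,
    // but the entire result set must fit in driver memory.
    val eager = rdd.collect().iterator

    // toLocalIterator: pulls one partition at a time, so the driver only needs
    // to hold a single partition's rows in memory at any moment.
    val incremental = rdd.toLocalIterator

    println(s"eager head: ${eager.next()}, incremental head: ${incremental.next()}")
    sc.stop()
  }
}
```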