aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala23
-rw-r--r--sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala5
-rw-r--r--sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala5
3 files changed, 27 insertions, 6 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 65d910a0c3..bba29b2bdc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -267,4 +267,27 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
}
}
+
+ test("SPARK-4292 regression: result set iterator issue") {
+ withJdbcStatement() { statement =>
+ val dataFilePath =
+ Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_4292",
+ "CREATE TABLE test_4292(key INT, val STRING)",
+ s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+
+ queries.foreach(statement.execute)
+
+ val resultSet = statement.executeQuery("SELECT key FROM test_4292")
+
+ Seq(238, 86, 311, 27, 165).foreach { key =>
+ resultSet.next()
+ assert(resultSet.getInt(1) == key)
+ }
+
+ statement.executeQuery("DROP TABLE IF EXISTS test_4292")
+ }
+ }
}
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 8077d0ec46..e3ba9914c6 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -202,13 +202,12 @@ private[hive] class SparkExecuteStatementOperation(
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
- val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
- resultRdd.toLocalIterator
+ result.toLocalIterator
} else {
- resultRdd.collect().iterator
+ result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 2c1983de1d..f2ceba8282 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -87,13 +87,12 @@ private[hive] class SparkExecuteStatementOperation(
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = {
- val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
- resultRdd.toLocalIterator
+ result.toLocalIterator
} else {
- resultRdd.collect().iterator
+ result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray