Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r-- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 12
-rw-r--r-- sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 48
2 files changed, 60 insertions(+), 0 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e8bcdd76ef..b2717ec54e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -51,6 +51,7 @@ private[hive] class SparkExecuteStatementOperation(
private var result: DataFrame = _
private var iter: Iterator[SparkRow] = _
+ private var iterHeader: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _
@@ -110,6 +111,14 @@ private[hive] class SparkExecuteStatementOperation(
assertState(OperationState.FINISHED)
setHasResultSet(true)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+
+ // Reset iter to the header when fetching starts from the first row
+ if (order.equals(FetchOrientation.FETCH_FIRST)) {
+ val (ita, itb) = iterHeader.duplicate
+ iter = ita
+ iterHeader = itb
+ }
+
if (!iter.hasNext) {
resultRowSet
} else {
@@ -228,6 +237,9 @@ private[hive] class SparkExecuteStatementOperation(
result.collect().iterator
}
}
+ val (itra, itrb) = iter.duplicate
+ iterHeader = itra
+ iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>
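The change above hinges on Scala's Iterator.duplicate: the operation keeps an untouched "header" copy of the result iterator, and a FETCH_FIRST request re-duplicates that header so fetching can restart from the first row without re-executing the query. Below is a minimal standalone sketch of that duplicate-and-reset pattern in plain Scala (illustrative names only, not Spark code); note that duplicate may buffer elements consumed by one copy but not yet by the other, so fetched rows can remain in memory until the operation is closed.

// Standalone sketch of the duplicate-and-reset pattern used above
// (plain Scala, illustrative names only, not Spark code).
object FetchFirstSketch {
  def main(args: Array[String]): Unit = {
    var iter: Iterator[Int] = (1 to 5).iterator

    // On first materialization, split into a working copy and a
    // "header" copy that stays parked at the first element.
    var (work, header) = iter.duplicate
    iter = work

    println(iter.take(3).toList)   // List(1, 2, 3)       -- FETCH_NEXT

    // FETCH_FIRST: re-duplicate the header so both copies point at the
    // first element again, then keep serving rows from `iter`.
    val (ita, itb) = header.duplicate
    iter = ita
    header = itb

    println(iter.toList)           // List(1, 2, 3, 4, 5) -- restarted
  }
}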
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index e388c2a082..8f2c4fafa0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -36,6 +36,8 @@ import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.GetInfoType
import org.apache.hive.service.cli.thrift.TCLIService.Client
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
+import org.apache.hive.service.cli.FetchOrientation
+import org.apache.hive.service.cli.FetchType
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.scalatest.BeforeAndAfterAll
@@ -91,6 +93,52 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}
+ test("SPARK-16563 ThriftCLIService FetchResults repeat fetching result") {
+ withCLIServiceClient { client =>
+ val user = System.getProperty("user.name")
+ val sessionHandle = client.openSession(user, "")
+
+ withJdbcStatement { statement =>
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_16563",
+ "CREATE TABLE test_16563(key INT, val STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563")
+
+ queries.foreach(statement.execute)
+ val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+ val operationHandle = client.executeStatement(
+ sessionHandle,
+ "SELECT * FROM test_16563",
+ confOverlay)
+
+ // Fetch result first time
+ assertResult(5, "Fetching result first time from next row") {
+
+ val rows_next = client.fetchResults(
+ operationHandle,
+ FetchOrientation.FETCH_NEXT,
+ 1000,
+ FetchType.QUERY_OUTPUT)
+
+ rows_next.numRows()
+ }
+
+ // Fetch result second time from first row
+ assertResult(5, "Repeat fetching result from first row") {
+
+ val rows_first = client.fetchResults(
+ operationHandle,
+ FetchOrientation.FETCH_FIRST,
+ 1000,
+ FetchType.QUERY_OUTPUT)
+
+ rows_first.numRows()
+ }
+ statement.executeQuery("DROP TABLE IF EXISTS test_16563")
+ }
+ }
+ }
+
test("JDBC query execution") {
withJdbcStatement { statement =>
val queries = Seq(