path: root/sql/hive-thriftserver
author    Dongjoon Hyun <dongjoon@apache.org>    2017-01-10 13:27:55 +0000
committer Sean Owen <sowen@cloudera.com>         2017-01-10 13:27:55 +0000
commit    a2c6adcc5d2702d2f0e9b239517353335e5f911e (patch)
tree      6691ef8d2fc499df589622e256fc60eb5e82b5d8 /sql/hive-thriftserver
parent    2cfd41ac02193aaf121afcddcb6383f4d075ea1e (diff)
[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server
## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However, Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**; this causes GC pressure and hangs for queries that return a large number of rows. We should not use it, especially for `spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16440 from dongjoon-hyun/SPARK-18857.
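For context, a minimal standalone sketch (plain Scala, not Spark code) of the buffering behavior described above: advancing one of the duplicated iterators forces every element in the gap between the two into an internal queue, so keeping one duplicate parked at the head while draining the other buffers the entire result set in memory.

```scala
object DuplicateBufferingDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for a large, lazily produced result set.
    val rows = Iterator.range(0, 1000000)
    val (header, cursor) = rows.duplicate

    // Draining `cursor` while `header` still sits at the first row queues
    // all one million elements in the shared internal buffer. This is the
    // memory profile that caused GC pressure in the Thrift server.
    cursor.foreach(_ => ())

    // `header` now replays the buffered elements from that queue.
    println(header.take(3).toList) // List(0, 1, 2)
  }
}
```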
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r-- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 30
1 file changed, 19 insertions(+), 11 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index aeabd6a158..517b01f183 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
with Logging {
private var result: DataFrame = _
+
+ // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+ // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+ // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+ private var resultList: Option[Array[SparkRow]] = _
+
private var iter: Iterator[SparkRow] = _
- private var iterHeader: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _
@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
// Reset iter to header when fetching start from first row
if (order.equals(FetchOrientation.FETCH_FIRST)) {
- val (ita, itb) = iterHeader.duplicate
- iter = ita
- iterHeader = itb
+ iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+ resultList = None
+ result.toLocalIterator.asScala
+ } else {
+ if (resultList.isEmpty) {
+ resultList = Some(result.collect())
+ }
+ resultList.get.iterator
+ }
}
if (!iter.hasNext) {
@@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation(
}
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
- val useIncrementalCollect =
- sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
- if (useIncrementalCollect) {
+ if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+ resultList = None
result.toLocalIterator.asScala
} else {
- result.collect().iterator
+ resultList = Some(result.collect())
+ resultList.get.iterator
}
}
- val (itra, itrb) = iter.duplicate
- iterHeader = itra
- iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>
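Stepping back from the diff, the new strategy can be summarized in a small sketch. The names below (`CachedCursor`, `rewind`, `recompute`) are illustrative stand-ins rather than Spark API; `recompute` plays the role of re-running the query via `result.toLocalIterator` / `result.collect()`.

```scala
// Hypothetical sketch of the patch's FETCH_FIRST strategy; not Spark code.
class CachedCursor[T](incremental: Boolean, recompute: () => Iterator[T]) {
  // Mirrors `resultList`: Some(rows) only in collect mode, None otherwise.
  private var cached: Option[Vector[T]] = None
  private var iter: Iterator[T] = Iterator.empty

  // Mirrors the FETCH_FIRST branch: in incremental mode re-execute and
  // buffer nothing; in collect mode replay the single cached copy.
  def rewind(): Iterator[T] = {
    iter = if (incremental) {
      cached = None // hold no per-row state; pay re-execution instead
      recompute()
    } else {
      if (cached.isEmpty) cached = Some(recompute().toVector)
      cached.get.iterator // rewinding is free once rows are cached
    }
    iter
  }
}
```

Unlike `Iterator.duplicate`, the worst case here is a single materialized copy of the rows (collect mode) or no buffering at all (incremental mode, at the cost of re-executing the query on every FETCH_FIRST).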