diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-08-11 20:08:06 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-11 20:08:06 -0700 |
commit | 21a95ef051f7b23a80d147aadb00dfa4ebb169b0 (patch) | |
tree | 608585c64d4acd7a16db99b369d6a328e3375d7b /sql | |
parent | 490ecfa20327a636289321ea447722aa32b81657 (diff) | |
download | spark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.tar.gz spark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.tar.bz2 spark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.zip |
[SPARK-2590][SQL] Added option to handle incremental collection, disabled by default
JIRA issue: [SPARK-2590](https://issues.apache.org/jira/browse/SPARK-2590)
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1853 from liancheng/inc-collect-option and squashes the following commits:
cb3ea45 [Cheng Lian] Moved incremental collection option to Thrift server
43ce3aa [Cheng Lian] Changed incremental collect option name
623abde [Cheng Lian] Added option to handle incremental collection, disabled by default
Diffstat (limited to 'sql')
-rw-r--r-- | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 11 |
1 file changed, 10 insertions, 1 deletion
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index dee092159d..f192f490ac 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -132,7 +132,16 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage logDebug(result.queryExecution.toString()) val groupId = round(random * 1000000).toString hiveContext.sparkContext.setJobGroup(groupId, statement) - iter = result.queryExecution.toRdd.toLocalIterator + iter = { + val resultRdd = result.queryExecution.toRdd + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { + resultRdd.toLocalIterator + } else { + resultRdd.collect().iterator + } + } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray setHasResultSet(true) } catch { |