about summary refs log tree commit diff
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-08-11 20:08:06 -0700
committerMichael Armbrust <michael@databricks.com>2014-08-11 20:08:06 -0700
commit21a95ef051f7b23a80d147aadb00dfa4ebb169b0 (patch)
tree608585c64d4acd7a16db99b369d6a328e3375d7b
parent490ecfa20327a636289321ea447722aa32b81657 (diff)
downloadspark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.tar.gz
spark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.tar.bz2
spark-21a95ef051f7b23a80d147aadb00dfa4ebb169b0.zip
[SPARK-2590][SQL] Added option to handle incremental collection, disabled by default
JIRA issue: [SPARK-2590](https://issues.apache.org/jira/browse/SPARK-2590) Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1853 from liancheng/inc-collect-option and squashes the following commits: cb3ea45 [Cheng Lian] Moved incremental collection option to Thrift server 43ce3aa [Cheng Lian] Changed incremental collect option name 623abde [Cheng Lian] Added option to handle incremental collection, disabled by default
-rw-r--r-- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 11
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index dee092159d..f192f490ac 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -132,7 +132,16 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
- iter = result.queryExecution.toRdd.toLocalIterator
+ iter = {
+ val resultRdd = result.queryExecution.toRdd
+ val useIncrementalCollect =
+ hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+ if (useIncrementalCollect) {
+ resultRdd.toLocalIterator
+ } else {
+ resultRdd.collect().iterator
+ }
+ }
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {