author     Patrick Wendell <pwendell@gmail.com>    2014-08-18 10:52:20 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-08-18 10:52:20 -0700
commit     6bca8898a1aa4ca7161492229bac1748b3da2ad7 (patch)
tree       0b6ae9b8512608a5c207c839f62d597288d9f2cf /sql/hive-thriftserver
parent     4bf3de71074053af94f077c99e9c65a1962739e1 (diff)
SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool
This definitely needs review as I am not familiar with this part of Spark. I tested this locally and it did seem to work.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1937 from pwendell/scheduler and squashes the following commits:

b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair scheduler pool
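For context on how a client exercises this change: after this patch, issuing a SET statement for the Thrift server's pool key records the pool on the session, and every subsequent statement on that session is submitted to that fair scheduler pool. Below is a minimal client-side sketch, not part of this patch; the JDBC URL, the pool name "accounting", and the literal key behind SQLConf.THRIFTSERVER_POOL are assumptions, since the constant's value does not appear in this diff.

// Hypothetical JDBC client usage sketch (assumptions noted above).
import java.sql.DriverManager

object SetPoolFromJdbc {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark SQL Thrift server listening on localhost:10000.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    // Assumed to be the key that SQLConf.THRIFTSERVER_POOL resolves to.
    stmt.execute("SET spark.sql.thriftserver.scheduler.pool=accounting")
    // Statements on this session now run in the "accounting" pool.
    val rs = stmt.executeQuery("SELECT 1")
    while (rs.next()) println(rs.getInt(1))
    conn.close()
  }
}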
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala  27
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 9338e8121b..699a1103f3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,24 +17,24 @@
 package org.apache.spark.sql.hive.thriftserver.server
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{random, round}
-
 import java.sql.Timestamp
 import java.util.{Map => JMap}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.math.{random, round}
+
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
-
 import org.apache.spark.Logging
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -43,6 +43,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
+  // TODO: Currently this will grow infinitely, even as sessions expire
+  val sessionToActivePool = Map[HiveSession, String]()
+
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
@@ -165,8 +168,18 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager
       try {
         result = hiveContext.sql(statement)
         logDebug(result.queryExecution.toString())
+        result.queryExecution.logical match {
+          case SetCommand(Some(key), Some(value)) if (key == SQLConf.THRIFTSERVER_POOL) =>
+            sessionToActivePool(parentSession) = value
+            logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+          case _ =>
+        }
+
         val groupId = round(random * 1000000).toString
         hiveContext.sparkContext.setJobGroup(groupId, statement)
+        sessionToActivePool.get(parentSession).foreach { pool =>
+          hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+        }
         iter = {
           val resultRdd = result.queryExecution.toRdd
           val useIncrementalCollect =
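The patch piggybacks on an existing SparkContext mechanism: "spark.scheduler.pool" is a thread-local property that the fair scheduler reads at job submission time, which is why setting it immediately before each statement runs is sufficient. A self-contained sketch of that mechanism outside the Thrift server, assuming a FAIR-mode local context and an illustrative pool name:

// Minimal sketch of the scheduler behavior the patch relies on.
// The pool name "accounting" is an assumption for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object FairPoolSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-pool-sketch")
      .setMaster("local[2]")
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread now target the "accounting" pool.
    sc.setLocalProperty("spark.scheduler.pool", "accounting")
    sc.parallelize(1 to 100).count()

    // Clearing the property sends later jobs back to the default pool.
    sc.setLocalProperty("spark.scheduler.pool", null)
    sc.stop()
  }
}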