From 6bca8898a1aa4ca7161492229bac1748b3da2ad7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 18 Aug 2014 10:52:20 -0700 Subject: SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool This definitely needs review as I am not familiar with this part of Spark. I tested this locally and it did seem to work. Author: Patrick Wendell Closes #1937 from pwendell/scheduler and squashes the following commits: b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair scheduler pool --- .../server/SparkSQLOperationManager.scala | 27 ++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) (limited to 'sql/hive-thriftserver') diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 9338e8121b..699a1103f3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,24 +17,24 @@ package org.apache.spark.sql.hive.thriftserver.server -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer -import scala.math.{random, round} - import java.sql.Timestamp import java.util.{Map => JMap} +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, Map} +import scala.math.{random, round} + import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession - import org.apache.spark.Logging +import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} +import org.apache.spark.sql.catalyst.plans.logical.SetCommand import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import org.apache.spark.sql.{SchemaRDD, Row => SparkRow} +import org.apache.spark.sql.hive.thriftserver.ReflectionUtils /** * Executes queries using Spark SQL, and maintains a list of handles to active queries. @@ -43,6 +43,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") + // TODO: Currenlty this will grow infinitely, even as sessions expire + val sessionToActivePool = Map[HiveSession, String]() + override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, @@ -165,8 +168,18 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) + result.queryExecution.logical match { + case SetCommand(Some(key), Some(value)) if (key == SQLConf.THRIFTSERVER_POOL) => + sessionToActivePool(parentSession) = value + logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") + case _ => + } + val groupId = round(random * 1000000).toString hiveContext.sparkContext.setJobGroup(groupId, statement) + sessionToActivePool.get(parentSession).foreach { pool => + hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + } iter = { val resultRdd = result.queryExecution.toRdd val useIncrementalCollect = -- cgit v1.2.3