diff options
author | wangfei <wangfei1@huawei.com> | 2014-10-31 11:27:59 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-10-31 11:27:59 -0700 |
commit | 7c41d135709c148d4fa3a1b06b5905715c970519 (patch) | |
tree | 78206ad76d5ae1f48917b53f6b68dcf3e01ca765 /sql/hive-thriftserver/src | |
parent | adb6415c1d65d466a10c50e8dc6cb3bf2805ebdf (diff) | |
download | spark-7c41d135709c148d4fa3a1b06b5905715c970519.tar.gz spark-7c41d135709c148d4fa3a1b06b5905715c970519.tar.bz2 spark-7c41d135709c148d4fa3a1b06b5905715c970519.zip |
[SPARK-3826][SQL]enable hive-thriftserver to support hive-0.13.1
In #2241 hive-thriftserver is not enabled. This patch enable hive-thriftserver to support hive-0.13.1 by using a shim layer refer to #2241.
1 A light shim layer(code in sql/hive-thriftserver/hive-version) for each different hive version to handle api compatibility
2 New pom profiles "hive-default" and "hive-versions"(copy from #2241) to activate different hive version
3 SBT cmd for different version as follows:
hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly
4 Since hive-thriftserver depend on hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver
Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>
Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:
f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
Diffstat (limited to 'sql/hive-thriftserver/src')
-rw-r--r-- | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala (renamed from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala) | 18 | ||||
-rwxr-xr-x | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 6 | ||||
-rw-r--r-- | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 19 | ||||
-rw-r--r-- | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 169 |
4 files changed, 24 insertions, 188 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index a5c457c677..fcb302edbf 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext) - extends Driver with Logging { +private[hive] abstract class AbstractSparkSQLDriver( + val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging { - private var tableSchema: Schema = _ - private var hiveResponse: Seq[String] = _ + private[hive] var tableSchema: Schema = _ + private[hive] var hiveResponse: Seq[String] = _ override def init(): Unit = { } @@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo override def getSchema: Schema = tableSchema - override def getResults(res: JArrayList[String]): Boolean = { - if (hiveResponse == null) { - false - } else { - res.addAll(hiveResponse) - hiveResponse = null - true - } - } - override def destroy() { super.destroy() hiveResponse = null diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7ba4564602..2cd02ae926 100755 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.thrift.transport.TSocket import org.apache.spark.Logging +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim private[hive] object SparkSQLCLIDriver { private var prompt = "spark-sql" @@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver { } } - if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) { + if (!sessionState.isRemoteMode) { // Hadoop-20 and above - we need to augment classpath using hiveconf // components. // See also: code in ExecDriver.java @@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { } else { var ret = 0 val hconf = conf.asInstanceOf[HiveConf] - val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf) + val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf) if (proc != null) { if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 42cbf363b2..a78311fc48 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -24,6 +24,7 @@ import java.util.{List => JList} import javax.security.auth.login.LoginException import org.apache.commons.logging.Log +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hive.service.Service.STATE @@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext) val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) + var sparkServiceUGI: UserGroupInformation = null - try { - HiveAuthFactory.loginFromKeytab(hiveConf) - val serverUserName = ShimLoader.getHadoopShims - .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf)) - setSuperField(this, "serverUserName", serverUserName) - } catch { - case e @ (_: IOException | _: LoginException) => - throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) + if (ShimLoader.getHadoopShims().isSecurityEnabled()) { + try { + HiveAuthFactory.loginFromKeytab(hiveConf) + sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) + HiveThriftServerShim.setServerUserName(sparkServiceUGI, this) + } catch { + case e @ (_: IOException | _: LoginException) => + throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) + } } initCompositeService(hiveConf) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index accf61576b..2a4f24132c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,24 +17,15 @@ package org.apache.spark.sql.hive.thriftserver.server -import java.sql.Timestamp import java.util.{Map => JMap} +import scala.collection.mutable.Map -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, Map} -import scala.math.{random, round} - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging -import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} -import org.apache.spark.sql.catalyst.plans.logical.SetCommand -import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils} /** * Executes queries using Spark SQL, and maintains a list of handles to active queries. @@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) { - private var result: SchemaRDD = _ - private var iter: Iterator[SparkRow] = _ - private var dataTypes: Array[DataType] = _ - - def close(): Unit = { - // RDDs will be cleaned automatically upon garbage collection. - logDebug("CLOSING") - } - - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { - if (!iter.hasNext) { - new RowSet() - } else { - // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int - val maxRows = maxRowsL.toInt - var curRow = 0 - var rowSet = new ArrayBuffer[Row](maxRows.min(1024)) - - while (curRow < maxRows && iter.hasNext) { - val sparkRow = iter.next() - val row = new Row() - var curCol = 0 - - while (curCol < sparkRow.length) { - if (sparkRow.isNullAt(curCol)) { - addNullColumnValue(sparkRow, row, curCol) - } else { - addNonNullColumnValue(sparkRow, row, curCol) - } - curCol += 1 - } - rowSet += row - curRow += 1 - } - new RowSet(rowSet, 0) - } - } - - def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { - dataTypes(ordinal) match { - case StringType => - to.addString(from(ordinal).asInstanceOf[String]) - case IntegerType => - to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal))) - case BooleanType => - to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal))) - case DoubleType => - to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal))) - case FloatType => - to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal))) - case DecimalType => - val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal - to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal))) - case LongType => - to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal))) - case ByteType => - to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal))) - case ShortType => - to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal))) - case TimestampType => - to.addColumnValue( - ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp])) - case BinaryType | _: ArrayType | _: StructType | _: MapType => - val hiveString = result - .queryExecution - .asInstanceOf[HiveContext#QueryExecution] - .toHiveString((from.get(ordinal), dataTypes(ordinal))) - to.addColumnValue(ColumnValue.stringValue(hiveString)) - } - } - - def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { - dataTypes(ordinal) match { - case StringType => - to.addString(null) - case IntegerType => - to.addColumnValue(ColumnValue.intValue(null)) - case BooleanType => - to.addColumnValue(ColumnValue.booleanValue(null)) - case DoubleType => - to.addColumnValue(ColumnValue.doubleValue(null)) - case FloatType => - to.addColumnValue(ColumnValue.floatValue(null)) - case DecimalType => - to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal)) - case LongType => - to.addColumnValue(ColumnValue.longValue(null)) - case ByteType => - to.addColumnValue(ColumnValue.byteValue(null)) - case ShortType => - to.addColumnValue(ColumnValue.shortValue(null)) - case TimestampType => - to.addColumnValue(ColumnValue.timestampValue(null)) - case BinaryType | _: ArrayType | _: StructType | _: MapType => - to.addColumnValue(ColumnValue.stringValue(null: String)) - } - } - - def getResultSetSchema: TableSchema = { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { - new TableSchema(new FieldSchema("Result", "string", "") :: Nil) - } else { - val schema = result.queryExecution.analyzed.output.map { attr => - new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - new TableSchema(schema) - } - } - - def run(): Unit = { - logInfo(s"Running query '$statement'") - setState(OperationState.RUNNING) - try { - result = hiveContext.sql(statement) - logDebug(result.queryExecution.toString()) - result.queryExecution.logical match { - case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => - sessionToActivePool(parentSession) = value - logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") - case _ => - } - - val groupId = round(random * 1000000).toString - hiveContext.sparkContext.setJobGroup(groupId, statement) - sessionToActivePool.get(parentSession).foreach { pool => - hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) - } - iter = { - val resultRdd = result.queryExecution.toRdd - val useIncrementalCollect = - hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean - if (useIncrementalCollect) { - resultRdd.toLocalIterator - } else { - resultRdd.collect().iterator - } - } - dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) - } catch { - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. - case e: Throwable => - logError("Error executing query:",e) - throw new HiveSQLException(e.toString) - } - setState(OperationState.FINISHED) - } - } - + val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( + hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) operation } |