path: root/sql/hive-thriftserver/src
author    wangfei <wangfei1@huawei.com>  2014-10-31 11:27:59 -0700
committer Michael Armbrust <michael@databricks.com>  2014-10-31 11:27:59 -0700
commit    7c41d135709c148d4fa3a1b06b5905715c970519 (patch)
tree      78206ad76d5ae1f48917b53f6b68dcf3e01ca765 /sql/hive-thriftserver/src
parent    adb6415c1d65d466a10c50e8dc6cb3bf2805ebdf (diff)
download  spark-7c41d135709c148d4fa3a1b06b5905715c970519.tar.gz
spark-7c41d135709c148d4fa3a1b06b5905715c970519.tar.bz2
spark-7c41d135709c148d4fa3a1b06b5905715c970519.zip
[SPARK-3826][SQL] Enable hive-thriftserver to support hive-0.13.1
In #2241 hive-thriftserver is not enabled. This patch enables hive-thriftserver to support hive-0.13.1 by using a shim layer, following the approach of #2241:

1. A light shim layer (code in sql/hive-thriftserver/hive-version) for each Hive version, to handle API compatibility.
2. New pom profiles "hive-default" and "hive-versions" (copied from #2241) to activate the different Hive versions.
3. SBT commands for the different versions:
   hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
   hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly
4. Since hive-thriftserver depends on the hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver.

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:

f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
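To make item 1 above concrete, here is a minimal sketch of what a per-version module for Hive 0.12 might look like. It is not part of this diff (the real per-version sources live under the hive-version directories mentioned above): it supplies the getResults override that this patch removes from the now-abstract driver, plus the HiveThriftServerShim.setServerUserName helper that SparkSQLCLIService calls in the diff below. The exact signatures and the "serverUserName" field name are assumptions made for illustration.

    package org.apache.spark.sql.hive.thriftserver

    import java.util.{ArrayList => JArrayList}

    import scala.collection.JavaConversions._

    import org.apache.hadoop.security.UserGroupInformation

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._

    // Hive changed the getResults signature between 0.12 and 0.13, which is why this
    // patch drops the method from AbstractSparkSQLDriver and leaves it to the
    // version-specific concrete driver compiled under the matching profile.
    private[hive] class SparkSQLDriver(context: HiveContext = SparkSQLEnv.hiveContext)
      extends AbstractSparkSQLDriver(context) {

      override def getResults(res: JArrayList[String]): Boolean = {
        if (hiveResponse == null) {
          false
        } else {
          res.addAll(hiveResponse)
          hiveResponse = null
          true
        }
      }
    }

    // Companion shim used by SparkSQLCLIService; the "serverUserName" field name is an
    // assumption about the Hive 0.12 CLIService internals.
    private[hive] object HiveThriftServerShim {
      val version = "0.12.0"

      def setServerUserName(
          sparkServiceUGI: UserGroupInformation,
          sparkCliService: SparkSQLCLIService): Unit = {
        setSuperField(sparkCliService, "serverUserName", sparkServiceUGI.getShortUserName)
      }
    }

Keeping only such thin, version-specific pieces out of the shared sources lets the rest of the thriftserver code in this diff compile unchanged against either Hive release.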
Diffstat (limited to 'sql/hive-thriftserver/src')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala (renamed from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala)  |  18
-rwxr-xr-x  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala  |   6
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala  |  19
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala  | 169
4 files changed, 24 insertions, 188 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
index a5c457c677..fcb302edbf 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
- extends Driver with Logging {
+private[hive] abstract class AbstractSparkSQLDriver(
+ val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
- private var tableSchema: Schema = _
- private var hiveResponse: Seq[String] = _
+ private[hive] var tableSchema: Schema = _
+ private[hive] var hiveResponse: Seq[String] = _
override def init(): Unit = {
}
@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo
override def getSchema: Schema = tableSchema
- override def getResults(res: JArrayList[String]): Boolean = {
- if (hiveResponse == null) {
- false
- } else {
- res.addAll(hiveResponse)
- hiveResponse = null
- true
- }
- }
-
override def destroy() {
super.destroy()
hiveResponse = null
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7ba4564602..2cd02ae926 100755
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket
import org.apache.spark.Logging
+import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
@@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver {
}
}
- if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+ if (!sessionState.isRemoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
- val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+ val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 42cbf363b2..a78311fc48 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
+ var sparkServiceUGI: UserGroupInformation = null
- try {
- HiveAuthFactory.loginFromKeytab(hiveConf)
- val serverUserName = ShimLoader.getHadoopShims
- .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
- setSuperField(this, "serverUserName", serverUserName)
- } catch {
- case e @ (_: IOException | _: LoginException) =>
- throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+ try {
+ HiveAuthFactory.loginFromKeytab(hiveConf)
+ sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+ } catch {
+ case e @ (_: IOException | _: LoginException) =>
+ throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+ }
}
initCompositeService(hiveConf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index accf61576b..2a4f24132c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,24 +17,15 @@
package org.apache.spark.sql.hive.thriftserver.server
-import java.sql.Timestamp
import java.util.{Map => JMap}
+import scala.collection.mutable.Map
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map}
-import scala.math.{random, round}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils}
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
- val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
- private var result: SchemaRDD = _
- private var iter: Iterator[SparkRow] = _
- private var dataTypes: Array[DataType] = _
-
- def close(): Unit = {
- // RDDs will be cleaned automatically upon garbage collection.
- logDebug("CLOSING")
- }
-
- def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
- if (!iter.hasNext) {
- new RowSet()
- } else {
- // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
- val maxRows = maxRowsL.toInt
- var curRow = 0
- var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
-
- while (curRow < maxRows && iter.hasNext) {
- val sparkRow = iter.next()
- val row = new Row()
- var curCol = 0
-
- while (curCol < sparkRow.length) {
- if (sparkRow.isNullAt(curCol)) {
- addNullColumnValue(sparkRow, row, curCol)
- } else {
- addNonNullColumnValue(sparkRow, row, curCol)
- }
- curCol += 1
- }
- rowSet += row
- curRow += 1
- }
- new RowSet(rowSet, 0)
- }
- }
-
- def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(from(ordinal).asInstanceOf[String])
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
- case DecimalType =>
- val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
- to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
- case TimestampType =>
- to.addColumnValue(
- ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- val hiveString = result
- .queryExecution
- .asInstanceOf[HiveContext#QueryExecution]
- .toHiveString((from.get(ordinal), dataTypes(ordinal)))
- to.addColumnValue(ColumnValue.stringValue(hiveString))
- }
- }
-
- def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(null)
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(null))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(null))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(null))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(null))
- case DecimalType =>
- to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(null))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(null))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(null))
- case TimestampType =>
- to.addColumnValue(ColumnValue.timestampValue(null))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- to.addColumnValue(ColumnValue.stringValue(null: String))
- }
- }
-
- def getResultSetSchema: TableSchema = {
- logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
- if (result.queryExecution.analyzed.output.size == 0) {
- new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
- } else {
- val schema = result.queryExecution.analyzed.output.map { attr =>
- new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
- }
- new TableSchema(schema)
- }
- }
-
- def run(): Unit = {
- logInfo(s"Running query '$statement'")
- setState(OperationState.RUNNING)
- try {
- result = hiveContext.sql(statement)
- logDebug(result.queryExecution.toString())
- result.queryExecution.logical match {
- case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
- sessionToActivePool(parentSession) = value
- logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
- case _ =>
- }
-
- val groupId = round(random * 1000000).toString
- hiveContext.sparkContext.setJobGroup(groupId, statement)
- sessionToActivePool.get(parentSession).foreach { pool =>
- hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
- }
- iter = {
- val resultRdd = result.queryExecution.toRdd
- val useIncrementalCollect =
- hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
- if (useIncrementalCollect) {
- resultRdd.toLocalIterator
- } else {
- resultRdd.collect().iterator
- }
- }
- dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
- setHasResultSet(true)
- } catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- logError("Error executing query:",e)
- throw new HiveSQLException(e.toString)
- }
- setState(OperationState.FINISHED)
- }
- }
-
+ val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
+ hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
operation
}