author     wangfei <wangfei1@huawei.com>              2014-10-31 11:27:59 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-31 11:27:59 -0700
commit     7c41d135709c148d4fa3a1b06b5905715c970519 (patch)
tree       78206ad76d5ae1f48917b53f6b68dcf3e01ca765
parent     adb6415c1d65d466a10c50e8dc6cb3bf2805ebdf (diff)
[SPARK-3826][SQL] Enable hive-thriftserver to support hive-0.13.1
In #2241 the hive-thriftserver module is not enabled. This patch enables hive-thriftserver to support hive-0.13.1 by using a shim layer, following the approach of #2241:

1. A light shim layer (code in sql/hive-thriftserver/hive-version) for each Hive version to handle API compatibility (see the sketch after the file list below).
2. New pom profiles "hive-default" and "hive-versions" (copied from #2241) to activate the different Hive versions.
3. SBT commands for each version:
   hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
   hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly
4. Since hive-thriftserver depends on the hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver.

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:

f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
-rw-r--r--  assembly/pom.xml | 6
-rwxr-xr-x  dev/run-tests | 13
-rw-r--r--  pom.xml | 29
-rw-r--r--  python/pyspark/sql.py | 27
-rw-r--r--  sql/hive-thriftserver/pom.xml | 18
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala (renamed from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala) | 18
-rwxr-xr-x  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 6
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 19
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 169
-rw-r--r--  sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala | 225
-rw-r--r--  sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala | 267
-rw-r--r--  sql/hive/pom.xml | 4
12 files changed, 571 insertions(+), 230 deletions(-)
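
The core idea of the patch is the version-shim pattern sketched below. The sketch is illustrative only (names such as VersionShim and ThriftServerStub are invented, not code from this commit): each vX.Y.Z source directory defines an object with the same name and members, the build adds exactly one of those directories to the source path, and the rest of the module compiles against that single symbol.

    // File: v0.12.0/src/main/scala/example/VersionShim.scala (hypothetical path)
    package example

    // The v0.13.1 directory would define an object with the same name and
    // members, but with version = "0.13.1" and the 0.13-specific API calls.
    object VersionShim {
      val version = "0.12.0"
      def describe(): String = s"Compiled against Hive $version"
    }

    // Shared thrift-server code references the single symbol and never needs
    // to know which Hive version was selected at build time.
    object ThriftServerStub {
      def main(args: Array[String]): Unit = println(VersionShim.describe())
    }

In the real patch the shared code lives in sql/hive-thriftserver/src/main/scala, and the per-version objects are HiveThriftServerShim, SparkSQLDriver, and SparkExecuteStatementOperation in Shim12.scala and Shim13.scala below.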
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 11d4bea936..9e8525dd46 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -201,12 +201,6 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
- </dependencies>
- </profile>
- <profile>
- <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
- <id>hive-0.12.0</id>
- <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
diff --git a/dev/run-tests b/dev/run-tests
index 972c8c8a21..0e9eefa76a 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -142,17 +142,24 @@ CURRENT_BLOCK=$BLOCK_BUILD
# We always build with Hive because the PySpark Spark SQL tests need it.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"
- echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"
# NOTE: echo "q" is needed because sbt on encountering a build file with failure
#+ (either resolution or compilation) prompts the user for input either q, r, etc
#+ to quit or retry. This echo is there to make it not block.
- # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
+ # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
#+ single argument!
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
+ # First build with 0.12 to ensure patches do not break the hive 12 build
+ echo "[info] Compile with hive 0.12"
echo -e "q\n" \
- | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \
+ | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \
+ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
+ # Then build with default version(0.13.1) because tests are based on this version
+ echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive"
+ echo -e "q\n" \
+ | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
}
diff --git a/pom.xml b/pom.xml
index 379274d0b1..42fdbb9e09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<!-- Version used in Maven Hive dependency -->
- <hive.version>0.13.1</hive.version>
+ <hive.version>0.13.1a</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
@@ -240,6 +240,18 @@
<enabled>false</enabled>
</snapshots>
</repository>
+ <repository>
+ <!-- This is temporarily included to fix issues with Hive 0.13 -->
+ <id>spark-staging-hive13</id>
+ <name>Spring Staging Repository Hive 13</name>
+ <url>https://oss.sonatype.org/content/repositories/orgspark-project-1089/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
</repositories>
<pluginRepositories>
<pluginRepository>
@@ -908,9 +920,9 @@
by Spark SQL for code generation. -->
<compilerPlugins>
<compilerPlugin>
- <groupId>org.scalamacros</groupId>
- <artifactId>paradise_${scala.version}</artifactId>
- <version>${scala.macros.version}</version>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>paradise_${scala.version}</artifactId>
+ <version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
@@ -1314,14 +1326,19 @@
</dependencies>
</profile>
<profile>
- <id>hive-0.12.0</id>
+ <id>hive</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
- <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<modules>
<module>sql/hive-thriftserver</module>
</modules>
+ </profile>
+ <profile>
+ <id>hive-0.12.0</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
<properties>
<hive.version>0.12.0-protobuf-2.5</hive.version>
<hive.version.short>0.12.0</hive.version.short>
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 93fd9d4909..f0bd3cbd98 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1400,33 +1400,6 @@ class HiveContext(SQLContext):
class LocalHiveContext(HiveContext):
- """Starts up an instance of hive where metadata is stored locally.
-
- An in-process metadata data is created with data stored in ./metadata.
- Warehouse data is stored in in ./warehouse.
-
- >>> import os
- >>> hiveCtx = LocalHiveContext(sc)
- >>> try:
- ... supress = hiveCtx.sql("DROP TABLE src")
- ... except Exception:
- ... pass
- >>> kv1 = os.path.join(os.environ["SPARK_HOME"],
- ... 'examples/src/main/resources/kv1.txt')
- >>> supress = hiveCtx.sql(
- ... "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- >>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
- ... % kv1)
- >>> results = hiveCtx.sql("FROM src SELECT value"
- ... ).map(lambda r: int(r.value.split('_')[1]))
- >>> num = results.count()
- >>> reduce_sum = results.reduce(lambda x, y: x + y)
- >>> num
- 500
- >>> reduce_sum
- 130091
- """
-
def __init__(self, sparkContext, sqlContext=None):
HiveContext.__init__(self, sparkContext, sqlContext)
warnings.warn("LocalHiveContext is deprecated. "
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 124fc107cb..8db3010624 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -71,6 +71,24 @@
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-default-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>v${hive.version.short}/src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
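
The Maven side above selects the per-version sources with build-helper's add-source goal; the commit message mentions equivalent SBT builds. As a rough sbt sketch only (the actual change to project/SparkBuild.scala is not shown in this diff, and hiveVersionShort is an assumed setting name), the same switch boils down to one extra unmanaged source directory:

    // Hypothetical sbt 0.13 fragment mirroring the build-helper-maven-plugin
    // configuration: add v<hive.version.short>/src/main/scala to the sources.
    val hiveVersionShort = settingKey[String]("Hive version used to pick the shim sources")

    lazy val hiveThriftServer = project.in(file("sql/hive-thriftserver"))
      .settings(
        hiveVersionShort := sys.props.getOrElse("hive.version.short", "0.13.1"),
        unmanagedSourceDirectories in Compile +=
          baseDirectory.value / s"v${hiveVersionShort.value}" / "src" / "main" / "scala"
      )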
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
index a5c457c677..fcb302edbf 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
- extends Driver with Logging {
+private[hive] abstract class AbstractSparkSQLDriver(
+ val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
- private var tableSchema: Schema = _
- private var hiveResponse: Seq[String] = _
+ private[hive] var tableSchema: Schema = _
+ private[hive] var hiveResponse: Seq[String] = _
override def init(): Unit = {
}
@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo
override def getSchema: Schema = tableSchema
- override def getResults(res: JArrayList[String]): Boolean = {
- if (hiveResponse == null) {
- false
- } else {
- res.addAll(hiveResponse)
- hiveResponse = null
- true
- }
- }
-
override def destroy() {
super.destroy()
hiveResponse = null
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7ba4564602..2cd02ae926 100755
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket
import org.apache.spark.Logging
+import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
@@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver {
}
}
- if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+ if (!sessionState.isRemoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
- val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+ val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
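
The call above goes through HiveShim.getCommandProcessor because CommandProcessorFactory.get changed signature between Hive releases (0.12 is keyed on the first command token, 0.13 takes the whole token array). The shim itself lives in sql/hive (added by #2241) and is not part of this diff; the sketch below only illustrates, against the 0.12.0 API, what such a shim method presumably looks like, with HiveShimSketch as an invented name:

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

    object HiveShimSketch {
      // Hive 0.12.0: the factory takes the first command token as a String.
      def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
        CommandProcessorFactory.get(cmd(0), conf)
      // Against Hive 0.13.1 the factory accepts the token array, so the 0.13
      // shim would presumably call CommandProcessorFactory.get(cmd, conf).
    }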
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 42cbf363b2..a78311fc48 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
+ var sparkServiceUGI: UserGroupInformation = null
- try {
- HiveAuthFactory.loginFromKeytab(hiveConf)
- val serverUserName = ShimLoader.getHadoopShims
- .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
- setSuperField(this, "serverUserName", serverUserName)
- } catch {
- case e @ (_: IOException | _: LoginException) =>
- throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+ try {
+ HiveAuthFactory.loginFromKeytab(hiveConf)
+ sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+ } catch {
+ case e @ (_: IOException | _: LoginException) =>
+ throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+ }
}
initCompositeService(hiveConf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index accf61576b..2a4f24132c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,24 +17,15 @@
package org.apache.spark.sql.hive.thriftserver.server
-import java.sql.Timestamp
import java.util.{Map => JMap}
+import scala.collection.mutable.Map
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map}
-import scala.math.{random, round}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils}
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
- val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
- private var result: SchemaRDD = _
- private var iter: Iterator[SparkRow] = _
- private var dataTypes: Array[DataType] = _
-
- def close(): Unit = {
- // RDDs will be cleaned automatically upon garbage collection.
- logDebug("CLOSING")
- }
-
- def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
- if (!iter.hasNext) {
- new RowSet()
- } else {
- // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
- val maxRows = maxRowsL.toInt
- var curRow = 0
- var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
-
- while (curRow < maxRows && iter.hasNext) {
- val sparkRow = iter.next()
- val row = new Row()
- var curCol = 0
-
- while (curCol < sparkRow.length) {
- if (sparkRow.isNullAt(curCol)) {
- addNullColumnValue(sparkRow, row, curCol)
- } else {
- addNonNullColumnValue(sparkRow, row, curCol)
- }
- curCol += 1
- }
- rowSet += row
- curRow += 1
- }
- new RowSet(rowSet, 0)
- }
- }
-
- def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(from(ordinal).asInstanceOf[String])
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
- case DecimalType =>
- val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
- to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
- case TimestampType =>
- to.addColumnValue(
- ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- val hiveString = result
- .queryExecution
- .asInstanceOf[HiveContext#QueryExecution]
- .toHiveString((from.get(ordinal), dataTypes(ordinal)))
- to.addColumnValue(ColumnValue.stringValue(hiveString))
- }
- }
-
- def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(null)
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(null))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(null))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(null))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(null))
- case DecimalType =>
- to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(null))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(null))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(null))
- case TimestampType =>
- to.addColumnValue(ColumnValue.timestampValue(null))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- to.addColumnValue(ColumnValue.stringValue(null: String))
- }
- }
-
- def getResultSetSchema: TableSchema = {
- logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
- if (result.queryExecution.analyzed.output.size == 0) {
- new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
- } else {
- val schema = result.queryExecution.analyzed.output.map { attr =>
- new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
- }
- new TableSchema(schema)
- }
- }
-
- def run(): Unit = {
- logInfo(s"Running query '$statement'")
- setState(OperationState.RUNNING)
- try {
- result = hiveContext.sql(statement)
- logDebug(result.queryExecution.toString())
- result.queryExecution.logical match {
- case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
- sessionToActivePool(parentSession) = value
- logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
- case _ =>
- }
-
- val groupId = round(random * 1000000).toString
- hiveContext.sparkContext.setJobGroup(groupId, statement)
- sessionToActivePool.get(parentSession).foreach { pool =>
- hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
- }
- iter = {
- val resultRdd = result.queryExecution.toRdd
- val useIncrementalCollect =
- hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
- if (useIncrementalCollect) {
- resultRdd.toLocalIterator
- } else {
- resultRdd.collect().iterator
- }
- }
- dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
- setHasResultSet(true)
- } catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- logError("Error executing query:",e)
- throw new HiveSQLException(e.toString)
- }
- setState(OperationState.FINISHED)
- }
- }
-
+ val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
+ hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
operation
}
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
new file mode 100644
index 0000000000..bbd727c686
--- /dev/null
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.sql.Timestamp
+import java.util.{ArrayList => JArrayList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.math._
+
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * A compatibility layer for interacting with Hive version 0.12.0.
+ */
+private[thriftserver] object HiveThriftServerShim {
+ val version = "0.12.0"
+
+ def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+ val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
+ setSuperField(sparkCliService, "serverUserName", serverUserName)
+ }
+}
+
+private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
+ extends AbstractSparkSQLDriver(_context) {
+ override def getResults(res: JArrayList[String]): Boolean = {
+ if (hiveResponse == null) {
+ false
+ } else {
+ res.addAll(hiveResponse)
+ hiveResponse = null
+ true
+ }
+ }
+}
+
+private[hive] class SparkExecuteStatementOperation(
+ parentSession: HiveSession,
+ statement: String,
+ confOverlay: JMap[String, String])(
+ hiveContext: HiveContext,
+ sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
+ parentSession, statement, confOverlay) with Logging {
+ private var result: SchemaRDD = _
+ private var iter: Iterator[SparkRow] = _
+ private var dataTypes: Array[DataType] = _
+
+ def close(): Unit = {
+ // RDDs will be cleaned automatically upon garbage collection.
+ logDebug("CLOSING")
+ }
+
+ def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+ if (!iter.hasNext) {
+ new RowSet()
+ } else {
+ // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
+ val maxRows = maxRowsL.toInt
+ var curRow = 0
+ var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
+
+ while (curRow < maxRows && iter.hasNext) {
+ val sparkRow = iter.next()
+ val row = new Row()
+ var curCol = 0
+
+ while (curCol < sparkRow.length) {
+ if (sparkRow.isNullAt(curCol)) {
+ addNullColumnValue(sparkRow, row, curCol)
+ } else {
+ addNonNullColumnValue(sparkRow, row, curCol)
+ }
+ curCol += 1
+ }
+ rowSet += row
+ curRow += 1
+ }
+ new RowSet(rowSet, 0)
+ }
+ }
+
+ def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
+ dataTypes(ordinal) match {
+ case StringType =>
+ to.addString(from(ordinal).asInstanceOf[String])
+ case IntegerType =>
+ to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
+ case BooleanType =>
+ to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
+ case DoubleType =>
+ to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
+ case FloatType =>
+ to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
+ case DecimalType =>
+ val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+ to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
+ case LongType =>
+ to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
+ case ByteType =>
+ to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
+ case ShortType =>
+ to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
+ case TimestampType =>
+ to.addColumnValue(
+ ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
+ case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+ val hiveString = result
+ .queryExecution
+ .asInstanceOf[HiveContext#QueryExecution]
+ .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+ to.addColumnValue(ColumnValue.stringValue(hiveString))
+ }
+ }
+
+ def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
+ dataTypes(ordinal) match {
+ case StringType =>
+ to.addString(null)
+ case IntegerType =>
+ to.addColumnValue(ColumnValue.intValue(null))
+ case BooleanType =>
+ to.addColumnValue(ColumnValue.booleanValue(null))
+ case DoubleType =>
+ to.addColumnValue(ColumnValue.doubleValue(null))
+ case FloatType =>
+ to.addColumnValue(ColumnValue.floatValue(null))
+ case DecimalType =>
+ to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
+ case LongType =>
+ to.addColumnValue(ColumnValue.longValue(null))
+ case ByteType =>
+ to.addColumnValue(ColumnValue.byteValue(null))
+ case ShortType =>
+ to.addColumnValue(ColumnValue.shortValue(null))
+ case TimestampType =>
+ to.addColumnValue(ColumnValue.timestampValue(null))
+ case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+ to.addColumnValue(ColumnValue.stringValue(null: String))
+ }
+ }
+
+ def getResultSetSchema: TableSchema = {
+ logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+ if (result.queryExecution.analyzed.output.size == 0) {
+ new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
+ } else {
+ val schema = result.queryExecution.analyzed.output.map { attr =>
+ new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+ }
+ new TableSchema(schema)
+ }
+ }
+
+ def run(): Unit = {
+ logInfo(s"Running query '$statement'")
+ setState(OperationState.RUNNING)
+ try {
+ result = hiveContext.sql(statement)
+ logDebug(result.queryExecution.toString())
+ result.queryExecution.logical match {
+ case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+ sessionToActivePool(parentSession) = value
+ logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+ case _ =>
+ }
+
+ val groupId = round(random * 1000000).toString
+ hiveContext.sparkContext.setJobGroup(groupId, statement)
+ sessionToActivePool.get(parentSession).foreach { pool =>
+ hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+ }
+ iter = {
+ val resultRdd = result.queryExecution.toRdd
+ val useIncrementalCollect =
+ hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+ if (useIncrementalCollect) {
+ resultRdd.toLocalIterator
+ } else {
+ resultRdd.collect().iterator
+ }
+ }
+ dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+ setHasResultSet(true)
+ } catch {
+ // Actually do need to catch Throwable as some failures don't inherit from Exception and
+ // HiveServer will silently swallow them.
+ case e: Throwable =>
+ logError("Error executing query:",e)
+ throw new HiveSQLException(e.toString)
+ }
+ setState(OperationState.FINISHED)
+ }
+}
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
new file mode 100644
index 0000000000..e59681bfbe
--- /dev/null
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.security.PrivilegedExceptionAction
+import java.sql.Timestamp
+import java.util.concurrent.Future
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.math._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * A compatibility layer for interacting with Hive version 0.13.1.
+ */
+private[thriftserver] object HiveThriftServerShim {
+ val version = "0.13.1"
+
+ def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+ setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
+ }
+}
+
+private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
+ extends AbstractSparkSQLDriver(_context) {
+ override def getResults(res: JList[_]): Boolean = {
+ if (hiveResponse == null) {
+ false
+ } else {
+ res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
+ hiveResponse = null
+ true
+ }
+ }
+}
+
+private[hive] class SparkExecuteStatementOperation(
+ parentSession: HiveSession,
+ statement: String,
+ confOverlay: JMap[String, String],
+ runInBackground: Boolean = true)(
+ hiveContext: HiveContext,
+ sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
+ parentSession, statement, confOverlay, runInBackground) with Logging {
+
+ private var result: SchemaRDD = _
+ private var iter: Iterator[SparkRow] = _
+ private var dataTypes: Array[DataType] = _
+
+ private def runInternal(cmd: String) = {
+ try {
+ result = hiveContext.sql(cmd)
+ logDebug(result.queryExecution.toString())
+ val groupId = round(random * 1000000).toString
+ hiveContext.sparkContext.setJobGroup(groupId, statement)
+ iter = {
+ val resultRdd = result.queryExecution.toRdd
+ val useIncrementalCollect =
+ hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+ if (useIncrementalCollect) {
+ resultRdd.toLocalIterator
+ } else {
+ resultRdd.collect().iterator
+ }
+ }
+ dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+ } catch {
+ // Actually do need to catch Throwable as some failures don't inherit from Exception and
+ // HiveServer will silently swallow them.
+ case e: Throwable =>
+ logError("Error executing query:",e)
+ throw new HiveSQLException(e.toString)
+ }
+ }
+
+ def close(): Unit = {
+ // RDDs will be cleaned automatically upon garbage collection.
+ logDebug("CLOSING")
+ }
+
+ def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
+ dataTypes(ordinal) match {
+ case StringType =>
+ to += from.get(ordinal).asInstanceOf[String]
+ case IntegerType =>
+ to += from.getInt(ordinal)
+ case BooleanType =>
+ to += from.getBoolean(ordinal)
+ case DoubleType =>
+ to += from.getDouble(ordinal)
+ case FloatType =>
+ to += from.getFloat(ordinal)
+ case DecimalType =>
+ to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+ case LongType =>
+ to += from.getLong(ordinal)
+ case ByteType =>
+ to += from.getByte(ordinal)
+ case ShortType =>
+ to += from.getShort(ordinal)
+ case TimestampType =>
+ to += from.get(ordinal).asInstanceOf[Timestamp]
+ case BinaryType =>
+ to += from.get(ordinal).asInstanceOf[String]
+ case _: ArrayType =>
+ to += from.get(ordinal).asInstanceOf[String]
+ case _: StructType =>
+ to += from.get(ordinal).asInstanceOf[String]
+ case _: MapType =>
+ to += from.get(ordinal).asInstanceOf[String]
+ }
+ }
+
+ def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+ if (!iter.hasNext) {
+ reultRowSet
+ } else {
+ // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
+ val maxRows = maxRowsL.toInt
+ var curRow = 0
+ while (curRow < maxRows && iter.hasNext) {
+ val sparkRow = iter.next()
+ val row = ArrayBuffer[Any]()
+ var curCol = 0
+ while (curCol < sparkRow.length) {
+ if (sparkRow.isNullAt(curCol)) {
+ row += null
+ } else {
+ addNonNullColumnValue(sparkRow, row, curCol)
+ }
+ curCol += 1
+ }
+ reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+ curRow += 1
+ }
+ reultRowSet
+ }
+ }
+
+ def getResultSetSchema: TableSchema = {
+ logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+ if (result.queryExecution.analyzed.output.size == 0) {
+ new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
+ } else {
+ val schema = result.queryExecution.analyzed.output.map { attr =>
+ new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+ }
+ new TableSchema(schema)
+ }
+ }
+
+ private def getConfigForOperation: HiveConf = {
+ var sqlOperationConf: HiveConf = getParentSession.getHiveConf
+ if (!getConfOverlay.isEmpty || shouldRunAsync) {
+ sqlOperationConf = new HiveConf(sqlOperationConf)
+ import scala.collection.JavaConversions._
+ for (confEntry <- getConfOverlay.entrySet) {
+ try {
+ sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
+ }
+ catch {
+ case e: IllegalArgumentException => {
+ throw new HiveSQLException("Error applying statement specific settings", e)
+ }
+ }
+ }
+ }
+ return sqlOperationConf
+ }
+
+ def run(): Unit = {
+ logInfo(s"Running query '$statement'")
+ val opConfig: HiveConf = getConfigForOperation
+ setState(OperationState.RUNNING)
+ setHasResultSet(true)
+
+ if (!shouldRunAsync) {
+ runInternal(statement)
+ setState(OperationState.FINISHED)
+ } else {
+ val parentSessionState = SessionState.get
+ val sessionHive: Hive = Hive.get
+ val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
+
+ val backgroundOperation: Runnable = new Runnable {
+ def run {
+ val doAsAction: PrivilegedExceptionAction[AnyRef] =
+ new PrivilegedExceptionAction[AnyRef] {
+ def run: AnyRef = {
+ Hive.set(sessionHive)
+ SessionState.setCurrentSessionState(parentSessionState)
+ try {
+ runInternal(statement)
+ }
+ catch {
+ case e: HiveSQLException => {
+ setOperationException(e)
+ logError("Error running hive query: ", e)
+ }
+ }
+ return null
+ }
+ }
+ try {
+ ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
+ }
+ catch {
+ case e: Exception => {
+ setOperationException(new HiveSQLException(e))
+ logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
+ }
+ }
+ setState(OperationState.FINISHED)
+ }
+ }
+
+ try {
+ val backgroundHandle: Future[_] = getParentSession.getSessionManager.
+ submitBackgroundOperation(backgroundOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ // Actually do need to catch Throwable as some failures don't inherit from Exception and
+ // HiveServer will silently swallow them.
+ case e: Throwable =>
+ logError("Error executing query:",e)
+ throw new HiveSQLException(e.toString)
+ }
+ }
+ }
+}
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index db01363b4d..67e36a951e 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -65,6 +65,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>