diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-29 01:14:02 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-29 01:14:02 -0700 |
commit | 054f991c4350af1350af7a4109ee77f4a34822f0 (patch) | |
tree | ec40f69f6dae5ed63c7247027f47f0b2da9d49c7 /sql/hive-thriftserver | |
parent | 2057cbcb0bc9d5a4fb66006c42457a556d0bb277 (diff) | |
download | spark-054f991c4350af1350af7a4109ee77f4a34822f0.tar.gz spark-054f991c4350af1350af7a4109ee77f4a34822f0.tar.bz2 spark-054f991c4350af1350af7a4109ee77f4a34822f0.zip |
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request?
This patch removes executionHive from HiveSessionState and HiveSharedState.
## How was this patch tested?
Updated test cases.
Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #12770 from rxin/SPARK-14994.
Diffstat (limited to 'sql/hive-thriftserver')
5 files changed, 102 insertions, 52 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java new file mode 100644 index 0000000000..0f2683db07 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import java.util.Properties; + +import org.apache.commons.cli.*; + +public class HiveServerServerOptionsProcessor { + private final Options options = new Options(); + private org.apache.commons.cli.CommandLine commandLine; + private final String serverName; + private final StringBuilder debugMessage = new StringBuilder(); + + @SuppressWarnings("static-access") + public HiveServerServerOptionsProcessor(String serverName) { + this.serverName = serverName; + // -hiveconf x=y + options.addOption(OptionBuilder + .withValueSeparator() + .hasArgs(2) + .withArgName("property=value") + .withLongOpt("hiveconf") + .withDescription("Use value for given property") + .create()); + // -deregister <versionNumber> + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("versionNumber") + .withLongOpt("deregister") + .withDescription("Deregister all instances of given version from dynamic service discovery") + .create()); + options.addOption(new Option("H", "help", false, "Print help information")); + } + + public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) { + try { + commandLine = new GnuParser().parse(options, argv); + // Process --hiveconf + // Get hiveconf param values and set the System property values + Properties confProps = commandLine.getOptionProperties("hiveconf"); + for (String propKey : confProps.stringPropertyNames()) { + // save logging message for log4j output latter after log4j initialize properly + debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n"); + // System.setProperty("hivecli." + propKey, confProps.getProperty(propKey)); + System.setProperty(propKey, confProps.getProperty(propKey)); + } + + // Process --help + if (commandLine.hasOption('H')) { + return new HiveServer2.ServerOptionsProcessorResponse( + new HiveServer2.HelpOptionExecutor(serverName, options)); + } + + // Process --deregister + if (commandLine.hasOption("deregister")) { + return new HiveServer2.ServerOptionsProcessorResponse( + new HiveServer2.DeregisterOptionExecutor( + commandLine.getOptionValue("deregister"))); + } + } catch (ParseException e) { + // Error out & exit - we were not able to parse the args successfully + System.err.println("Error starting HiveServer2 with given arguments: "); + System.err.println(e.getMessage()); + System.exit(-1); + } + // Default executor, when no option is specified + return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor()); + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala deleted file mode 100644 index 60bb4dc5e7..0000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hive.service.server - -import org.apache.hive.service.server.HiveServer2.{ServerOptionsProcessor, StartOptionExecutor} - -/** - * Class to upgrade a package-private class to public, and - * implement a `process()` operation consistent with - * the behavior of older Hive versions - * @param serverName name of the hive server - */ -private[apache] class HiveServerServerOptionsProcessor(serverName: String) - extends ServerOptionsProcessor(serverName) { - - def process(args: Array[String]): Boolean = { - // A parse failure automatically triggers a system exit - val response = super.parse(args) - val executor = response.getServerOptionsExecutor() - // return true if the parsed option was to start the service - executor.isInstanceOf[StartOptionExecutor] - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 24a25023a6..03727b8ab2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -22,8 +22,10 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import org.apache.commons.logging.LogFactory +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} @@ -34,7 +36,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveSessionState +import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -56,7 +58,12 @@ object HiveThriftServer2 extends Logging { @DeveloperApi def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + + val executionHive = HiveUtils.newClientForExecution( + sqlContext.sparkContext.conf, + sqlContext.sessionState.newHadoopConf()) + + server.init(executionHive.conf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) sqlContext.sparkContext.addSparkListener(listener) @@ -70,9 +77,7 @@ object HiveThriftServer2 extends Logging { def main(args: Array[String]) { Utils.initDaemon(log) val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2") - if (!optionsProcessor.process(args)) { - System.exit(-1) - } + optionsProcessor.parse(args) logInfo("Starting SparkContext") SparkSQLEnv.init() @@ -82,9 +87,13 @@ object HiveThriftServer2 extends Logging { uiTab.foreach(_.detach()) } + val executionHive = HiveUtils.newClientForExecution( + SparkSQLEnv.sqlContext.sparkContext.conf, + SparkSQLEnv.sqlContext.sessionState.newHadoopConf()) + try { val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) - server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + server.init(executionHive.conf) server.start() logInfo("HiveThriftServer2 started") listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 40dc81e02d..e8bcdd76ef 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -195,9 +195,8 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.listener.onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 268ba2f0bc..665a44e51a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging { sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) - - if (log.isDebugEnabled) { - sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted - .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } - } } } |