aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-29 01:14:02 -0700
committerReynold Xin <rxin@databricks.com>2016-04-29 01:14:02 -0700
commit054f991c4350af1350af7a4109ee77f4a34822f0 (patch)
treeec40f69f6dae5ed63c7247027f47f0b2da9d49c7 /sql/hive-thriftserver
parent2057cbcb0bc9d5a4fb66006c42457a556d0bb277 (diff)
downloadspark-054f991c4350af1350af7a4109ee77f4a34822f0.tar.gz
spark-054f991c4350af1350af7a4109ee77f4a34822f0.tar.bz2
spark-054f991c4350af1350af7a4109ee77f4a34822f0.zip
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request? This patch removes executionHive from HiveSessionState and HiveSharedState. ## How was this patch tested? Updated test cases. Author: Reynold Xin <rxin@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12770 from rxin/SPARK-14994.
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java85
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala37
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala21
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala5
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala6
5 files changed, 102 insertions, 52 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java
new file mode 100644
index 0000000000..0f2683db07
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.util.Properties;
+
+import org.apache.commons.cli.*;
+
+public class HiveServerServerOptionsProcessor {
+ private final Options options = new Options();
+ private org.apache.commons.cli.CommandLine commandLine;
+ private final String serverName;
+ private final StringBuilder debugMessage = new StringBuilder();
+
+ @SuppressWarnings("static-access")
+ public HiveServerServerOptionsProcessor(String serverName) {
+ this.serverName = serverName;
+ // -hiveconf x=y
+ options.addOption(OptionBuilder
+ .withValueSeparator()
+ .hasArgs(2)
+ .withArgName("property=value")
+ .withLongOpt("hiveconf")
+ .withDescription("Use value for given property")
+ .create());
+ // -deregister <versionNumber>
+ options.addOption(OptionBuilder
+ .hasArgs(1)
+ .withArgName("versionNumber")
+ .withLongOpt("deregister")
+ .withDescription("Deregister all instances of given version from dynamic service discovery")
+ .create());
+ options.addOption(new Option("H", "help", false, "Print help information"));
+ }
+
+ public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) {
+ try {
+ commandLine = new GnuParser().parse(options, argv);
+ // Process --hiveconf
+ // Get hiveconf param values and set the System property values
+ Properties confProps = commandLine.getOptionProperties("hiveconf");
+ for (String propKey : confProps.stringPropertyNames()) {
+ // save logging message for log4j output latter after log4j initialize properly
+ debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
+ // System.setProperty("hivecli." + propKey, confProps.getProperty(propKey));
+ System.setProperty(propKey, confProps.getProperty(propKey));
+ }
+
+ // Process --help
+ if (commandLine.hasOption('H')) {
+ return new HiveServer2.ServerOptionsProcessorResponse(
+ new HiveServer2.HelpOptionExecutor(serverName, options));
+ }
+
+ // Process --deregister
+ if (commandLine.hasOption("deregister")) {
+ return new HiveServer2.ServerOptionsProcessorResponse(
+ new HiveServer2.DeregisterOptionExecutor(
+ commandLine.getOptionValue("deregister")));
+ }
+ } catch (ParseException e) {
+ // Error out & exit - we were not able to parse the args successfully
+ System.err.println("Error starting HiveServer2 with given arguments: ");
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+ // Default executor, when no option is specified
+ return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor());
+ }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
deleted file mode 100644
index 60bb4dc5e7..0000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.service.server
-
-import org.apache.hive.service.server.HiveServer2.{ServerOptionsProcessor, StartOptionExecutor}
-
-/**
- * Class to upgrade a package-private class to public, and
- * implement a `process()` operation consistent with
- * the behavior of older Hive versions
- * @param serverName name of the hive server
- */
-private[apache] class HiveServerServerOptionsProcessor(serverName: String)
- extends ServerOptionsProcessor(serverName) {
-
- def process(args: Array[String]): Boolean = {
- // A parse failure automatically triggers a system exit
- val response = super.parse(args)
- val executor = response.getServerOptionsExecutor()
- // return true if the parsed option was to start the service
- executor.isInstanceOf[StartOptionExecutor]
- }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 24a25023a6..03727b8ab2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,8 +22,10 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
@@ -34,7 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf
@@ -56,7 +58,12 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: SQLContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+
+ val executionHive = HiveUtils.newClientForExecution(
+ sqlContext.sparkContext.conf,
+ sqlContext.sessionState.newHadoopConf())
+
+ server.init(executionHive.conf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
@@ -70,9 +77,7 @@ object HiveThriftServer2 extends Logging {
def main(args: Array[String]) {
Utils.initDaemon(log)
val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
- if (!optionsProcessor.process(args)) {
- System.exit(-1)
- }
+ optionsProcessor.parse(args)
logInfo("Starting SparkContext")
SparkSQLEnv.init()
@@ -82,9 +87,13 @@ object HiveThriftServer2 extends Logging {
uiTab.foreach(_.detach())
}
+ val executionHive = HiveUtils.newClientForExecution(
+ SparkSQLEnv.sqlContext.sparkContext.conf,
+ SparkSQLEnv.sqlContext.sessionState.newHadoopConf())
+
try {
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
- server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+ server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 40dc81e02d..e8bcdd76ef 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{Utils => SparkUtils}
@@ -195,9 +195,8 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
// Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
HiveThriftServer2.listener.onStatementStart(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 268ba2f0bc..665a44e51a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging {
sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
-
sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-
- if (log.isDebugEnabled) {
- sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
- .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
- }
}
}