Diffstat (limited to 'sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift')
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java |  48
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java         | 108
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java               | 740
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java         | 440
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java           | 167
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java              | 546
6 files changed, 2049 insertions(+), 0 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
new file mode 100644
index 0000000000..ac63537337
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+
+
+/**
+ * EmbeddedThriftBinaryCLIService.
+ *
+ */
+public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
+
+ public EmbeddedThriftBinaryCLIService() {
+ super(new CLIService(null));
+ isEmbedded = true;
+ HiveConf.setLoadHiveServer2Config(true);
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ cliService.init(hiveConf);
+ cliService.start();
+ super.init(hiveConf);
+ }
+
+ public ICLIService getService() {
+ return cliService;
+ }
+}
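
A hedged usage sketch (an assumption based on the constructors in this patch, not part of the diff): because ThriftCLIService implements TCLIService.Iface, the embedded service can be handed straight to the in-process ThriftCLIServiceClient added later in this change, with no socket involved.

    HiveConf conf = new HiveConf();
    EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService();
    service.init(conf);  // also inits and starts the wrapped CLIService (see override above)
    ThriftCLIServiceClient client = new ThriftCLIServiceClient(service);
    SessionHandle session = client.openSession("anonymous", "", null);  // hypothetical credentials

Since isEmbedded is set in the constructor, start() never spawns the server thread; calls go directly through the Iface methods.
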
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
new file mode 100644
index 0000000000..6c9efba9e5
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+public class ThriftBinaryCLIService extends ThriftCLIService {
+
+ public ThriftBinaryCLIService(CLIService cliService) {
+ super(cliService, ThriftBinaryCLIService.class.getSimpleName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Server thread pool
+ String threadPoolName = "HiveServer2-Handler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+ // Thrift configs
+ hiveAuthFactory = new HiveAuthFactory(hiveConf);
+ TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+ TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
+ TServerSocket serverSocket = null;
+ List<String> sslVersionBlacklist = new ArrayList<String>();
+ for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+ sslVersionBlacklist.add(sslVersion);
+ }
+ if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
+ serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
+ } else {
+ String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+ if (keyStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
+ }
+ String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+ HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+ serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
+ keyStorePassword, sslVersionBlacklist);
+ }
+
+ // Server args
+ int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
+ int requestTimeout = (int) hiveConf.getTimeVar(
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
+ int beBackoffSlotLength = (int) hiveConf.getTimeVar(
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
+ TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
+ .processorFactory(processorFactory).transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
+ .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
+ .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
+ .executorService(executorService);
+
+ // TCP Server
+ server = new TThreadPoolServer(sargs);
+ server.setServerEventHandler(serverEventHandler);
+ String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+ LOG.info(msg);
+ server.serve();
+ } catch (Throwable t) {
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftBinaryCLIService.class.getSimpleName(), t);
+ System.exit(-1);
+ }
+ }
+
+}
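
A note on the SynchronousQueue above: it has no capacity, so the executor grows one dedicated handler thread per connection up to maxWorkerThreads and rejects further hand-offs instead of queueing them. A standalone sketch of the same construction; the 5/500/60s values mirror the usual HiveConf defaults and are assumed here.

    // imports: java.util.concurrent.ExecutorService, SynchronousQueue, ThreadPoolExecutor, TimeUnit
    ExecutorService pool = new ThreadPoolExecutor(
        5,                                  // minWorkerThreads (assumed default)
        500,                                // maxWorkerThreads (assumed default)
        60L, TimeUnit.SECONDS,              // workerKeepAliveTime (assumed default)
        new SynchronousQueue<Runnable>());  // zero-capacity: hand off to a thread or reject
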
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
new file mode 100644
index 0000000000..5a0f1c83c7
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.ServiceUtils;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.TSetIpAddressProcessor;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * ThriftCLIService.
+ *
+ */
+public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+ public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
+
+ protected CLIService cliService;
+ private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+ protected static HiveAuthFactory hiveAuthFactory;
+
+ protected int portNum;
+ protected InetAddress serverIPAddress;
+ protected String hiveHost;
+ protected TServer server;
+ protected org.eclipse.jetty.server.Server httpServer;
+
+ private boolean isStarted = false;
+ protected boolean isEmbedded = false;
+
+ protected HiveConf hiveConf;
+
+ protected int minWorkerThreads;
+ protected int maxWorkerThreads;
+ protected long workerKeepAliveTime;
+
+ protected TServerEventHandler serverEventHandler;
+ protected ThreadLocal<ServerContext> currentServerContext;
+
+ static class ThriftCLIServerContext implements ServerContext {
+ private SessionHandle sessionHandle = null;
+
+ public void setSessionHandle(SessionHandle sessionHandle) {
+ this.sessionHandle = sessionHandle;
+ }
+
+ public SessionHandle getSessionHandle() {
+ return sessionHandle;
+ }
+ }
+
+ public ThriftCLIService(CLIService service, String serviceName) {
+ super(serviceName);
+ this.cliService = service;
+ currentServerContext = new ThreadLocal<ServerContext>();
+ serverEventHandler = new TServerEventHandler() {
+ @Override
+ public ServerContext createContext(
+ TProtocol input, TProtocol output) {
+ return new ThriftCLIServerContext();
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext,
+ TProtocol input, TProtocol output) {
+ ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+ SessionHandle sessionHandle = context.getSessionHandle();
+ if (sessionHandle != null) {
+ LOG.info("Session disconnected without closing properly, close it now");
+ try {
+ cliService.closeSession(sessionHandle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Failed to close session: " + e, e);
+ }
+ }
+ }
+
+ @Override
+ public void preServe() {
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext,
+ TTransport input, TTransport output) {
+ currentServerContext.set(serverContext);
+ }
+ };
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+ try {
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverIPAddress = InetAddress.getByName(hiveHost);
+ } else {
+ serverIPAddress = InetAddress.getLocalHost();
+ }
+ } catch (UnknownHostException e) {
+ throw new ServiceException(e);
+ }
+ // HTTP mode
+ if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+ TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+ }
+ }
+ // Binary mode
+ else {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+ }
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ super.init(hiveConf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ if (!isStarted && !isEmbedded) {
+ new Thread(this).start();
+ isStarted = true;
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (isStarted && !isEmbedded) {
+ if (server != null) {
+ server.stop();
+ LOG.info("Thrift server has stopped");
+ }
+ if ((httpServer != null) && httpServer.isStarted()) {
+ try {
+ httpServer.stop();
+ LOG.info("Http server has stopped");
+ } catch (Exception e) {
+ LOG.error("Error stopping Http server: ", e);
+ }
+ }
+ isStarted = false;
+ }
+ super.stop();
+ }
+
+ public int getPortNumber() {
+ return portNum;
+ }
+
+ public InetAddress getServerIPAddress() {
+ return serverIPAddress;
+ }
+
+ @Override
+ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
+ throws TException {
+ TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
+
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ String token = cliService.getDelegationToken(
+ new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getOwner(), req.getRenewer());
+ resp.setDelegationToken(token);
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error obtaining delegation token", e);
+ TStatus tokenErrorStatus = HiveSQLException.toTStatus(e);
+ tokenErrorStatus.setSqlState("42000");
+ resp.setStatus(tokenErrorStatus);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
+ throws TException {
+ TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
+
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getDelegationToken());
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error canceling delegation token", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
+ throws TException {
+ TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getDelegationToken());
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error obtaining renewing token", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ }
+ return resp;
+ }
+
+ private TStatus unsecureTokenErrorStatus() {
+ TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
+ errorStatus.setErrorMessage("Delegation token only supported over remote " +
+ "client with kerberos authentication");
+ return errorStatus;
+ }
+
+ @Override
+ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+ LOG.info("Client protocol version: " + req.getClient_protocol());
+ TOpenSessionResp resp = new TOpenSessionResp();
+ try {
+ SessionHandle sessionHandle = getSessionHandle(req, resp);
+ resp.setSessionHandle(sessionHandle.toTSessionHandle());
+ // TODO: set real configuration map
+ resp.setConfiguration(new HashMap<String, String>());
+ resp.setStatus(OK_STATUS);
+ ThriftCLIServerContext context =
+ (ThriftCLIServerContext)currentServerContext.get();
+ if (context != null) {
+ context.setSessionHandle(sessionHandle);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error opening session: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ private String getIpAddress() {
+ String clientIpAddress;
+ // Http transport mode.
+ // We set the thread local ip address, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ clientIpAddress = SessionManager.getIpAddress();
+ }
+ else {
+ // Kerberos
+ if (isKerberosAuthMode()) {
+ clientIpAddress = hiveAuthFactory.getIpAddress();
+ }
+ // Except kerberos, NOSASL
+ else {
+ clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
+ }
+ }
+ LOG.debug("Client's IP Address: " + clientIpAddress);
+ return clientIpAddress;
+ }
+
+ /**
+ * Returns the effective username.
+ * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
+ * 2. If hive.server2.allow.user.substitution = true: the username of the end user
+ * that the connecting user is trying to proxy for.
+ * This includes checking whether the connecting user is allowed to proxy for the end user.
+ * @param req
+ * @return
+ * @throws HiveSQLException
+ */
+ private String getUserName(TOpenSessionReq req) throws HiveSQLException {
+ String userName = null;
+ // Kerberos
+ if (isKerberosAuthMode()) {
+ userName = hiveAuthFactory.getRemoteUser();
+ }
+ // Except kerberos, NOSASL
+ if (userName == null) {
+ userName = TSetIpAddressProcessor.getUserName();
+ }
+ // Http transport mode.
+ // We set the thread local username, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ userName = SessionManager.getUserName();
+ }
+ if (userName == null) {
+ userName = req.getUsername();
+ }
+
+ userName = getShortName(userName);
+ String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ LOG.debug("Client's username: " + effectiveClientUser);
+ return effectiveClientUser;
+ }
+
+ private String getShortName(String userName) {
+ String ret = null;
+ if (userName != null) {
+ int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
+ ret = (indexOfDomainMatch <= 0) ? userName :
+ userName.substring(0, indexOfDomainMatch);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Create a session handle
+ * @param req
+ * @param res
+ * @return
+ * @throws HiveSQLException
+ * @throws LoginException
+ * @throws IOException
+ */
+ SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
+ throws HiveSQLException, LoginException, IOException {
+ String userName = getUserName(req);
+ String ipAddress = getIpAddress();
+ TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION,
+ req.getClient_protocol());
+ SessionHandle sessionHandle;
+ if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+ (userName != null)) {
+ String delegationTokenStr = getDelegationToken(userName);
+ sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
+ req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
+ } else {
+ sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
+ ipAddress, req.getConfiguration());
+ }
+ res.setServerProtocolVersion(protocol);
+ return sessionHandle;
+ }
+
+
+ private String getDelegationToken(String userName)
+ throws HiveSQLException, LoginException, IOException {
+ if (userName == null || !cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+ .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
+ return null;
+ }
+ try {
+ return cliService.getDelegationTokenFromMetaStore(userName);
+ } catch (UnsupportedOperationException e) {
+ // The delegation token is not applicable in the given deployment mode
+ }
+ return null;
+ }
+
+ private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
+ TProtocolVersion[] values = TProtocolVersion.values();
+ int current = values[values.length - 1].getValue();
+ for (TProtocolVersion version : versions) {
+ if (current > version.getValue()) {
+ current = version.getValue();
+ }
+ }
+ for (TProtocolVersion version : values) {
+ if (version.getValue() == current) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("never");
+ }
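
A hedged worked example of getMinVersion(): the numerically lowest protocol value wins and the matching enum constant is returned (the V6/V2 constants are assumed from the generated TProtocolVersion).

    // Server supports up to V6, client announces V2: the session runs at V2.
    TProtocolVersion negotiated = getMinVersion(
        TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
        TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
    // negotiated == TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
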
+
+ @Override
+ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+ TCloseSessionResp resp = new TCloseSessionResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ cliService.closeSession(sessionHandle);
+ resp.setStatus(OK_STATUS);
+ ThriftCLIServerContext context =
+ (ThriftCLIServerContext)currentServerContext.get();
+ if (context != null) {
+ context.setSessionHandle(null);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing session: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+ TGetInfoResp resp = new TGetInfoResp();
+ try {
+ GetInfoValue getInfoValue =
+ cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+ GetInfoType.getGetInfoType(req.getInfoType()));
+ resp.setInfoValue(getInfoValue.toTGetInfoValue());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting info: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+ TExecuteStatementResp resp = new TExecuteStatementResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ String statement = req.getStatement();
+ Map<String, String> confOverlay = req.getConfOverlay();
+ boolean runAsync = req.isRunAsync();
+ OperationHandle operationHandle = runAsync ?
+ cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+ : cliService.executeStatement(sessionHandle, statement, confOverlay);
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error executing statement: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+ TGetTypeInfoResp resp = new TGetTypeInfoResp();
+ try {
+ OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting type info: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+ TGetCatalogsResp resp = new TGetCatalogsResp();
+ try {
+ OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting catalogs: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+ TGetSchemasResp resp = new TGetSchemasResp();
+ try {
+ OperationHandle opHandle = cliService.getSchemas(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting schemas: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+ TGetTablesResp resp = new TGetTablesResp();
+ try {
+ OperationHandle opHandle = cliService
+ .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getTableName(), req.getTableTypes());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting tables: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+ TGetTableTypesResp resp = new TGetTableTypesResp();
+ try {
+ OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting table types: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+ TGetColumnsResp resp = new TGetColumnsResp();
+ try {
+ OperationHandle opHandle = cliService.getColumns(
+ new SessionHandle(req.getSessionHandle()),
+ req.getCatalogName(),
+ req.getSchemaName(),
+ req.getTableName(),
+ req.getColumnName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting columns: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+ TGetFunctionsResp resp = new TGetFunctionsResp();
+ try {
+ OperationHandle opHandle = cliService.getFunctions(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getFunctionName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting functions: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+ TGetOperationStatusResp resp = new TGetOperationStatusResp();
+ try {
+ OperationStatus operationStatus = cliService.getOperationStatus(
+ new OperationHandle(req.getOperationHandle()));
+ resp.setOperationState(operationStatus.getState().toTOperationState());
+ HiveSQLException opException = operationStatus.getOperationException();
+ if (opException != null) {
+ resp.setSqlState(opException.getSQLState());
+ resp.setErrorCode(opException.getErrorCode());
+ resp.setErrorMessage(opException.getMessage());
+ }
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting operation status: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+ TCancelOperationResp resp = new TCancelOperationResp();
+ try {
+ cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error cancelling operation: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+ TCloseOperationResp resp = new TCloseOperationResp();
+ try {
+ cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error closing operation: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+ throws TException {
+ TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+ try {
+ TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+ resp.setSchema(schema.toTTableSchema());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting result set metadata: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+ TFetchResultsResp resp = new TFetchResultsResp();
+ try {
+ RowSet rowSet = cliService.fetchResults(
+ new OperationHandle(req.getOperationHandle()),
+ FetchOrientation.getFetchOrientation(req.getOrientation()),
+ req.getMaxRows(),
+ FetchType.getFetchType(req.getFetchType()));
+ resp.setResults(rowSet.toTRowSet());
+ resp.setHasMoreRows(false);
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error fetching results: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public abstract void run();
+
+ /**
+ * If the proxy user name is provided then check privileges to substitute the user.
+ * @param realUser
+ * @param sessionConf
+ * @param ipAddress
+ * @return
+ * @throws HiveSQLException
+ */
+ private String getProxyUser(String realUser, Map<String, String> sessionConf,
+ String ipAddress) throws HiveSQLException {
+ String proxyUser = null;
+ // Http transport mode.
+ // We set the thread local proxy username, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ proxyUser = SessionManager.getProxyUserName();
+ LOG.debug("Proxy user from query string: " + proxyUser);
+ }
+
+ if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthFactory.HS2_PROXY_USER)) {
+ String proxyUserFromThriftBody = sessionConf.get(HiveAuthFactory.HS2_PROXY_USER);
+ LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
+ proxyUser = proxyUserFromThriftBody;
+ }
+
+ if (proxyUser == null) {
+ return realUser;
+ }
+
+ // check whether substitution is allowed
+ if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
+ throw new HiveSQLException("Proxy user substitution is not allowed");
+ }
+
+ // If there's no authentication, then directly substitute the user
+ if (HiveAuthFactory.AuthTypes.NONE.toString().
+ equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
+ return proxyUser;
+ }
+
+ // Verify proxy user privilege of the realUser for the proxyUser
+ HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
+ LOG.debug("Verified proxy user: " + proxyUser);
+ return proxyUser;
+ }
+
+ private boolean isKerberosAuthMode() {
+ return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+ .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+ }
+}
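
To exercise the getProxyUser() path above, a client puts HiveAuthFactory.HS2_PROXY_USER ("hive.server2.proxy.user") into the session configuration. A hedged sketch with hypothetical user names, using the ThriftCLIServiceClient defined in the next file:

    Map<String, String> sessionConf = new HashMap<String, String>();
    sessionConf.put("hive.server2.proxy.user", "endUser");  // HiveAuthFactory.HS2_PROXY_USER
    // Substitution must be enabled via hive.server2.allow.user.substitution and,
    // unless authentication is NONE, "serviceUser" must pass the hadoop.proxyuser checks.
    SessionHandle session = client.openSession("serviceUser", "", sessionConf);
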
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
new file mode 100644
index 0000000000..1af45398b8
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.*;
+import org.apache.thrift.TException;
+
+/**
+ * ThriftCLIServiceClient.
+ *
+ */
+public class ThriftCLIServiceClient extends CLIServiceClient {
+ private final TCLIService.Iface cliService;
+
+ public ThriftCLIServiceClient(TCLIService.Iface cliService) {
+ this.cliService = cliService;
+ }
+
+ public void checkStatus(TStatus status) throws HiveSQLException {
+ if (TStatusCode.ERROR_STATUS.equals(status.getStatusCode())) {
+ throw new HiveSQLException(status);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#openSession(java.lang.String, java.lang.String, java.util.Map)
+ */
+ @Override
+ public SessionHandle openSession(String username, String password,
+ Map<String, String> configuration)
+ throws HiveSQLException {
+ try {
+ TOpenSessionReq req = new TOpenSessionReq();
+ req.setUsername(username);
+ req.setPassword(password);
+ req.setConfiguration(configuration);
+ TOpenSessionResp resp = cliService.OpenSession(req);
+ checkStatus(resp.getStatus());
+ return new SessionHandle(resp.getSessionHandle(), resp.getServerProtocolVersion());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#openSessionWithImpersonation(java.lang.String, java.lang.String, java.util.Map, java.lang.String)
+ */
+ @Override
+ public SessionHandle openSessionWithImpersonation(String username, String password,
+ Map<String, String> configuration, String delegationToken) throws HiveSQLException {
+ throw new HiveSQLException("open with impersonation operation is not supported in the client");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+ try {
+ TCloseSessionReq req = new TCloseSessionReq(sessionHandle.toTSessionHandle());
+ TCloseSessionResp resp = cliService.CloseSession(req);
+ checkStatus(resp.getStatus());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getInfo(org.apache.hive.service.cli.SessionHandle, org.apache.hive.service.cli.GetInfoType)
+ */
+ @Override
+ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType)
+ throws HiveSQLException {
+ try {
+ // FIXME extract the right info type
+ TGetInfoReq req = new TGetInfoReq(sessionHandle.toTSessionHandle(), infoType.toTGetInfoType());
+ TGetInfoResp resp = cliService.GetInfo(req);
+ checkStatus(resp.getStatus());
+ return new GetInfoValue(resp.getInfoValue());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay)
+ throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay)
+ throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+ }
+
+ private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, boolean isAsync)
+ throws HiveSQLException {
+ try {
+ TExecuteStatementReq req =
+ new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
+ req.setConfOverlay(confOverlay);
+ req.setRunAsync(isAsync);
+ TExecuteStatementResp resp = cliService.ExecuteStatement(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException {
+ try {
+ TGetTypeInfoReq req = new TGetTypeInfoReq(sessionHandle.toTSessionHandle());
+ TGetTypeInfoResp resp = cliService.GetTypeInfo(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getCatalogs(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException {
+ try {
+ TGetCatalogsReq req = new TGetCatalogsReq(sessionHandle.toTSessionHandle());
+ TGetCatalogsResp resp = cliService.GetCatalogs(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String)
+ */
+ @Override
+ public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName,
+ String schemaName)
+ throws HiveSQLException {
+ try {
+ TGetSchemasReq req = new TGetSchemasReq(sessionHandle.toTSessionHandle());
+ req.setCatalogName(catalogName);
+ req.setSchemaName(schemaName);
+ TGetSchemasResp resp = cliService.GetSchemas(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List)
+ */
+ @Override
+ public OperationHandle getTables(SessionHandle sessionHandle, String catalogName,
+ String schemaName, String tableName, List<String> tableTypes)
+ throws HiveSQLException {
+ try {
+ TGetTablesReq req = new TGetTablesReq(sessionHandle.toTSessionHandle());
+ req.setCatalogName(catalogName);
+ req.setSchemaName(schemaName);
+ req.setTableName(tableName);
+ req.setTableTypes(tableTypes);
+ TGetTablesResp resp = cliService.GetTables(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTableTypes(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException {
+ try {
+ TGetTableTypesReq req = new TGetTableTypesReq(sessionHandle.toTSessionHandle());
+ TGetTableTypesResp resp = cliService.GetTableTypes(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getColumns(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getColumns(SessionHandle sessionHandle,
+ String catalogName, String schemaName, String tableName, String columnName)
+ throws HiveSQLException {
+ try {
+ TGetColumnsReq req = new TGetColumnsReq();
+ req.setSessionHandle(sessionHandle.toTSessionHandle());
+ req.setCatalogName(catalogName);
+ req.setSchemaName(schemaName);
+ req.setTableName(tableName);
+ req.setColumnName(columnName);
+ TGetColumnsResp resp = cliService.GetColumns(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getFunctions(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getFunctions(SessionHandle sessionHandle,
+ String catalogName, String schemaName, String functionName) throws HiveSQLException {
+ try {
+ TGetFunctionsReq req = new TGetFunctionsReq(sessionHandle.toTSessionHandle(), functionName);
+ req.setCatalogName(catalogName);
+ req.setSchemaName(schemaName);
+ TGetFunctionsResp resp = cliService.GetFunctions(req);
+ checkStatus(resp.getStatus());
+ TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+ return new OperationHandle(resp.getOperationHandle(), protocol);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+ try {
+ TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle());
+ TGetOperationStatusResp resp = cliService.GetOperationStatus(req);
+ // Checks the status of the RPC call, throws an exception in case of error
+ checkStatus(resp.getStatus());
+ OperationState opState = OperationState.getOperationState(resp.getOperationState());
+ HiveSQLException opException = null;
+ if (opState == OperationState.ERROR) {
+ opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode());
+ }
+ return new OperationStatus(opState, opException);
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+ try {
+ TCancelOperationReq req = new TCancelOperationReq(opHandle.toTOperationHandle());
+ TCancelOperationResp resp = cliService.CancelOperation(req);
+ checkStatus(resp.getStatus());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#closeOperation(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public void closeOperation(OperationHandle opHandle)
+ throws HiveSQLException {
+ try {
+ TCloseOperationReq req = new TCloseOperationReq(opHandle.toTOperationHandle());
+ TCloseOperationResp resp = cliService.CloseOperation(req);
+ checkStatus(resp.getStatus());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public TableSchema getResultSetMetadata(OperationHandle opHandle)
+ throws HiveSQLException {
+ try {
+ TGetResultSetMetadataReq req = new TGetResultSetMetadataReq(opHandle.toTOperationHandle());
+ TGetResultSetMetadataResp resp = cliService.GetResultSetMetadata(req);
+ checkStatus(resp.getStatus());
+ return new TableSchema(resp.getSchema());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+ FetchType fetchType) throws HiveSQLException {
+ try {
+ TFetchResultsReq req = new TFetchResultsReq();
+ req.setOperationHandle(opHandle.toTOperationHandle());
+ req.setOrientation(orientation.toTFetchOrientation());
+ req.setMaxRows(maxRows);
+ req.setFetchType(fetchType.toTFetchType());
+ TFetchResultsResp resp = cliService.FetchResults(req);
+ checkStatus(resp.getStatus());
+ return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+ // TODO: set the correct default fetch size
+ return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
+ }
+
+ @Override
+ public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String owner, String renewer) throws HiveSQLException {
+ TGetDelegationTokenReq req = new TGetDelegationTokenReq(
+ sessionHandle.toTSessionHandle(), owner, renewer);
+ try {
+ TGetDelegationTokenResp tokenResp = cliService.GetDelegationToken(req);
+ checkStatus(tokenResp.getStatus());
+ return tokenResp.getDelegationToken();
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ @Override
+ public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ TCancelDelegationTokenReq cancelReq = new TCancelDelegationTokenReq(
+ sessionHandle.toTSessionHandle(), tokenStr);
+ try {
+ TCancelDelegationTokenResp cancelResp =
+ cliService.CancelDelegationToken(cancelReq);
+ checkStatus(cancelResp.getStatus());
+ } catch (TException e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ @Override
+ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ TRenewDelegationTokenReq renewReq = new TRenewDelegationTokenReq(
+ sessionHandle.toTSessionHandle(), tokenStr);
+ try {
+ TRenewDelegationTokenResp renewResp =
+ cliService.RenewDelegationToken(renewReq);
+ checkStatus(renewResp.getStatus());
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+}
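
Putting the client methods together, a minimal end-to-end sketch (session, statement, and user names are illustrative):

    SessionHandle session = client.openSession("user", "", null);
    OperationHandle op = client.executeStatement(session, "SELECT 1", null);
    RowSet rows = client.fetchResults(op);  // FETCH_NEXT, up to the 10000-row default above
    client.closeOperation(op);
    client.closeSession(session);
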
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
new file mode 100644
index 0000000000..3b57efa38b
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.thrift.TCLIService.Iface;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+
+public class ThriftHttpCLIService extends ThriftCLIService {
+
+ public ThriftHttpCLIService(CLIService cliService) {
+ super(cliService, ThriftHttpCLIService.class.getSimpleName());
+ }
+
+ /**
+ * Configure Jetty to serve http requests. Example of a client connection URL:
+ * http://localhost:10000/servlets/thrifths2/; a gateway may cause the actual target URL to differ,
+ * e.g. http://gateway:port/hive2/servlets/thrifths2/
+ */
+ @Override
+ public void run() {
+ try {
+ // HTTP Server
+ httpServer = new org.eclipse.jetty.server.Server();
+
+ // Server thread pool
+ // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
+ String threadPoolName = "HiveServer2-HttpHandler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+ ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
+ httpServer.setThreadPool(threadPool);
+
+ // Connector configs
+ SelectChannelConnector connector = new SelectChannelConnector();
+ boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+ String schemeName = useSsl ? "https" : "http";
+ // Change connector if SSL is used
+ if (useSsl) {
+ String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+ String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+ HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+ if (keyStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
+ }
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+ LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+ sslContextFactory.addExcludeProtocols(excludedProtocols);
+ LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
+ Arrays.toString(sslContextFactory.getExcludeProtocols()));
+ sslContextFactory.setKeyStorePath(keyStorePath);
+ sslContextFactory.setKeyStorePassword(keyStorePassword);
+ connector = new SslSelectChannelConnector(sslContextFactory);
+ }
+ connector.setPort(portNum);
+ // Linux:yes, Windows:no
+ connector.setReuseAddress(!Shell.WINDOWS);
+ int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+ TimeUnit.MILLISECONDS);
+ connector.setMaxIdleTime(maxIdleTime);
+
+ httpServer.addConnector(connector);
+
+ // Thrift configs
+ hiveAuthFactory = new HiveAuthFactory(hiveConf);
+ TProcessor processor = new TCLIService.Processor<Iface>(this);
+ TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ // Set during the init phase of HiveServer2 if auth mode is kerberos
+ // UGI for the hive/_HOST (kerberos) principal
+ UserGroupInformation serviceUGI = cliService.getServiceUGI();
+ // UGI for the http/_HOST (SPNego) principal
+ UserGroupInformation httpUGI = cliService.getHttpUGI();
+ String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+ TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
+ serviceUGI, httpUGI);
+
+ // Context handler
+ final ServletContextHandler context = new ServletContextHandler(
+ ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+ String httpPath = getHttpPath(hiveConf
+ .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+ httpServer.setHandler(context);
+ context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+ // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyReceiveDuration, etc.
+ // Finally, start the server
+ httpServer.start();
+ String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ + maxWorkerThreads + " worker threads";
+ LOG.info(msg);
+ httpServer.join();
+ } catch (Throwable t) {
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftHttpCLIService.class.getSimpleName(), t);
+ System.exit(-1);
+ }
+ }
+
+ /**
+ * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on.
+ * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
+ * @param httpPath
+ * @return
+ */
+ private String getHttpPath(String httpPath) {
+ if (httpPath == null || httpPath.equals("")) {
+ httpPath = "/*";
+ } else {
+ if (!httpPath.startsWith("/")) {
+ httpPath = "/" + httpPath;
+ }
+ if (httpPath.endsWith("/")) {
+ httpPath = httpPath + "*";
+ }
+ if (!httpPath.endsWith("/*")) {
+ httpPath = httpPath + "/*";
+ }
+ }
+ return httpPath;
+ }
+}
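
The normalization done by getHttpPath() can be read off its branches; "cliservice" is the stock default for hive.server2.thrift.http.path:

    // null or ""      -> "/*"
    // "cliservice"    -> "/cliservice/*"
    // "/cliservice"   -> "/cliservice/*"
    // "/cliservice/"  -> "/cliservice/*"
    // "/a/b/*"        -> "/a/b/*"   (already terminal; unchanged)
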
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
new file mode 100644
index 0000000000..56c8cb6e54
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.NewCookie;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.CookieSigner;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+/**
+ * ThriftHttpServlet.
+ *
+ * Serves the Thrift CLI service over HTTP(S), authenticating each request via
+ * an HS2 signed cookie, Kerberos/SPNego, or a password based provider.
+ */
+public class ThriftHttpServlet extends TServlet {
+
+ private static final long serialVersionUID = 1L;
+ public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName());
+ private final String authType;
+ private final UserGroupInformation serviceUGI;
+ private final UserGroupInformation httpUGI;
+ private HiveConf hiveConf = new HiveConf();
+
+ // Class members for cookie based authentication.
+ private CookieSigner signer;
+ public static final String AUTH_COOKIE = "hive.server2.auth";
+ private static final SecureRandom RAN = new SecureRandom();
+ private boolean isCookieAuthEnabled;
+ private String cookieDomain;
+ private String cookiePath;
+ private int cookieMaxAge;
+ private boolean isCookieSecure;
+ private boolean isHttpOnlyCookie;
+
+ public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
+ String authType, UserGroupInformation serviceUGI, UserGroupInformation httpUGI) {
+ super(processor, protocolFactory);
+ this.authType = authType;
+ this.serviceUGI = serviceUGI;
+ this.httpUGI = httpUGI;
+ this.isCookieAuthEnabled = hiveConf.getBoolVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED);
+ // Initialize the cookie based authentication related variables.
+ if (isCookieAuthEnabled) {
+ // Generate the signer with a random secret; do not log the secret itself.
+ String secret = Long.toString(RAN.nextLong());
+ LOG.debug("Generated a random secret for cookie signing");
+ this.signer = new CookieSigner(secret.getBytes());
+ this.cookieMaxAge = (int) hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE, TimeUnit.SECONDS);
+ this.cookieDomain = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_DOMAIN);
+ this.cookiePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_PATH);
+ this.isCookieSecure = hiveConf.getBoolVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_SECURE);
+ this.isHttpOnlyCookie = hiveConf.getBoolVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_HTTPONLY);
+ }
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ String clientUserName = null;
+ String clientIpAddress;
+ boolean requireNewCookie = false;
+
+ try {
+ // If the cookie based authentication is already enabled, parse the
+ // request and validate the request cookies.
+ if (isCookieAuthEnabled) {
+ clientUserName = validateCookie(request);
+ requireNewCookie = (clientUserName == null);
+ if (requireNewCookie) {
+ LOG.info("Could not validate cookie sent, will try to generate a new cookie");
+ }
+ }
+ // If the cookie based authentication is not enabled or the request does
+ // not have a valid cookie, use the kerberos or password based authentication
+ // depending on the server setup.
+ if (clientUserName == null) {
+ // For a kerberos setup
+ if (isKerberosAuthMode(authType)) {
+ clientUserName = doKerberosAuth(request);
+ }
+ // For password based authentication
+ else {
+ clientUserName = doPasswdAuth(request, authType);
+ }
+ }
+ LOG.debug("Client username: " + clientUserName);
+
+ // Set the thread local username, used for doAs when impersonation is enabled
+ SessionManager.setUserName(clientUserName);
+
+ // find proxy user if any from query param
+ String doAsQueryParam = getDoAsQueryParam(request.getQueryString());
+ if (doAsQueryParam != null) {
+ SessionManager.setProxyUserName(doAsQueryParam);
+ }
+
+ clientIpAddress = request.getRemoteAddr();
+ LOG.debug("Client IP Address: " + clientIpAddress);
+ // Set the thread local ip address
+ SessionManager.setIpAddress(clientIpAddress);
+ // Generate new cookie and add it to the response
+ if (requireNewCookie &&
+ !authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+ String cookieToken = HttpAuthUtils.createCookieToken(clientUserName);
+ Cookie hs2Cookie = createCookie(signer.signCookie(cookieToken));
+
+ if (isHttpOnlyCookie) {
+ response.setHeader("SET-COOKIE", getHttpOnlyCookieHeader(hs2Cookie));
+ } else {
+ response.addCookie(hs2Cookie);
+ }
+ LOG.info("Cookie added for clientUserName " + clientUserName);
+ }
+ super.doPost(request, response);
+ }
+ catch (HttpAuthenticationException e) {
+ LOG.error("Error: ", e);
+ // Send a 401 to the client
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+ if(isKerberosAuthMode(authType)) {
+ response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, HttpAuthUtils.NEGOTIATE);
+ }
+ response.getWriter().println("Authentication Error: " + e.getMessage());
+ }
+ finally {
+ // Clear the thread locals
+ SessionManager.clearUserName();
+ SessionManager.clearIpAddress();
+ SessionManager.clearProxyUserName();
+ }
+ }
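+
+ // Editorial sketch (not part of the original patch): a typical exchange with
+ // cookie auth enabled. The first request authenticates with, e.g.,
+ // "Authorization: Basic <base64(user:password)>" and the response carries
+ // "Set-Cookie: hive.server2.auth=<signed token>"; subsequent requests that
+ // present this cookie skip the Kerberos/password path entirely.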
+
+ /**
+ * Retrieves the client name from the request cookies. If no cookie
+ * corresponds to a valid client, the function returns null.
+ * @param cookies HTTP Request cookies.
+ * @return Client username if the cookies contain a currently valid HS2 generated cookie,
+ * else null.
+ */
+ private String getClientNameFromCookie(Cookie[] cookies) {
+ // Current Cookie Name, Current Cookie Value
+ String currName, currValue;
+
+ // Following is the main loop which iterates through all the cookies sent by the client.
+ // The HS2 generated cookies are of the format hive.server2.auth=<value>
+ // A cookie which is identified as a hiveserver2 generated cookie is validated
+ // by calling signer.verifyAndExtract(). If the validation passes, the
+ // username for which the cookie was issued is returned to the caller. If no
+ // client side cookie passes the validation, null is returned.
+ for (Cookie currCookie : cookies) {
+ // Get the cookie name
+ currName = currCookie.getName();
+ if (!currName.equals(AUTH_COOKIE)) {
+ // Not a HS2 generated cookie, continue.
+ continue;
+ }
+ // If we reached here, we have a match for a HS2 generated cookie
+ currValue = currCookie.getValue();
+ // Validate the value.
+ currValue = signer.verifyAndExtract(currValue);
+ // Retrieve the user name, do the final validation step.
+ if (currValue != null) {
+ String userName = HttpAuthUtils.getUserNameFromCookieToken(currValue);
+
+ if (userName == null) {
+ LOG.warn("Invalid cookie token " + currValue);
+ continue;
+ }
+ // We have found a valid cookie in the client request.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Validated the cookie for user " + userName);
+ }
+ return userName;
+ }
+ }
+ // No valid HS2 generated cookies found, return null
+ return null;
+ }
+
+ /**
+ * Convert cookie array to human readable cookie string
+ * @param cookies Cookie Array
+ * @return String containing all the cookies separated by a newline character.
+ * Each cookie is of the format [key]=[value]
+ */
+ private String toCookieStr(Cookie[] cookies) {
+ StringBuilder cookieStr = new StringBuilder();
+
+ for (Cookie c : cookies) {
+ cookieStr.append(c.getName()).append("=").append(c.getValue()).append(" ;\n");
+ }
+ return cookieStr.toString();
+ }
+
+ /**
+ * Validate the request cookie. This function iterates over the request cookie headers
+ * and finds a cookie that represents a valid client/server session. If it finds one, it
+ * returns the client name associated with the session. Else, it returns null.
+ * @param request The HTTP Servlet Request sent by the client
+ * @return Client Username if the request has valid HS2 cookie, else returns null
+ * @throws UnsupportedEncodingException
+ */
+ private String validateCookie(HttpServletRequest request) throws UnsupportedEncodingException {
+ // Find all the valid cookies associated with the request.
+ Cookie[] cookies = request.getCookies();
+
+ if (cookies == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No valid cookies associated with the request " + request);
+ }
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received cookies: " + toCookieStr(cookies));
+ }
+ return getClientNameFromCookie(cookies);
+ }
+
+ /**
+ * Generate a server side cookie given the cookie value as the input.
+ * @param str Input string token.
+ * @return The generated cookie.
+ * @throws UnsupportedEncodingException
+ */
+ private Cookie createCookie(String str) throws UnsupportedEncodingException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cookie name = " + AUTH_COOKIE + " value = " + str);
+ }
+ Cookie cookie = new Cookie(AUTH_COOKIE, str);
+
+ cookie.setMaxAge(cookieMaxAge);
+ if (cookieDomain != null) {
+ cookie.setDomain(cookieDomain);
+ }
+ if (cookiePath != null) {
+ cookie.setPath(cookiePath);
+ }
+ cookie.setSecure(isCookieSecure);
+ return cookie;
+ }
+
+ /**
+ * Generate an HttpOnly cookie header value from the HS2 cookie.
+ * @param cookie HS2 generated cookie
+ * @return The cookie header value with the HttpOnly attribute appended
+ */
+ private static String getHttpOnlyCookieHeader(Cookie cookie) {
+ NewCookie newCookie = new NewCookie(cookie.getName(), cookie.getValue(),
+ cookie.getPath(), cookie.getDomain(), cookie.getVersion(),
+ cookie.getComment(), cookie.getMaxAge(), cookie.getSecure());
+ return newCookie + "; HttpOnly";
+ }
+
+ /**
+ * Do the LDAP/PAM authentication.
+ * @param request the HTTP request carrying the Basic Authorization header
+ * @param authType the configured authentication mode
+ * @return the authenticated client username
+ * @throws HttpAuthenticationException
+ */
+ private String doPasswdAuth(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String userName = getUsername(request, authType);
+ // No-op when authType is NOSASL
+ if (!authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+ try {
+ AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+ PasswdAuthenticationProvider provider =
+ AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+ provider.Authenticate(userName, getPassword(request, authType));
+
+ } catch (Exception e) {
+ throw new HttpAuthenticationException(e);
+ }
+ }
+ return userName;
+ }
+
+ /**
+ * Do the GSS-API kerberos authentication.
+ * We already have a logged in subject in the form of serviceUGI,
+ * which GSS-API will extract information from.
+ * In case of a SPNego request we use the httpUGI
+ * to authenticate the service ticket.
+ * @param request the HTTP request carrying the Negotiate Authorization header
+ * @return the short name of the authenticated client principal
+ * @throws HttpAuthenticationException
+ */
+ private String doKerberosAuth(HttpServletRequest request)
+ throws HttpAuthenticationException {
+ // Try authenticating with the http/_HOST principal
+ if (httpUGI != null) {
+ try {
+ return httpUGI.doAs(new HttpKerberosServerAction(request, httpUGI));
+ } catch (Exception e) {
+ LOG.info("Failed to authenticate with http/_HOST kerberos principal, " +
+ "trying with hive/_HOST kerberos principal");
+ }
+ }
+ // Now try with hive/_HOST principal
+ try {
+ return serviceUGI.doAs(new HttpKerberosServerAction(request, serviceUGI));
+ } catch (Exception e) {
+ LOG.error("Failed to authenticate with hive/_HOST kerberos principal");
+ throw new HttpAuthenticationException(e);
+ }
+
+ }
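+
+ // Editorial sketch (not part of the original patch): in Kerberos mode the
+ // client sends "Authorization: Negotiate <base64 GSS token>"; getAuthHeader()
+ // strips the "Negotiate " prefix and HttpKerberosServerAction feeds the
+ // decoded token to gssContext.acceptSecContext() below.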
+
+ class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+ HttpServletRequest request;
+ UserGroupInformation serviceUGI;
+
+ HttpKerberosServerAction(HttpServletRequest request,
+ UserGroupInformation serviceUGI) {
+ this.request = request;
+ this.serviceUGI = serviceUGI;
+ }
+
+ @Override
+ public String run() throws HttpAuthenticationException {
+ // Get own Kerberos credentials for accepting connection
+ GSSManager manager = GSSManager.getInstance();
+ GSSContext gssContext = null;
+ String serverPrincipal = getPrincipalWithoutRealm(
+ serviceUGI.getUserName());
+ try {
+ // This is the Oid for the Kerberos GSS-API mechanism.
+ Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
+ // Oid for SPNego GSS-API mechanism.
+ Oid spnegoMechOid = new Oid("1.3.6.1.5.5.2");
+ // Oid for kerberos principal name
+ Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+ // GSS name for server
+ GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+ // GSS credentials for server
+ GSSCredential serverCreds = manager.createCredential(serverName,
+ GSSCredential.DEFAULT_LIFETIME,
+ new Oid[]{kerberosMechOid, spnegoMechOid},
+ GSSCredential.ACCEPT_ONLY);
+
+ // Create a GSS context
+ gssContext = manager.createContext(serverCreds);
+ // Get service ticket from the authorization header
+ String serviceTicketBase64 = getAuthHeader(request, authType);
+ byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+ gssContext.acceptSecContext(inToken, 0, inToken.length);
+ // Authenticate or deny based on its context completion
+ if (!gssContext.isEstablished()) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: " +
+ "unable to establish context with the service ticket " +
+ "provided by the client.");
+ }
+ else {
+ return getPrincipalWithoutRealmAndHost(gssContext.getSrcName().toString());
+ }
+ }
+ catch (GSSException e) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+ }
+ finally {
+ if (gssContext != null) {
+ try {
+ gssContext.dispose();
+ } catch (GSSException e) {
+ // No-op
+ }
+ }
+ }
+ }
+
+ private String getPrincipalWithoutRealm(String fullPrincipal)
+ throws HttpAuthenticationException {
+ KerberosNameShim fullKerberosName;
+ try {
+ fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+ } catch (IOException e) {
+ throw new HttpAuthenticationException(e);
+ }
+ String serviceName = fullKerberosName.getServiceName();
+ String hostName = fullKerberosName.getHostName();
+ String principalWithoutRealm = serviceName;
+ if (hostName != null) {
+ principalWithoutRealm = serviceName + "/" + hostName;
+ }
+ return principalWithoutRealm;
+ }
+
+ private String getPrincipalWithoutRealmAndHost(String fullPrincipal)
+ throws HttpAuthenticationException {
+ KerberosNameShim fullKerberosName;
+ try {
+ fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+ return fullKerberosName.getShortName();
+ } catch (IOException e) {
+ throw new HttpAuthenticationException(e);
+ }
+ }
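+
+ // Editorial sketch (not part of the original patch): for a full principal
+ // such as "hive/host.example.com@EXAMPLE.COM", getPrincipalWithoutRealm()
+ // yields "hive/host.example.com", while getPrincipalWithoutRealmAndHost()
+ // yields the short name "hive" (realm and host rules applied by the shim).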
+ }
+
+ private String getUsername(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String[] creds = getAuthHeaderTokens(request, authType);
+ // Username must be present
+ if (creds[0] == null || creds[0].isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client does not contain a username.");
+ }
+ return creds[0];
+ }
+
+ private String getPassword(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String[] creds = getAuthHeaderTokens(request, authType);
+ // Password must be present
+ if (creds.length < 2 || creds[1] == null || creds[1].isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client does not contain a password.");
+ }
+ return creds[1];
+ }
+
+ private String[] getAuthHeaderTokens(HttpServletRequest request,
+ String authType) throws HttpAuthenticationException {
+ String authHeaderBase64 = getAuthHeader(request, authType);
+ String authHeaderString = StringUtils.newStringUtf8(
+ Base64.decodeBase64(authHeaderBase64.getBytes()));
+ // Split into at most two tokens so that a password containing ':' is preserved.
+ String[] creds = authHeaderString.split(":", 2);
+ return creds;
+ }
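+
+ // Editorial sketch (not part of the original patch): for a header
+ // "Authorization: Basic dXNlcjpwYXNzd29yZA==", the base64 payload decodes to
+ // "user:password", so creds[0] is "user" and creds[1] is "password".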
+
+ /**
+ * Returns the base64 encoded auth header payload.
+ * @param request the HTTP request
+ * @param authType the configured authentication mode
+ * @return the base64 encoded payload of the Authorization header
+ * @throws HttpAuthenticationException
+ */
+ private String getAuthHeader(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+ // Each http request must have an Authorization header
+ if (authHeader == null || authHeader.isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client is empty.");
+ }
+
+ int beginIndex;
+ if (isKerberosAuthMode(authType)) {
+ beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+ } else {
+ beginIndex = (HttpAuthUtils.BASIC + " ").length();
+ }
+ // Authorization header must have a payload beyond the scheme prefix
+ if (authHeader.length() <= beginIndex) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client does not contain any data.");
+ }
+ String authHeaderBase64String = authHeader.substring(beginIndex);
+ return authHeaderBase64String;
+ }
+
+ private boolean isKerberosAuthMode(String authType) {
+ return authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+ }
+
+ private static String getDoAsQueryParam(String queryString) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("URL query string:" + queryString);
+ }
+ if (queryString == null) {
+ return null;
+ }
+ Map<String, String[]> params = javax.servlet.http.HttpUtils.parseQueryString(queryString);
+ Set<String> keySet = params.keySet();
+ for (String key: keySet) {
+ if (key.equalsIgnoreCase("doAs")) {
+ return params.get(key)[0];
+ }
+ }
+ return null;
+ }
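+
+ // Editorial sketch (not part of the original patch): for a request URL ending
+ // in "?doAs=alice", getDoAsQueryParam() returns "alice"; the key match is case
+ // insensitive, so "?DOAS=alice" behaves the same way.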
+
+}