aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java')
-rw-r--r--sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java740
1 files changed, 740 insertions, 0 deletions
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
new file mode 100644
index 0000000000..5a0f1c83c7
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.ServiceUtils;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.TSetIpAddressProcessor;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * ThriftCLIService.
+ *
+ */
+public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+ public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
+
+ protected CLIService cliService;
+ private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+ protected static HiveAuthFactory hiveAuthFactory;
+
+ protected int portNum;
+ protected InetAddress serverIPAddress;
+ protected String hiveHost;
+ protected TServer server;
+ protected org.eclipse.jetty.server.Server httpServer;
+
+ private boolean isStarted = false;
+ protected boolean isEmbedded = false;
+
+ protected HiveConf hiveConf;
+
+ protected int minWorkerThreads;
+ protected int maxWorkerThreads;
+ protected long workerKeepAliveTime;
+
+ protected TServerEventHandler serverEventHandler;
+ protected ThreadLocal<ServerContext> currentServerContext;
+
+ static class ThriftCLIServerContext implements ServerContext {
+ private SessionHandle sessionHandle = null;
+
+ public void setSessionHandle(SessionHandle sessionHandle) {
+ this.sessionHandle = sessionHandle;
+ }
+
+ public SessionHandle getSessionHandle() {
+ return sessionHandle;
+ }
+ }
+
+ public ThriftCLIService(CLIService service, String serviceName) {
+ super(serviceName);
+ this.cliService = service;
+ currentServerContext = new ThreadLocal<ServerContext>();
+ serverEventHandler = new TServerEventHandler() {
+ @Override
+ public ServerContext createContext(
+ TProtocol input, TProtocol output) {
+ return new ThriftCLIServerContext();
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext,
+ TProtocol input, TProtocol output) {
+ ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+ SessionHandle sessionHandle = context.getSessionHandle();
+ if (sessionHandle != null) {
+ LOG.info("Session disconnected without closing properly, close it now");
+ try {
+ cliService.closeSession(sessionHandle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Failed to close session: " + e, e);
+ }
+ }
+ }
+
+ @Override
+ public void preServe() {
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext,
+ TTransport input, TTransport output) {
+ currentServerContext.set(serverContext);
+ }
+ };
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+ try {
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverIPAddress = InetAddress.getByName(hiveHost);
+ } else {
+ serverIPAddress = InetAddress.getLocalHost();
+ }
+ } catch (UnknownHostException e) {
+ throw new ServiceException(e);
+ }
+ // HTTP mode
+ if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+ TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+ }
+ }
+ // Binary mode
+ else {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+ }
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ super.init(hiveConf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ if (!isStarted && !isEmbedded) {
+ new Thread(this).start();
+ isStarted = true;
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (isStarted && !isEmbedded) {
+ if(server != null) {
+ server.stop();
+ LOG.info("Thrift server has stopped");
+ }
+ if((httpServer != null) && httpServer.isStarted()) {
+ try {
+ httpServer.stop();
+ LOG.info("Http server has stopped");
+ } catch (Exception e) {
+ LOG.error("Error stopping Http server: ", e);
+ }
+ }
+ isStarted = false;
+ }
+ super.stop();
+ }
+
+ public int getPortNumber() {
+ return portNum;
+ }
+
+ public InetAddress getServerIPAddress() {
+ return serverIPAddress;
+ }
+
+ @Override
+ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
+ throws TException {
+ TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
+
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ String token = cliService.getDelegationToken(
+ new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getOwner(), req.getRenewer());
+ resp.setDelegationToken(token);
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error obtaining delegation token", e);
+ TStatus tokenErrorStatus = HiveSQLException.toTStatus(e);
+ tokenErrorStatus.setSqlState("42000");
+ resp.setStatus(tokenErrorStatus);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
+ throws TException {
+ TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
+
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getDelegationToken());
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error canceling delegation token", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
+ throws TException {
+ TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
+ if (hiveAuthFactory == null) {
+ resp.setStatus(unsecureTokenErrorStatus());
+ } else {
+ try {
+ cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()),
+ hiveAuthFactory, req.getDelegationToken());
+ resp.setStatus(OK_STATUS);
+ } catch (HiveSQLException e) {
+ LOG.error("Error obtaining renewing token", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ }
+ return resp;
+ }
+
+ private TStatus unsecureTokenErrorStatus() {
+ TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
+ errorStatus.setErrorMessage("Delegation token only supported over remote " +
+ "client with kerberos authentication");
+ return errorStatus;
+ }
+
+ @Override
+ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+ LOG.info("Client protocol version: " + req.getClient_protocol());
+ TOpenSessionResp resp = new TOpenSessionResp();
+ try {
+ SessionHandle sessionHandle = getSessionHandle(req, resp);
+ resp.setSessionHandle(sessionHandle.toTSessionHandle());
+ // TODO: set real configuration map
+ resp.setConfiguration(new HashMap<String, String>());
+ resp.setStatus(OK_STATUS);
+ ThriftCLIServerContext context =
+ (ThriftCLIServerContext)currentServerContext.get();
+ if (context != null) {
+ context.setSessionHandle(sessionHandle);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error opening session: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ private String getIpAddress() {
+ String clientIpAddress;
+ // Http transport mode.
+ // We set the thread local ip address, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ clientIpAddress = SessionManager.getIpAddress();
+ }
+ else {
+ // Kerberos
+ if (isKerberosAuthMode()) {
+ clientIpAddress = hiveAuthFactory.getIpAddress();
+ }
+ // Except kerberos, NOSASL
+ else {
+ clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
+ }
+ }
+ LOG.debug("Client's IP Address: " + clientIpAddress);
+ return clientIpAddress;
+ }
+
+ /**
+ * Returns the effective username.
+ * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
+ * 2. If hive.server2.allow.user.substitution = true: the username of the end user,
+ * that the connecting user is trying to proxy for.
+ * This includes a check whether the connecting user is allowed to proxy for the end user.
+ * @param req
+ * @return
+ * @throws HiveSQLException
+ */
+ private String getUserName(TOpenSessionReq req) throws HiveSQLException {
+ String userName = null;
+ // Kerberos
+ if (isKerberosAuthMode()) {
+ userName = hiveAuthFactory.getRemoteUser();
+ }
+ // Except kerberos, NOSASL
+ if (userName == null) {
+ userName = TSetIpAddressProcessor.getUserName();
+ }
+ // Http transport mode.
+ // We set the thread local username, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ userName = SessionManager.getUserName();
+ }
+ if (userName == null) {
+ userName = req.getUsername();
+ }
+
+ userName = getShortName(userName);
+ String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ LOG.debug("Client's username: " + effectiveClientUser);
+ return effectiveClientUser;
+ }
+
+ private String getShortName(String userName) {
+ String ret = null;
+ if (userName != null) {
+ int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
+ ret = (indexOfDomainMatch <= 0) ? userName :
+ userName.substring(0, indexOfDomainMatch);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Create a session handle
+ * @param req
+ * @param res
+ * @return
+ * @throws HiveSQLException
+ * @throws LoginException
+ * @throws IOException
+ */
+ SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
+ throws HiveSQLException, LoginException, IOException {
+ String userName = getUserName(req);
+ String ipAddress = getIpAddress();
+ TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION,
+ req.getClient_protocol());
+ SessionHandle sessionHandle;
+ if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+ (userName != null)) {
+ String delegationTokenStr = getDelegationToken(userName);
+ sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
+ req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
+ } else {
+ sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
+ ipAddress, req.getConfiguration());
+ }
+ res.setServerProtocolVersion(protocol);
+ return sessionHandle;
+ }
+
+
+ private String getDelegationToken(String userName)
+ throws HiveSQLException, LoginException, IOException {
+ if (userName == null || !cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+ .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
+ return null;
+ }
+ try {
+ return cliService.getDelegationTokenFromMetaStore(userName);
+ } catch (UnsupportedOperationException e) {
+ // The delegation token is not applicable in the given deployment mode
+ }
+ return null;
+ }
+
+ private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
+ TProtocolVersion[] values = TProtocolVersion.values();
+ int current = values[values.length - 1].getValue();
+ for (TProtocolVersion version : versions) {
+ if (current > version.getValue()) {
+ current = version.getValue();
+ }
+ }
+ for (TProtocolVersion version : values) {
+ if (version.getValue() == current) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("never");
+ }
+
+ @Override
+ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+ TCloseSessionResp resp = new TCloseSessionResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ cliService.closeSession(sessionHandle);
+ resp.setStatus(OK_STATUS);
+ ThriftCLIServerContext context =
+ (ThriftCLIServerContext)currentServerContext.get();
+ if (context != null) {
+ context.setSessionHandle(null);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing session: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+ TGetInfoResp resp = new TGetInfoResp();
+ try {
+ GetInfoValue getInfoValue =
+ cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+ GetInfoType.getGetInfoType(req.getInfoType()));
+ resp.setInfoValue(getInfoValue.toTGetInfoValue());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting info: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+ TExecuteStatementResp resp = new TExecuteStatementResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ String statement = req.getStatement();
+ Map<String, String> confOverlay = req.getConfOverlay();
+ Boolean runAsync = req.isRunAsync();
+ OperationHandle operationHandle = runAsync ?
+ cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+ : cliService.executeStatement(sessionHandle, statement, confOverlay);
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error executing statement: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+ TGetTypeInfoResp resp = new TGetTypeInfoResp();
+ try {
+ OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting type info: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+ TGetCatalogsResp resp = new TGetCatalogsResp();
+ try {
+ OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting catalogs: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+ TGetSchemasResp resp = new TGetSchemasResp();
+ try {
+ OperationHandle opHandle = cliService.getSchemas(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting schemas: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+ TGetTablesResp resp = new TGetTablesResp();
+ try {
+ OperationHandle opHandle = cliService
+ .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getTableName(), req.getTableTypes());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting tables: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+ TGetTableTypesResp resp = new TGetTableTypesResp();
+ try {
+ OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting table types: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+ TGetColumnsResp resp = new TGetColumnsResp();
+ try {
+ OperationHandle opHandle = cliService.getColumns(
+ new SessionHandle(req.getSessionHandle()),
+ req.getCatalogName(),
+ req.getSchemaName(),
+ req.getTableName(),
+ req.getColumnName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting columns: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+ TGetFunctionsResp resp = new TGetFunctionsResp();
+ try {
+ OperationHandle opHandle = cliService.getFunctions(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getFunctionName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting functions: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+ TGetOperationStatusResp resp = new TGetOperationStatusResp();
+ try {
+ OperationStatus operationStatus = cliService.getOperationStatus(
+ new OperationHandle(req.getOperationHandle()));
+ resp.setOperationState(operationStatus.getState().toTOperationState());
+ HiveSQLException opException = operationStatus.getOperationException();
+ if (opException != null) {
+ resp.setSqlState(opException.getSQLState());
+ resp.setErrorCode(opException.getErrorCode());
+ resp.setErrorMessage(opException.getMessage());
+ }
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting operation status: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+ TCancelOperationResp resp = new TCancelOperationResp();
+ try {
+ cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error cancelling operation: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+ TCloseOperationResp resp = new TCloseOperationResp();
+ try {
+ cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error closing operation: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+ throws TException {
+ TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+ try {
+ TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+ resp.setSchema(schema.toTTableSchema());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error getting result set metadata: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+ TFetchResultsResp resp = new TFetchResultsResp();
+ try {
+ RowSet rowSet = cliService.fetchResults(
+ new OperationHandle(req.getOperationHandle()),
+ FetchOrientation.getFetchOrientation(req.getOrientation()),
+ req.getMaxRows(),
+ FetchType.getFetchType(req.getFetchType()));
+ resp.setResults(rowSet.toTRowSet());
+ resp.setHasMoreRows(false);
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ LOG.warn("Error fetching results: ", e);
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public abstract void run();
+
+ /**
+ * If the proxy user name is provided then check privileges to substitute the user.
+ * @param realUser
+ * @param sessionConf
+ * @param ipAddress
+ * @return
+ * @throws HiveSQLException
+ */
+ private String getProxyUser(String realUser, Map<String, String> sessionConf,
+ String ipAddress) throws HiveSQLException {
+ String proxyUser = null;
+ // Http transport mode.
+ // We set the thread local proxy username, in ThriftHttpServlet.
+ if (cliService.getHiveConf().getVar(
+ ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+ proxyUser = SessionManager.getProxyUserName();
+ LOG.debug("Proxy user from query string: " + proxyUser);
+ }
+
+ if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthFactory.HS2_PROXY_USER)) {
+ String proxyUserFromThriftBody = sessionConf.get(HiveAuthFactory.HS2_PROXY_USER);
+ LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
+ proxyUser = proxyUserFromThriftBody;
+ }
+
+ if (proxyUser == null) {
+ return realUser;
+ }
+
+ // check whether substitution is allowed
+ if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
+ throw new HiveSQLException("Proxy user substitution is not allowed");
+ }
+
+ // If there's no authentication, then directly substitute the user
+ if (HiveAuthFactory.AuthTypes.NONE.toString().
+ equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
+ return proxyUser;
+ }
+
+ // Verify proxy user privilege of the realUser for the proxyUser
+ HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
+ LOG.debug("Verified proxy user: " + proxyUser);
+ return proxyUser;
+ }
+
+ private boolean isKerberosAuthMode() {
+ return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+ .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+ }
+}