Diffstat (limited to 'sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java')
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java  322
1 files changed, 322 insertions, 0 deletions
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
new file mode 100644
index 0000000000..19153b654b
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.EnumSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.thrift.TProtocolVersion;
+
+public abstract class Operation {
+ protected final HiveSession parentSession;
+ private OperationState state = OperationState.INITIALIZED;
+ private final OperationHandle opHandle;
+ private HiveConf configuration;
+ public static final Log LOG = LogFactory.getLog(Operation.class.getName());
+ public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
+ public static final long DEFAULT_FETCH_MAX_ROWS = 100;
+ protected boolean hasResultSet;
+ protected volatile HiveSQLException operationException;
+ protected final boolean runAsync;
+ protected volatile Future<?> backgroundHandle;
+ protected OperationLog operationLog;
+ protected boolean isOperationLogEnabled;
+
+ private long operationTimeout;
+ private long lastAccessTime;
+
+ protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
+ EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST);
+
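+ /**
+ * Creates an operation bound to its parent session. A new OperationHandle is
+ * allocated with the session's Thrift protocol version, and the idle-operation
+ * timeout is taken from HIVE_SERVER2_IDLE_OPERATION_TIMEOUT in the session's HiveConf.
+ */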
+ protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
+ this.parentSession = parentSession;
+ this.runAsync = runInBackground;
+ this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+ lastAccessTime = System.currentTimeMillis();
+ operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public Future<?> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
+ protected void setBackgroundHandle(Future<?> backgroundHandle) {
+ this.backgroundHandle = backgroundHandle;
+ }
+
+ public boolean shouldRunAsync() {
+ return runAsync;
+ }
+
+ public void setConfiguration(HiveConf configuration) {
+ this.configuration = new HiveConf(configuration);
+ }
+
+ public HiveConf getConfiguration() {
+ return new HiveConf(configuration);
+ }
+
+ public HiveSession getParentSession() {
+ return parentSession;
+ }
+
+ public OperationHandle getHandle() {
+ return opHandle;
+ }
+
+ public TProtocolVersion getProtocolVersion() {
+ return opHandle.getProtocolVersion();
+ }
+
+ public OperationType getType() {
+ return opHandle.getOperationType();
+ }
+
+ public OperationStatus getStatus() {
+ return new OperationStatus(state, operationException);
+ }
+
+ public boolean hasResultSet() {
+ return hasResultSet;
+ }
+
+ protected void setHasResultSet(boolean hasResultSet) {
+ this.hasResultSet = hasResultSet;
+ opHandle.setHasResultSet(hasResultSet);
+ }
+
+ public OperationLog getOperationLog() {
+ return operationLog;
+ }
+
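+ /**
+ * Transitions the operation to the given state after validating that the
+ * transition is legal, and refreshes the last-access timestamp.
+ */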
+ protected final OperationState setState(OperationState newState) throws HiveSQLException {
+ state.validateTransition(newState);
+ this.state = newState;
+ this.lastAccessTime = System.currentTimeMillis();
+ return this.state;
+ }
+
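+ /**
+ * Checks whether the operation has been idle longer than its timeout: a timeout
+ * of 0 disables expiry, a positive timeout only expires operations that have
+ * reached a terminal state, and a negative timeout expires them regardless of state.
+ */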
+ public boolean isTimedOut(long current) {
+ if (operationTimeout == 0) {
+ return false;
+ }
+ if (operationTimeout > 0) {
+ // check only when it's in terminal state
+ return state.isTerminal() && lastAccessTime + operationTimeout <= current;
+ }
+ return lastAccessTime - operationTimeout <= current;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public long getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public void setOperationTimeout(long operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
+ protected void setOperationException(HiveSQLException operationException) {
+ this.operationException = operationException;
+ }
+
+ protected final void assertState(OperationState state) throws HiveSQLException {
+ if (this.state != state) {
+ throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
+ }
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+
+ public boolean isRunning() {
+ return OperationState.RUNNING.equals(state);
+ }
+
+ public boolean isFinished() {
+ return OperationState.FINISHED.equals(state);
+ }
+
+ public boolean isCanceled() {
+ return OperationState.CANCELED.equals(state);
+ }
+
+ public boolean isFailed() {
+ return OperationState.ERROR.equals(state);
+ }
+
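+ /**
+ * Creates a per-operation log file, named after the handle identifier, inside the
+ * parent session's operation log directory and registers the resulting OperationLog
+ * with the current thread. If the file cannot be created or opened, operation
+ * logging is disabled for this operation.
+ */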
+ protected void createOperationLog() {
+ if (parentSession.isOperationLogEnabled()) {
+ File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
+ opHandle.getHandleIdentifier().toString());
+ isOperationLogEnabled = true;
+
+ // create log file
+ try {
+ if (operationLogFile.exists()) {
+ LOG.warn("The operation log file should not exist, but it is already there: " +
+ operationLogFile.getAbsolutePath());
+ operationLogFile.delete();
+ }
+ if (!operationLogFile.createNewFile()) {
+ // the log file already exists and cannot be deleted.
+ // If it can be read/written, keep its contents and use it.
+ if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
+ LOG.warn("The already existed operation log file cannot be recreated, " +
+ "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
+ isOperationLogEnabled = false;
+ return;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
+ isOperationLogEnabled = false;
+ return;
+ }
+
+ // create OperationLog object with above log file
+ try {
+ operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf());
+ } catch (FileNotFoundException e) {
+ LOG.warn("Unable to instantiate OperationLog object for operation: " +
+ opHandle, e);
+ isOperationLogEnabled = false;
+ return;
+ }
+
+ // register this operationLog to current thread
+ OperationLog.setCurrentOperationLog(operationLog);
+ }
+ }
+
+ protected void unregisterOperationLog() {
+ if (isOperationLogEnabled) {
+ OperationLog.removeCurrentOperationLog();
+ }
+ }
+
+ /**
+ * Invoked before runInternal().
+ * Sets up preconditions and configuration.
+ */
+ protected void beforeRun() {
+ createOperationLog();
+ }
+
+ /**
+ * Invoked after runInternal(), even if runInternal() throws an exception.
+ * Cleans up the resources that were set up in beforeRun().
+ */
+ protected void afterRun() {
+ unregisterOperationLog();
+ }
+
+ /**
+ * Implemented by subclasses of Operation to execute their specific behavior.
+ * @throws HiveSQLException
+ */
+ protected abstract void runInternal() throws HiveSQLException;
+
+ public void run() throws HiveSQLException {
+ beforeRun();
+ try {
+ runInternal();
+ } finally {
+ afterRun();
+ }
+ }
+
+ protected void cleanupOperationLog() {
+ if (isOperationLogEnabled) {
+ if (operationLog == null) {
+ LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
+ + "logging is enabled, but its OperationLog object cannot be found.");
+ } else {
+ operationLog.close();
+ }
+ }
+ }
+
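+ // The base implementation only records the CANCELED state and then signals that
+ // cancellation is unsupported.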
+ // TODO: make this abstract and implement in subclasses.
+ public void cancel() throws HiveSQLException {
+ setState(OperationState.CANCELED);
+ throw new UnsupportedOperationException("SQLOperation.cancel()");
+ }
+
+ public abstract void close() throws HiveSQLException;
+
+ public abstract TableSchema getResultSetSchema() throws HiveSQLException;
+
+ public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
+
+ public RowSet getNextRowSet() throws HiveSQLException {
+ return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
+ }
+
+ /**
+ * Verify that the given fetch orientation is one of the default orientation types.
+ * @param orientation
+ * @throws HiveSQLException
+ */
+ protected void validateDefaultFetchOrientation(FetchOrientation orientation)
+ throws HiveSQLException {
+ validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
+ }
+
+ /**
+ * Verify that the given fetch orientation is one of the supported orientation types.
+ * @param orientation
+ * @param supportedOrientations
+ * @throws HiveSQLException
+ */
+ protected void validateFetchOrientation(FetchOrientation orientation,
+ EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
+ if (!supportedOrientations.contains(orientation)) {
+ throw new HiveSQLException("The fetch type " + orientation.toString() +
+ " is not supported for this resultset", "HY106");
+ }
+ }
+
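+ /**
+ * Wraps a failed CommandProcessorResponse in a HiveSQLException, carrying over the
+ * SQLState, the response code and, when present, the underlying cause.
+ */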
+ protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
+ HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
+ response.getSQLState(), response.getResponseCode());
+ if (response.getException() != null) {
+ ex.initCause(response.getException());
+ }
+ return ex;
+ }
+}