author    Cheolsoo Park <cheolsoop@netflix.com>    2015-06-04 13:27:35 -0700
committer Reynold Xin <rxin@databricks.com>        2015-06-04 13:27:35 -0700
commit    0526fea483066086dfc27d1606f74220fe822f7f (patch)
tree      78257ef04b84972800f47fae3b6044b6bc0d5d36
parent    3dc005282a694e105f40e429b28b0a677743341f (diff)
[SPARK-6909][SQL] Remove Hive Shim code
This is a follow-up on #6393. I am removing the following files in this PR:

```
./sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
./sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
```

Basically, I refactored the shim code as follows:

* Rewrote code directly with Hive 0.13 methods, or
* Converted code into private methods, or
* Extracted code into separate classes.

For leftover code that didn't fit any of these cases, I created a HiveShim object; for example, helper functions that wrap Hive 0.13 methods to work around Hive bugs are placed there (see the sketch after this message).

Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #6604 from piaozhexiu/SPARK-6909 and squashes the following commits:

5dccc20 [Cheolsoo Park] Remove hive shim code
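The "leftover helper" pattern described above can be sketched minimally as follows. This is not part of the commit: the object name `HiveShimSketch` is illustrative, and the body condenses the `appendReadColumns`/`appendReadColumnNames` helpers that the new `HiveShim.scala` adds later in this diff.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils

import scala.collection.JavaConversions._

// Illustrative shim object: call sites use Hive 0.13 APIs directly where possible,
// and only workarounds for Hive quirks are collected in one place like this.
private[hive] object HiveShimSketch {
  // ColumnProjectionUtils.appendReadColumns cannot be called with a null or empty id list,
  // so the shim guards the call instead of repeating the check at every call site.
  def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]): Unit = {
    if (ids != null && ids.nonEmpty) {
      ColumnProjectionUtils.appendReadColumns(conf, ids)
    }
    if (names != null && names.nonEmpty) {
      // Hive 0.13 made appendReadColumnNames private, so the shim re-implements it by
      // extending the comma-separated READ_COLUMN_NAMES_CONF_STR entry.
      val old = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
      val joined = if (old.isEmpty) names.mkString(",") else (old +: names).mkString(",")
      conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, joined)
    }
  }
}
```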
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala  |  10
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala (renamed from sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala)  | 102
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala  |   6
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala  |   7
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala (renamed from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala)  |  20
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala  |   4
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala  |  75
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala  |   8
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala  |   3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  |  23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala  | 187
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  |  22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  |   4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala  | 247
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala  |  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  |  12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala  |   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala  |   3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  |  46
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala  |   4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  |  25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  |  58
-rw-r--r--  sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala  | 457
23 files changed, 619 insertions(+), 716 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 94687eeda4..5b391d3dce 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.hive.thriftserver
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -29,12 +26,15 @@ import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.sql.hive.{HiveContext, HiveShim}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
@@ -51,7 +51,7 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- sqlContext.setConf("spark.sql.hive.version", HiveShim.version)
+ sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
server.init(sqlContext.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b9d4f1c58c..c0d1266212 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -18,66 +18,31 @@
package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
-import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.thrift.TProtocolVersion
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import java.util.{Map => JMap, UUID}
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
+import org.apache.hive.service.cli.session.HiveSession
-import org.apache.spark.{SparkContext, Logging}
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.Logging
import org.apache.spark.sql.execution.SetCommand
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
-/**
- * A compatibility layer for interacting with Hive version 0.13.1.
- */
-private[thriftserver] object HiveThriftServerShim {
- val version = "0.13.1"
-
- def setServerUserName(
- sparkServiceUGI: UserGroupInformation,
- sparkCliService:SparkSQLCLIService) = {
- setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
- }
-}
-
-private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
- extends AbstractSparkSQLDriver(_context) {
- override def getResults(res: JList[_]): Boolean = {
- if (hiveResponse == null) {
- false
- } else {
- res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
- hiveResponse = null
- true
- }
- }
-}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
- runInBackground: Boolean = true)(
- hiveContext: HiveContext,
- sessionToActivePool: SMap[SessionHandle, String])
+ runInBackground: Boolean = true)
+ (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
// NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
- extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
+ extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+ with Logging {
private var result: DataFrame = _
private var iter: Iterator[SparkRow] = _
@@ -88,7 +53,7 @@ private[hive] class SparkExecuteStatementOperation(
logDebug("CLOSING")
}
- def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
+ def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to += from.getString(ordinal)
@@ -209,48 +174,3 @@ private[hive] class SparkExecuteStatementOperation(
HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
- extends SessionManager
- with ReflectedCompositeService {
-
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
- override def init(hiveConf: HiveConf) {
- setSuperField(this, "hiveConf", hiveConf)
-
- val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
- setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
- getAncestorField[Log](this, 3, "LOG").info(
- s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
- setSuperField(this, "operationManager", sparkSqlOperationManager)
- addService(sparkSqlOperationManager)
-
- initCompositeService(hiveConf)
- }
-
- override def openSession(
- protocol: TProtocolVersion,
- username: String,
- passwd: String,
- sessionConf: java.util.Map[String, String],
- withImpersonation: Boolean,
- delegationToken: String): SessionHandle = {
- hiveContext.openSession()
- val sessionHandle = super.openSession(
- protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
- val session = super.getSession(sessionHandle)
- HiveThriftServer2.listener.onSessionCreated(
- session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
- sessionHandle
- }
-
- override def closeSession(sessionHandle: SessionHandle) {
- HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
- super.closeSession(sessionHandle)
- sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
- hiveContext.detachSession()
- }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 14f6f658d9..039cfa40d2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -32,12 +32,12 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.thrift.transport.TSocket
import org.apache.spark.Logging
-import org.apache.spark.sql.hive.{HiveContext, HiveShim}
+import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.Utils
private[hive] object SparkSQLCLIDriver {
@@ -267,7 +267,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
- val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
+ val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), hconf)
if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 499e077d72..41f647d5f8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -21,8 +21,6 @@ import java.io.IOException
import java.util.{List => JList}
import javax.security.auth.login.LoginException
-import scala.collection.JavaConversions._
-
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
@@ -34,7 +32,8 @@ import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.util.Utils
+
+import scala.collection.JavaConversions._
private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
extends CLIService
@@ -52,7 +51,7 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
- HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+ setSuperField(this, "serviceUGI", sparkServiceUGI)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 48ac9062af..77272aecf2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.thriftserver
-import scala.collection.JavaConversions._
+import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
@@ -27,8 +27,12 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-private[hive] abstract class AbstractSparkSQLDriver(
- val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
+import scala.collection.JavaConversions._
+
+private[hive] class SparkSQLDriver(
+ val context: HiveContext = SparkSQLEnv.hiveContext)
+ extends Driver
+ with Logging {
private[hive] var tableSchema: Schema = _
private[hive] var hiveResponse: Seq[String] = _
@@ -71,6 +75,16 @@ private[hive] abstract class AbstractSparkSQLDriver(
0
}
+ override def getResults(res: JList[_]): Boolean = {
+ if (hiveResponse == null) {
+ false
+ } else {
+ res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
+ hiveResponse = null
+ true
+ }
+ }
+
override def getSchema: Schema = tableSchema
override def destroy() {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 7c0c505e2d..79eda1f512 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,7 +22,7 @@ import java.io.PrintStream
import scala.collection.JavaConversions._
import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.hive.{HiveShim, HiveContext}
+import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.util.Utils
@@ -56,7 +56,7 @@ private[hive] object SparkSQLEnv extends Logging {
hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
- hiveContext.setConf("spark.sql.hive.version", HiveShim.version)
+ hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
if (log.isDebugEnabled) {
hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
new file mode 100644
index 0000000000..357b27f740
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.util.concurrent.Executors
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.SessionHandle
+import org.apache.hive.service.cli.session.SessionManager
+import org.apache.hive.service.cli.thrift.TProtocolVersion
+
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+ extends SessionManager
+ with ReflectedCompositeService {
+
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+ override def init(hiveConf: HiveConf) {
+ setSuperField(this, "hiveConf", hiveConf)
+
+ val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+ setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
+ getAncestorField[Log](this, 3, "LOG").info(
+ s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+ setSuperField(this, "operationManager", sparkSqlOperationManager)
+ addService(sparkSqlOperationManager)
+
+ initCompositeService(hiveConf)
+ }
+
+ override def openSession(protocol: TProtocolVersion,
+ username: String,
+ passwd: String,
+ sessionConf: java.util.Map[String, String],
+ withImpersonation: Boolean,
+ delegationToken: String): SessionHandle = {
+ hiveContext.openSession()
+ val sessionHandle = super.openSession(
+ protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+ val session = super.getSession(sessionHandle)
+ HiveThriftServer2.listener.onSessionCreated(
+ session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+ sessionHandle
+ }
+
+ override def closeSession(sessionHandle: SessionHandle) {
+ HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
+ super.closeSession(sessionHandle)
+ sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+
+ hiveContext.detachSession()
+ }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index a93a3dee43..f57c7083ea 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -40,7 +40,7 @@ import org.apache.thrift.transport.TSocket
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.Utils
object TestData {
@@ -111,7 +111,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
withJdbcStatement { statement =>
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
- assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+ assert(resultSet.getString(1) ===
+ s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
}
}
@@ -365,7 +366,8 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
withJdbcStatement { statement =>
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
- assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+ assert(resultSet.getString(1) ===
+ s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
}
}
}
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 0b1917a392..048f78b4da 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -23,7 +23,6 @@ import java.util.{Locale, TimeZone}
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.hive.test.TestHive
/**
@@ -254,7 +253,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// the answer is sensitive for jdk version
"udf_java_method"
- ) ++ HiveShim.compatibilityBlackList
+ )
/**
* The set of tests that are believed to be working in catalyst. Tests not on whiteList or
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fbf2c7d8cb..800f51c5e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -17,37 +17,34 @@
package org.apache.spark.sql.hive
-import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.io.File
import java.net.{URL, URLClassLoader}
import java.sql.Timestamp
-import java.util.{ArrayList => JArrayList}
-import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.spark.sql.catalyst.ParserDialect
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
-import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.sources.DataSourceStrategy
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -331,7 +328,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val tableParameters = relation.hiveQlTable.getParameters
val oldTotalSize =
- Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize))
+ Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
.map(_.toLong)
.getOrElse(0L)
val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
@@ -342,7 +339,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.client.alterTable(
relation.table.copy(
properties = relation.table.properties +
- (HiveShim.getStatsSetupConstTotalSize -> newTotalSize.toString)))
+ (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
}
case otherRelation =>
throw new UnsupportedOperationException(
@@ -564,7 +561,7 @@ private[hive] object HiveContext {
case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
case (decimal: java.math.BigDecimal, DecimalType()) =>
// Hive strips trailing zeros so use its toString
- HiveShim.createDecimal(decimal).toString
+ HiveDecimal.create(decimal).toString
case (other, tpe) if primitiveTypes contains tpe => other.toString
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 24cd335082..c466203cd0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _}
+import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory}
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.{io => hadoopIo}
@@ -350,7 +351,7 @@ private[hive] trait HiveInspectors {
new HiveVarchar(s, s.size)
case _: JavaHiveDecimalObjectInspector =>
- (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toJavaBigDecimal)
+ (o: Any) => HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal)
case _: JavaDateObjectInspector =>
(o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
@@ -439,31 +440,31 @@ private[hive] trait HiveInspectors {
case _ if a == null => null
case x: PrimitiveObjectInspector => x match {
// TODO we don't support the HiveVarcharObjectInspector yet.
- case _: StringObjectInspector if x.preferWritable() => HiveShim.getStringWritable(a)
+ case _: StringObjectInspector if x.preferWritable() => getStringWritable(a)
case _: StringObjectInspector => a.asInstanceOf[UTF8String].toString()
- case _: IntObjectInspector if x.preferWritable() => HiveShim.getIntWritable(a)
+ case _: IntObjectInspector if x.preferWritable() => getIntWritable(a)
case _: IntObjectInspector => a.asInstanceOf[java.lang.Integer]
- case _: BooleanObjectInspector if x.preferWritable() => HiveShim.getBooleanWritable(a)
+ case _: BooleanObjectInspector if x.preferWritable() => getBooleanWritable(a)
case _: BooleanObjectInspector => a.asInstanceOf[java.lang.Boolean]
- case _: FloatObjectInspector if x.preferWritable() => HiveShim.getFloatWritable(a)
+ case _: FloatObjectInspector if x.preferWritable() => getFloatWritable(a)
case _: FloatObjectInspector => a.asInstanceOf[java.lang.Float]
- case _: DoubleObjectInspector if x.preferWritable() => HiveShim.getDoubleWritable(a)
+ case _: DoubleObjectInspector if x.preferWritable() => getDoubleWritable(a)
case _: DoubleObjectInspector => a.asInstanceOf[java.lang.Double]
- case _: LongObjectInspector if x.preferWritable() => HiveShim.getLongWritable(a)
+ case _: LongObjectInspector if x.preferWritable() => getLongWritable(a)
case _: LongObjectInspector => a.asInstanceOf[java.lang.Long]
- case _: ShortObjectInspector if x.preferWritable() => HiveShim.getShortWritable(a)
+ case _: ShortObjectInspector if x.preferWritable() => getShortWritable(a)
case _: ShortObjectInspector => a.asInstanceOf[java.lang.Short]
- case _: ByteObjectInspector if x.preferWritable() => HiveShim.getByteWritable(a)
+ case _: ByteObjectInspector if x.preferWritable() => getByteWritable(a)
case _: ByteObjectInspector => a.asInstanceOf[java.lang.Byte]
case _: HiveDecimalObjectInspector if x.preferWritable() =>
- HiveShim.getDecimalWritable(a.asInstanceOf[Decimal])
+ getDecimalWritable(a.asInstanceOf[Decimal])
case _: HiveDecimalObjectInspector =>
- HiveShim.createDecimal(a.asInstanceOf[Decimal].toJavaBigDecimal)
- case _: BinaryObjectInspector if x.preferWritable() => HiveShim.getBinaryWritable(a)
+ HiveDecimal.create(a.asInstanceOf[Decimal].toJavaBigDecimal)
+ case _: BinaryObjectInspector if x.preferWritable() => getBinaryWritable(a)
case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
- case _: DateObjectInspector if x.preferWritable() => HiveShim.getDateWritable(a)
+ case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
- case _: TimestampObjectInspector if x.preferWritable() => HiveShim.getTimestampWritable(a)
+ case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
}
case x: SettableStructObjectInspector =>
@@ -574,31 +575,31 @@ private[hive] trait HiveInspectors {
*/
def toInspector(expr: Expression): ObjectInspector = expr match {
case Literal(value, StringType) =>
- HiveShim.getStringWritableConstantObjectInspector(value)
+ getStringWritableConstantObjectInspector(value)
case Literal(value, IntegerType) =>
- HiveShim.getIntWritableConstantObjectInspector(value)
+ getIntWritableConstantObjectInspector(value)
case Literal(value, DoubleType) =>
- HiveShim.getDoubleWritableConstantObjectInspector(value)
+ getDoubleWritableConstantObjectInspector(value)
case Literal(value, BooleanType) =>
- HiveShim.getBooleanWritableConstantObjectInspector(value)
+ getBooleanWritableConstantObjectInspector(value)
case Literal(value, LongType) =>
- HiveShim.getLongWritableConstantObjectInspector(value)
+ getLongWritableConstantObjectInspector(value)
case Literal(value, FloatType) =>
- HiveShim.getFloatWritableConstantObjectInspector(value)
+ getFloatWritableConstantObjectInspector(value)
case Literal(value, ShortType) =>
- HiveShim.getShortWritableConstantObjectInspector(value)
+ getShortWritableConstantObjectInspector(value)
case Literal(value, ByteType) =>
- HiveShim.getByteWritableConstantObjectInspector(value)
+ getByteWritableConstantObjectInspector(value)
case Literal(value, BinaryType) =>
- HiveShim.getBinaryWritableConstantObjectInspector(value)
+ getBinaryWritableConstantObjectInspector(value)
case Literal(value, DateType) =>
- HiveShim.getDateWritableConstantObjectInspector(value)
+ getDateWritableConstantObjectInspector(value)
case Literal(value, TimestampType) =>
- HiveShim.getTimestampWritableConstantObjectInspector(value)
+ getTimestampWritableConstantObjectInspector(value)
case Literal(value, DecimalType()) =>
- HiveShim.getDecimalWritableConstantObjectInspector(value)
+ getDecimalWritableConstantObjectInspector(value)
case Literal(_, NullType) =>
- HiveShim.getPrimitiveNullWritableConstantObjectInspector
+ getPrimitiveNullWritableConstantObjectInspector
case Literal(value, ArrayType(dt, _)) =>
val listObjectInspector = toInspector(dt)
if (value == null) {
@@ -658,8 +659,8 @@ private[hive] trait HiveInspectors {
case _: JavaFloatObjectInspector => FloatType
case _: WritableBinaryObjectInspector => BinaryType
case _: JavaBinaryObjectInspector => BinaryType
- case w: WritableHiveDecimalObjectInspector => HiveShim.decimalTypeInfoToCatalyst(w)
- case j: JavaHiveDecimalObjectInspector => HiveShim.decimalTypeInfoToCatalyst(j)
+ case w: WritableHiveDecimalObjectInspector => decimalTypeInfoToCatalyst(w)
+ case j: JavaHiveDecimalObjectInspector => decimalTypeInfoToCatalyst(j)
case _: WritableDateObjectInspector => DateType
case _: JavaDateObjectInspector => DateType
case _: WritableTimestampObjectInspector => TimestampType
@@ -668,10 +669,136 @@ private[hive] trait HiveInspectors {
case _: JavaVoidObjectInspector => NullType
}
+ private def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
+ val info = inspector.getTypeInfo.asInstanceOf[DecimalTypeInfo]
+ DecimalType(info.precision(), info.scale())
+ }
+
+ private def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.stringTypeInfo, getStringWritable(value))
+
+ private def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.intTypeInfo, getIntWritable(value))
+
+ private def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.doubleTypeInfo, getDoubleWritable(value))
+
+ private def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.booleanTypeInfo, getBooleanWritable(value))
+
+ private def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.longTypeInfo, getLongWritable(value))
+
+ private def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.floatTypeInfo, getFloatWritable(value))
+
+ private def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.shortTypeInfo, getShortWritable(value))
+
+ private def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.byteTypeInfo, getByteWritable(value))
+
+ private def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.binaryTypeInfo, getBinaryWritable(value))
+
+ private def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.dateTypeInfo, getDateWritable(value))
+
+ private def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.timestampTypeInfo, getTimestampWritable(value))
+
+ private def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.decimalTypeInfo, getDecimalWritable(value))
+
+ private def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+ TypeInfoFactory.voidTypeInfo, null)
+
+ private def getStringWritable(value: Any): hadoopIo.Text =
+ if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
+
+ private def getIntWritable(value: Any): hadoopIo.IntWritable =
+ if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
+
+ private def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
+ if (value == null) {
+ null
+ } else {
+ new hiveIo.DoubleWritable(value.asInstanceOf[Double])
+ }
+
+ private def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
+ if (value == null) {
+ null
+ } else {
+ new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
+ }
+
+ private def getLongWritable(value: Any): hadoopIo.LongWritable =
+ if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
+
+ private def getFloatWritable(value: Any): hadoopIo.FloatWritable =
+ if (value == null) {
+ null
+ } else {
+ new hadoopIo.FloatWritable(value.asInstanceOf[Float])
+ }
+
+ private def getShortWritable(value: Any): hiveIo.ShortWritable =
+ if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
+
+ private def getByteWritable(value: Any): hiveIo.ByteWritable =
+ if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
+
+ private def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
+ if (value == null) {
+ null
+ } else {
+ new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
+ }
+
+ private def getDateWritable(value: Any): hiveIo.DateWritable =
+ if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+
+ private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
+ if (value == null) {
+ null
+ } else {
+ new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
+ }
+
+ private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
+ if (value == null) {
+ null
+ } else {
+ // TODO precise, scale?
+ new hiveIo.HiveDecimalWritable(
+ HiveDecimal.create(value.asInstanceOf[Decimal].toJavaBigDecimal))
+ }
+
implicit class typeInfoConversions(dt: DataType) {
import org.apache.hadoop.hive.serde2.typeinfo._
import TypeInfoFactory._
+ private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match {
+ case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale)
+ case _ => new DecimalTypeInfo(
+ HiveShim.UNLIMITED_DECIMAL_PRECISION,
+ HiveShim.UNLIMITED_DECIMAL_SCALE)
+ }
+
def toTypeInfo: TypeInfo = dt match {
case ArrayType(elemType, _) =>
getListTypeInfo(elemType.toTypeInfo)
@@ -690,7 +817,7 @@ private[hive] trait HiveInspectors {
case LongType => longTypeInfo
case ShortType => shortTypeInfo
case StringType => stringTypeInfo
- case d: DecimalType => HiveShim.decimalTypeInfo(d)
+ case d: DecimalType => decimalTypeInfo(d)
case DateType => dateTypeInfo
case TimestampType => timestampTypeInfo
case NullType => voidTypeInfo
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca1f49b546..5a4651a887 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.Warehouse
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata._
-import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
@@ -37,7 +39,6 @@ import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
-import org.apache.spark.util.Utils
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -670,8 +671,8 @@ private[hive] case class MetastoreRelation
@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
- val totalSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize)
- val rawDataSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstRawDataSize)
+ val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+ val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
@@ -697,11 +698,7 @@ private[hive] case class MetastoreRelation
}
}
- val tableDesc = HiveShim.getTableDesc(
- Class.forName(
- hiveQlTable.getSerializationLib,
- true,
- Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]],
+ val tableDesc = new TableDesc(
hiveQlTable.getInputFormatClass,
// The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
// getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
@@ -743,6 +740,11 @@ private[hive] case class MetastoreRelation
private[hive] object HiveMetastoreTypes {
def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
+ def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
+ case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
+ case _ => s"decimal($HiveShim.UNLIMITED_DECIMAL_PRECISION,$HiveShim.UNLIMITED_DECIMAL_SCALE)"
+ }
+
def toMetastoreType(dt: DataType): String = dt match {
case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>"
case StructType(fields) =>
@@ -759,7 +761,7 @@ private[hive] object HiveMetastoreTypes {
case BinaryType => "binary"
case BooleanType => "boolean"
case DateType => "date"
- case d: DecimalType => HiveShim.decimalMetastoreString(d)
+ case d: DecimalType => decimalMetastoreString(d)
case TimestampType => "timestamp"
case NullType => "void"
case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index a5ca3613c5..9544d12c90 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
import java.sql.Date
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
@@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.sources.DescribeCommand
+import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
import org.apache.spark.sql.types._
@@ -46,6 +45,7 @@ import org.apache.spark.util.random.RandomSampler
/* Implicit conversions */
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
/**
* Used when we need to start parsing the AST before deciding that we are going to pass the command
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
new file mode 100644
index 0000000000..fa5409f602
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.{InputStream, OutputStream}
+import java.rmi.server.UID
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.util.Utils
+
+/* Implicit conversions */
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+private[hive] object HiveShim {
+ // Precision and scale to pass for unlimited decimals; these are the same as the precision and
+ // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
+ val UNLIMITED_DECIMAL_PRECISION = 38
+ val UNLIMITED_DECIMAL_SCALE = 18
+
+ /*
+ * This function became private in Hive 0.13, but we have to reimplement it here to work around a Hive bug.
+ */
+ private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) {
+ val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
+ val result: StringBuilder = new StringBuilder(old)
+ var first: Boolean = old.isEmpty
+
+ for (col <- cols) {
+ if (first) {
+ first = false
+ } else {
+ result.append(',')
+ }
+ result.append(col)
+ }
+ conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString)
+ }
+
+ /*
+ * ColumnProjectionUtils.appendReadColumns cannot be used directly when ids is null or empty.
+ */
+ def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
+ if (ids != null && ids.size > 0) {
+ ColumnProjectionUtils.appendReadColumns(conf, ids)
+ }
+ if (names != null && names.size > 0) {
+ appendReadColumnNames(conf, names)
+ }
+ }
+
+ /*
+ * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
+ * needs to be initialized before serialization.
+ */
+ def prepareWritable(w: Writable): Writable = {
+ w match {
+ case w: AvroGenericRecordWritable =>
+ w.setRecordReaderID(new UID())
+ case _ =>
+ }
+ w
+ }
+
+ def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
+ if (hdoi.preferWritable()) {
+ Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue,
+ hdoi.precision(), hdoi.scale())
+ } else {
+ Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
+ }
+ }
+
+ /**
+ * This class provides UDF creation as well as UDF instance serialization and
+ * de-serialization across process boundaries.
+ *
+ * Detail discussion can be found at https://github.com/apache/spark/pull/3640
+ *
+ * @param functionClassName UDF class name
+ */
+ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
+ extends java.io.Externalizable {
+
+ // for Serialization
+ def this() = this(null)
+
+ @transient
+ def deserializeObjectByKryo[T: ClassTag](
+ kryo: Kryo,
+ in: InputStream,
+ clazz: Class[_]): T = {
+ val inp = new Input(in)
+ val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
+ inp.close()
+ t
+ }
+
+ @transient
+ def serializeObjectByKryo(
+ kryo: Kryo,
+ plan: Object,
+ out: OutputStream) {
+ val output: Output = new Output(out)
+ kryo.writeObject(output, plan)
+ output.close()
+ }
+
+ def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
+ deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
+ .asInstanceOf[UDFType]
+ }
+
+ def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
+ serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
+ }
+
+ private var instance: AnyRef = null
+
+ def writeExternal(out: java.io.ObjectOutput) {
+ // output the function name
+ out.writeUTF(functionClassName)
+
+ // Write a flag if instance is null or not
+ out.writeBoolean(instance != null)
+ if (instance != null) {
+ // Some UDFs are serializable and others are not;
+ // Hive Utilities can handle both cases
+ val baos = new java.io.ByteArrayOutputStream()
+ serializePlan(instance, baos)
+ val functionInBytes = baos.toByteArray
+
+ // output the function bytes
+ out.writeInt(functionInBytes.length)
+ out.write(functionInBytes, 0, functionInBytes.length)
+ }
+ }
+
+ def readExternal(in: java.io.ObjectInput) {
+ // read the function name
+ functionClassName = in.readUTF()
+
+ if (in.readBoolean()) {
+ // if the instance is not null
+ // read the function in bytes
+ val functionInBytesLength = in.readInt()
+ val functionInBytes = new Array[Byte](functionInBytesLength)
+ in.read(functionInBytes, 0, functionInBytesLength)
+
+ // deserialize the function object via Hive Utilities
+ instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
+ Utils.getContextOrSparkClassLoader.loadClass(functionClassName))
+ }
+ }
+
+ def createFunction[UDFType <: AnyRef](): UDFType = {
+ if (instance != null) {
+ instance.asInstanceOf[UDFType]
+ } else {
+ val func = Utils.getContextOrSparkClassLoader
+ .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
+ if (!func.isInstanceOf[UDF]) {
+ // We cache the function if it's not a simple UDF,
+ // as we always have to create a new instance for a simple UDF
+ instance = func
+ }
+ func
+ }
+ }
+ }
+
+ /*
+ * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
+ * Fix it through a wrapper.
+ */
+ implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
+ var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
+ f.setCompressCodec(w.compressCodec)
+ f.setCompressType(w.compressType)
+ f.setTableInfo(w.tableInfo)
+ f.setDestTableId(w.destTableId)
+ f
+ }
+
+ /*
+ * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
+ * Fix it through a wrapper.
+ */
+ private[hive] class ShimFileSinkDesc(
+ var dir: String,
+ var tableInfo: TableDesc,
+ var compressed: Boolean)
+ extends Serializable with Logging {
+ var compressCodec: String = _
+ var compressType: String = _
+ var destTableId: Int = _
+
+ def setCompressed(compressed: Boolean) {
+ this.compressed = compressed
+ }
+
+ def getDirName(): String = dir
+
+ def setDestTableId(destTableId: Int) {
+ this.destTableId = destTableId
+ }
+
+ def setTableInfo(tableInfo: TableDesc) {
+ this.tableInfo = tableInfo
+ }
+
+ def setCompressCodec(intermediateCompressorCodec: String) {
+ compressCodec = intermediateCompressorCodec
+ }
+
+ def setCompressType(intermediateCompressType: String) {
+ compressType = intermediateCompressType
+ }
+ }
+}
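For context, and not part of the commit: the ShimFileSinkDesc wrapper above is meant to be consumed the way InsertIntoHiveTable.scala and hiveWriterContainers.scala do later in this diff. Below is a minimal sketch under the assumption that a TableDesc and a temporary output path are already at hand; the object name `FileSinkSketch` and method `toHiveDesc` are illustrative only.

```scala
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.plan.TableDesc

// Alias the wrapper so call sites read as if they were using Hive's own FileSinkDesc,
// mirroring the imports in InsertIntoHiveTable.scala and hiveWriterContainers.scala.
import org.apache.spark.sql.hive.HiveShim.{wrapperToFileSinkDesc, ShimFileSinkDesc => FileSinkDesc}

private[hive] object FileSinkSketch {
  // `tableDesc` and `tmpPath` are assumed to come from the surrounding query plan.
  def toHiveDesc(
      tableDesc: TableDesc,
      tmpPath: String,
      compressed: Boolean): org.apache.hadoop.hive.ql.plan.FileSinkDesc = {
    // Build the serializable wrapper first; this is what gets shipped to executors.
    val fileSinkConf = new FileSinkDesc(tmpPath, tableDesc, compressed)
    // Wherever Hive itself expects a real FileSinkDesc, the implicit
    // wrapperToFileSinkDesc conversion rebuilds one from the wrapper on demand,
    // which is what happens to the returned value here.
    fileSinkConf
  }
}
```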
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 294fc3bd7d..334bfccc9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,14 +25,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
-import org.apache.spark.SerializableWritable
+import org.apache.spark.{Logging, SerializableWritable}
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.Logging
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -172,7 +171,7 @@ class HadoopTableReader(
path.toString + tails
}
- val partPath = HiveShim.getDataLocationPath(partition)
+ val partPath = partition.getDataLocation
val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
var pathPatternStr = getPathPatternByPath(partNum, partPath)
if (!pathPatternSet.contains(pathPatternStr)) {
@@ -187,7 +186,7 @@ class HadoopTableReader(
val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
.map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
- val partPath = HiveShim.getDataLocationPath(partition)
+ val partPath = partition.getDataLocation
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
val ifc = partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
@@ -325,7 +324,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
} else {
- HiveShim.getConvertedOI(
+ ObjectInspectorConverters.getConvertedOI(
rawDeser.getObjectInspector,
tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8613332186..eeb472602b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,27 +19,25 @@ package org.apache.spark.sql.hive.execution
import java.util
-import scala.collection.JavaConversions._
-
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import scala.collection.JavaConversions._
+
private[hive]
case class InsertIntoHiveTable(
table: MetastoreRelation,
@@ -126,7 +124,7 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = HiveShim.getExternalTmpPath(hiveContext, tableLocation)
+ val tmpLocation = hiveContext.getExternalTmpPath(tableLocation.toUri)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = sc.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
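The temporary output location is likewise obtained without a wrapper: the old `HiveShim.getExternalTmpPath(context, path)` (deleted further down) was a one-line call into Hive's `Context`, whose 0.13 signature takes a URI. A rough sketch with assumed parameter names, for illustration only:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context

// Ask Hive 0.13's Context for a scratch location for the insert, next to the table.
def externalTmpLocation(hiveconf: HiveConf, tableLocation: Path) = {
  val ctx = new Context(hiveconf)
  ctx.getExternalTmpPath(tableLocation.toUri)
}
```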
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 1658bb93b0..01f47352b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.types._
/* Implicit conversions */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 2bb526b14b..ee440e304e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -35,8 +35,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
/**
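Both writer-side files now take the `FileSinkDesc` alias from the `HiveShim` object in a single import. The reason for the alias is spelled out in the deleted shim below: Hive 0.13's `FileSinkDesc` holds a non-serializable `Path`, so a serializable `ShimFileSinkDesc` stands in for it on the Spark side. A sketch of the call pattern, using the same three-argument constructor the patch itself uses (the wrapper's definition lives in `HiveShim.scala`, outside this hunk):

```scala
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}

// Build the serializable stand-in for Hive's FileSinkDesc, as InsertIntoHiveTable does above.
def newFileSink(dir: String, tableInfo: TableDesc, compressed: Boolean): FileSinkDesc =
  new FileSinkDesc(dir, tableInfo, compressed)
```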
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 58e2d1fbfa..af586712e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -561,30 +561,28 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
}
}
- if (HiveShim.version == "0.13.1") {
- test("scan a parquet table created through a CTAS statement") {
- withSQLConf(
- "spark.sql.hive.convertMetastoreParquet" -> "true",
- SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
-
- withTempTable("jt") {
- (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
-
- withTable("test_parquet_ctas") {
- sql(
- """CREATE TABLE test_parquet_ctas STORED AS PARQUET
- |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
- """.stripMargin)
-
- checkAnswer(
- sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
- Row(3) :: Row(4) :: Nil)
-
- table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
- case _ =>
- fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
- }
+ test("scan a parquet table created through a CTAS statement") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "true",
+ SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+
+ withTempTable("jt") {
+ (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
+
+ withTable("test_parquet_ctas") {
+ sql(
+ """CREATE TABLE test_parquet_ctas STORED AS PARQUET
+ |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+ Row(3) :: Row(4) :: Nil)
+
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
+ case LogicalRelation(p: ParquetRelation2) => // OK
+ case _ =>
+ fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 00a69de9e4..e16e530555 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -79,10 +79,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
- // TODO: How does this work? Needs to be added back for other Hive versions.
- if (HiveShim.version =="0.12.0") {
- assert(queryTotalSize("analyzeTable") === conf.defaultSizeInBytes)
- }
sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
assert(queryTotalSize("analyzeTable") === BigInt(11624))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 440b7c87b0..6d8d99ebc8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -874,15 +874,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|WITH serdeproperties('s1'='9')
""".stripMargin)
}
- // Now only verify 0.12.0, and ignore other versions due to binary compatibility
- // current TestSerDe.jar is from 0.12.0
- if (HiveShim.version == "0.12.0") {
- sql(s"ADD JAR $testJar")
- sql(
- """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
- |WITH serdeproperties('s1'='9')
- """.stripMargin)
- }
sql("DROP TABLE alter1")
}
@@ -890,15 +881,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
// this is a test case from mapjoin_addjar.q
val testJar = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath
val testData = TestHive.getHiveFile("data/files/sample.json").getCanonicalPath
- if (HiveShim.version == "0.13.1") {
- sql(s"ADD JAR $testJar")
- sql(
- """CREATE TABLE t1(a string, b string)
- |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
- sql("select * from src join t1 on src.key = t1.a")
- sql("DROP TABLE t1")
- }
+ sql(s"ADD JAR $testJar")
+ sql(
+ """CREATE TABLE t1(a string, b string)
+ |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
+ sql("select * from src join t1 on src.key = t1.a")
+ sql("DROP TABLE t1")
}
test("ADD FILE command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index aba3becb1b..40a35674e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveQLDialect, MetastoreRelation}
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
@@ -330,35 +330,33 @@ class SQLQuerySuite extends QueryTest {
"serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
)
- if (HiveShim.version =="0.13.1") {
- val origUseParquetDataSource = conf.parquetUseDataSourceApi
- try {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
- sql(
- """CREATE TABLE ctas5
- | STORED AS parquet AS
- | SELECT key, value
- | FROM src
- | ORDER BY key, value""".stripMargin).collect()
-
- checkExistence(sql("DESC EXTENDED ctas5"), true,
- "name:key", "type:string", "name:value", "ctas5",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
- "MANAGED_TABLE"
- )
-
- val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
- // use the Hive SerDe for parquet tables
- sql("set spark.sql.hive.convertMetastoreParquet = false")
- checkAnswer(
- sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
- sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
- sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
- } finally {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource.toString)
- }
+ val origUseParquetDataSource = conf.parquetUseDataSourceApi
+ try {
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ sql(
+ """CREATE TABLE ctas5
+ | STORED AS parquet AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin).collect()
+
+ checkExistence(sql("DESC EXTENDED ctas5"), true,
+ "name:key", "type:string", "name:value", "ctas5",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+ "MANAGED_TABLE"
+ )
+
+ val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+ // use the Hive SerDe for parquet tables
+ sql("set spark.sql.hive.convertMetastoreParquet = false")
+ checkAnswer(
+ sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
+ sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
+ sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
+ } finally {
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource.toString)
}
}
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
deleted file mode 100644
index dbc5e029e2..0000000000
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.rmi.server.UID
-import java.util.{Properties, ArrayList => JArrayList}
-import java.io.{OutputStream, InputStream}
-
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.Input
-import com.esotericsoftware.kryo.io.Output
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
-import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, PrimitiveObjectInspector}
-import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfo, TypeInfoFactory}
-import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
-import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.{io => hadoopIo}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
-import org.apache.spark.util.Utils._
-
-/**
- * This class provides the UDF creation and also the UDF instance serialization and
- * de-serialization cross process boundary.
- *
- * Detail discussion can be found at https://github.com/apache/spark/pull/3640
- *
- * @param functionClassName UDF class name
- */
-private[hive] case class HiveFunctionWrapper(var functionClassName: String)
- extends java.io.Externalizable {
-
- // for Serialization
- def this() = this(null)
-
- @transient
- def deserializeObjectByKryo[T: ClassTag](
- kryo: Kryo,
- in: InputStream,
- clazz: Class[_]): T = {
- val inp = new Input(in)
- val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
- inp.close()
- t
- }
-
- @transient
- def serializeObjectByKryo(
- kryo: Kryo,
- plan: Object,
- out: OutputStream) {
- val output: Output = new Output(out)
- kryo.writeObject(output, plan)
- output.close()
- }
-
- def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
- deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
- .asInstanceOf[UDFType]
- }
-
- def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
- serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
- }
-
- private var instance: AnyRef = null
-
- def writeExternal(out: java.io.ObjectOutput) {
- // output the function name
- out.writeUTF(functionClassName)
-
- // Write a flag indicating whether the instance is null
- out.writeBoolean(instance != null)
- if (instance != null) {
- // Some of the UDFs are serializable, but others are not
- // Hive Utilities can handle both cases
- val baos = new java.io.ByteArrayOutputStream()
- serializePlan(instance, baos)
- val functionInBytes = baos.toByteArray
-
- // output the function bytes
- out.writeInt(functionInBytes.length)
- out.write(functionInBytes, 0, functionInBytes.length)
- }
- }
-
- def readExternal(in: java.io.ObjectInput) {
- // read the function name
- functionClassName = in.readUTF()
-
- if (in.readBoolean()) {
- // if the instance is not null
- // read the function in bytes
- val functionInBytesLength = in.readInt()
- val functionInBytes = new Array[Byte](functionInBytesLength)
- in.read(functionInBytes, 0, functionInBytesLength)
-
- // deserialize the function object via Hive Utilities
- instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
- getContextOrSparkClassLoader.loadClass(functionClassName))
- }
- }
-
- def createFunction[UDFType <: AnyRef](): UDFType = {
- if (instance != null) {
- instance.asInstanceOf[UDFType]
- } else {
- val func = getContextOrSparkClassLoader
- .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
- if (!func.isInstanceOf[UDF]) {
- // We cache the function if it's not a simple UDF,
- // as we always have to create a new instance for a simple UDF
- instance = func
- }
- func
- }
- }
-}
-
-/**
- * A compatibility layer for interacting with Hive version 0.13.1.
- */
-private[hive] object HiveShim {
- val version = "0.13.1"
-
- def getTableDesc(
- serdeClass: Class[_ <: Deserializer],
- inputFormatClass: Class[_ <: InputFormat[_, _]],
- outputFormatClass: Class[_],
- properties: Properties) = {
- new TableDesc(inputFormatClass, outputFormatClass, properties)
- }
-
-
- def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.stringTypeInfo, getStringWritable(value))
-
- def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.intTypeInfo, getIntWritable(value))
-
- def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.doubleTypeInfo, getDoubleWritable(value))
-
- def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.booleanTypeInfo, getBooleanWritable(value))
-
- def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.longTypeInfo, getLongWritable(value))
-
- def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.floatTypeInfo, getFloatWritable(value))
-
- def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.shortTypeInfo, getShortWritable(value))
-
- def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.byteTypeInfo, getByteWritable(value))
-
- def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.binaryTypeInfo, getBinaryWritable(value))
-
- def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.dateTypeInfo, getDateWritable(value))
-
- def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.timestampTypeInfo, getTimestampWritable(value))
-
- def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.decimalTypeInfo, getDecimalWritable(value))
-
- def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- TypeInfoFactory.voidTypeInfo, null)
-
- def getStringWritable(value: Any): hadoopIo.Text =
- if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
-
- def getIntWritable(value: Any): hadoopIo.IntWritable =
- if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
-
- def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
- if (value == null) {
- null
- } else {
- new hiveIo.DoubleWritable(value.asInstanceOf[Double])
- }
-
- def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
- if (value == null) {
- null
- } else {
- new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
- }
-
- def getLongWritable(value: Any): hadoopIo.LongWritable =
- if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
-
- def getFloatWritable(value: Any): hadoopIo.FloatWritable =
- if (value == null) {
- null
- } else {
- new hadoopIo.FloatWritable(value.asInstanceOf[Float])
- }
-
- def getShortWritable(value: Any): hiveIo.ShortWritable =
- if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
-
- def getByteWritable(value: Any): hiveIo.ByteWritable =
- if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
-
- def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
- if (value == null) {
- null
- } else {
- new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
- }
-
- def getDateWritable(value: Any): hiveIo.DateWritable =
- if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
-
- def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
- if (value == null) {
- null
- } else {
- new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
- }
-
- def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
- if (value == null) {
- null
- } else {
- // TODO: precision, scale?
- new hiveIo.HiveDecimalWritable(
- HiveShim.createDecimal(value.asInstanceOf[Decimal].toJavaBigDecimal))
- }
-
- def getPrimitiveNullWritable: NullWritable = NullWritable.get()
-
- def createDriverResultsArray = new JArrayList[Object]
-
- def processResults(results: JArrayList[Object]) = {
- results.map { r =>
- r match {
- case s: String => s
- case a: Array[Object] => a(0).asInstanceOf[String]
- }
- }
- }
-
- def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE
-
- def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE
-
- def createDefaultDBIfNeeded(context: HiveContext) = {
- context.runSqlHive("CREATE DATABASE default")
- context.runSqlHive("USE default")
- }
-
- def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
- CommandProcessorFactory.get(cmd, conf)
- }
-
- def createDecimal(bd: java.math.BigDecimal): HiveDecimal = {
- HiveDecimal.create(bd)
- }
-
- /*
- * This function became private in hive-0.13, so it is duplicated here to work around a Hive bug
- */
- private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) {
- val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
- val result: StringBuilder = new StringBuilder(old)
- var first: Boolean = old.isEmpty
-
- for (col <- cols) {
- if (first) {
- first = false
- } else {
- result.append(',')
- }
- result.append(col)
- }
- conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString)
- }
-
- /*
- * Cannot use ColumnProjectionUtils.appendReadColumns directly if ids is null or empty
- */
- def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
- if (ids != null && ids.size > 0) {
- ColumnProjectionUtils.appendReadColumns(conf, ids)
- }
- if (names != null && names.size > 0) {
- appendReadColumnNames(conf, names)
- }
- }
-
- def getExternalTmpPath(context: Context, path: Path) = {
- context.getExternalTmpPath(path.toUri)
- }
-
- def getDataLocationPath(p: Partition) = p.getDataLocation
-
- def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsOf(tbl)
-
- def compatibilityBlackList = Seq()
-
- def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
- tbl.setDataLocation(new Path(crtTbl.getLocation()))
- }
-
- /*
- * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
- * Fix it through a wrapper.
- */
- implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
- var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
- f.setCompressCodec(w.compressCodec)
- f.setCompressType(w.compressType)
- f.setTableInfo(w.tableInfo)
- f.setDestTableId(w.destTableId)
- f
- }
-
- // Precision and scale to pass for unlimited decimals; these are the same as the precision and
- // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
- private val UNLIMITED_DECIMAL_PRECISION = 38
- private val UNLIMITED_DECIMAL_SCALE = 18
-
- def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
- case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
- case _ => s"decimal($UNLIMITED_DECIMAL_PRECISION,$UNLIMITED_DECIMAL_SCALE)"
- }
-
- def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match {
- case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale)
- case _ => new DecimalTypeInfo(UNLIMITED_DECIMAL_PRECISION, UNLIMITED_DECIMAL_SCALE)
- }
-
- def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
- val info = inspector.getTypeInfo.asInstanceOf[DecimalTypeInfo]
- DecimalType(info.precision(), info.scale())
- }
-
- def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
- if (hdoi.preferWritable()) {
- Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue,
- hdoi.precision(), hdoi.scale())
- } else {
- Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
- }
- }
-
- def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
- ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
- }
-
- /*
- * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
- * needs to be initialized before serialization.
- */
- def prepareWritable(w: Writable): Writable = {
- w match {
- case w: AvroGenericRecordWritable =>
- w.setRecordReaderID(new UID())
- case _ =>
- }
- w
- }
-
- def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {
- if (crtTbl != null && crtTbl.getNullFormat() != null) {
- tbl.setSerdeParam(serdeConstants.SERIALIZATION_NULL_FORMAT, crtTbl.getNullFormat())
- }
- }
-}
-
-/*
- * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
- * Fix it through wrapper.
- */
-private[hive] class ShimFileSinkDesc(
- var dir: String,
- var tableInfo: TableDesc,
- var compressed: Boolean)
- extends Serializable with Logging {
- var compressCodec: String = _
- var compressType: String = _
- var destTableId: Int = _
-
- def setCompressed(compressed: Boolean) {
- this.compressed = compressed
- }
-
- def getDirName = dir
-
- def setDestTableId(destTableId: Int) {
- this.destTableId = destTableId
- }
-
- def setTableInfo(tableInfo: TableDesc) {
- this.tableInfo = tableInfo
- }
-
- def setCompressCodec(intermediateCompressorCodec: String) {
- compressCodec = intermediateCompressorCodec
- }
-
- def setCompressType(intermediateCompressType: String) {
- compressType = intermediateCompressType
- }
-}
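Even against a single Hive version, a few genuine bug workarounds still need a home, and the deleted helpers above show their shape. As orientation, here is a compressed, hypothetical sketch in the style of the retained `HiveShim` object, modeled on the `prepareWritable` helper above (the actual retained code may differ):

```scala
package org.apache.spark.sql.hive

import java.rmi.server.UID

import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
import org.apache.hadoop.io.Writable

// Hypothetical excerpt, not the actual HiveShim.scala.
private[hive] object HiveShimSketch {
  // Hive 0.13 bug: AvroGenericRecordWritable must have its recordReaderID set
  // before it can be serialized, so Writables are routed through this helper.
  def prepareWritable(w: Writable): Writable = {
    w match {
      case avro: AvroGenericRecordWritable => avro.setRecordReaderID(new UID())
      case _ =>
    }
    w
  }
}
```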