-rw-r--r--  docs/building-spark.md                                                                              6
-rw-r--r--  pom.xml                                                                                            16
-rw-r--r--  sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala  278
-rw-r--r--  sql/hive/pom.xml                                                                                   10
-rw-r--r--  sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala                            265
5 files changed, 1 insertion, 574 deletions
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4dbccb9e6e..3ca7f2746e 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -118,14 +118,10 @@ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests
# Building With Hive and JDBC Support
To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` and `-Phive-thriftserver` profiles to your existing build options.
-By default Spark will build with Hive 0.13.1 bindings. You can also build for
-Hive 0.12.0 using the `-Phive-0.12.0` profile.
+By default Spark will build with Hive 0.13.1 bindings.
{% highlight bash %}
# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
-
-# Apache Hadoop 2.4.X with Hive 12 support
-mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package
{% endhighlight %}
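If you are unsure which Hive version a given set of profiles resolves to, one way to check (not Spark-specific; this just uses the standard Maven help plugin to evaluate the `hive.version` property defined in the root POM) is:
{% highlight bash %}
# Print the Hive version the selected profiles resolve to (standard maven-help-plugin goal)
mvn -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver help:evaluate -Dexpression=hive.version
{% endhighlight %}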
# Building for Scala 2.11
diff --git a/pom.xml b/pom.xml
index c72d7cbf84..711edf9efa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1753,22 +1753,6 @@
<module>sql/hive-thriftserver</module>
</modules>
</profile>
- <profile>
- <id>hive-0.12.0</id>
- <properties>
- <hive.version>0.12.0-protobuf-2.5</hive.version>
- <hive.version.short>0.12.0</hive.version.short>
- <derby.version>10.4.2.0</derby.version>
- </properties>
- </profile>
- <profile>
- <id>hive-0.13.1</id>
- <properties>
- <hive.version>0.13.1a</hive.version>
- <hive.version.short>0.13.1</hive.version.short>
- <derby.version>10.10.1.1</derby.version>
- </properties>
- </profile>
<profile>
<id>scala-2.10</id>
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
deleted file mode 100644
index b3a79ba1c7..0000000000
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.sql.{Date, Timestamp}
-import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, Map => JMap, UUID}
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.thrift.TProtocolVersion
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.shims.ShimLoader
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
-import org.apache.spark.sql.execution.SetCommand
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.types._
-
-/**
- * A compatibility layer for interacting with Hive version 0.12.0.
- */
-private[thriftserver] object HiveThriftServerShim {
- val version = "0.12.0"
-
- def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
- val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
- setSuperField(sparkCliService, "serverUserName", serverUserName)
- }
-}
-
-private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
- extends AbstractSparkSQLDriver(_context) {
- override def getResults(res: JArrayList[String]): Boolean = {
- if (hiveResponse == null) {
- false
- } else {
- res.addAll(hiveResponse)
- hiveResponse = null
- true
- }
- }
-}
-
-private[hive] class SparkExecuteStatementOperation(
- parentSession: HiveSession,
- statement: String,
- confOverlay: JMap[String, String])(
- hiveContext: HiveContext,
- sessionToActivePool: SMap[SessionHandle, String])
- extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
-
- private var result: DataFrame = _
- private var iter: Iterator[SparkRow] = _
- private var dataTypes: Array[DataType] = _
-
- def close(): Unit = {
- // RDDs will be cleaned automatically upon garbage collection.
- logDebug("CLOSING")
- }
-
- def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
- if (!iter.hasNext) {
- new RowSet()
- } else {
- // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
- val maxRows = maxRowsL.toInt
- var curRow = 0
- var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
-
- while (curRow < maxRows && iter.hasNext) {
- val sparkRow = iter.next()
- val row = new Row()
- var curCol = 0
-
- while (curCol < sparkRow.length) {
- if (sparkRow.isNullAt(curCol)) {
- addNullColumnValue(sparkRow, row, curCol)
- } else {
- addNonNullColumnValue(sparkRow, row, curCol)
- }
- curCol += 1
- }
- rowSet += row
- curRow += 1
- }
- new RowSet(rowSet, 0)
- }
- }
-
- def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(from(ordinal).asInstanceOf[String])
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
- case DecimalType() =>
- val hiveDecimal = from.getDecimal(ordinal)
- to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
- case DateType =>
- to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
- case TimestampType =>
- to.addColumnValue(
- ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
- to.addColumnValue(ColumnValue.stringValue(hiveString))
- }
- }
-
- def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
- dataTypes(ordinal) match {
- case StringType =>
- to.addString(null)
- case IntegerType =>
- to.addColumnValue(ColumnValue.intValue(null))
- case BooleanType =>
- to.addColumnValue(ColumnValue.booleanValue(null))
- case DoubleType =>
- to.addColumnValue(ColumnValue.doubleValue(null))
- case FloatType =>
- to.addColumnValue(ColumnValue.floatValue(null))
- case DecimalType() =>
- to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
- case LongType =>
- to.addColumnValue(ColumnValue.longValue(null))
- case ByteType =>
- to.addColumnValue(ColumnValue.byteValue(null))
- case ShortType =>
- to.addColumnValue(ColumnValue.shortValue(null))
- case DateType =>
- to.addColumnValue(ColumnValue.dateValue(null))
- case TimestampType =>
- to.addColumnValue(ColumnValue.timestampValue(null))
- case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- to.addColumnValue(ColumnValue.stringValue(null: String))
- }
- }
-
- def getResultSetSchema: TableSchema = {
- logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
- if (result.queryExecution.analyzed.output.size == 0) {
- new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
- } else {
- val schema = result.queryExecution.analyzed.output.map { attr =>
- new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
- }
- new TableSchema(schema)
- }
- }
-
- def run(): Unit = {
- val statementId = UUID.randomUUID().toString
- logInfo(s"Running query '$statement'")
- setState(OperationState.RUNNING)
- HiveThriftServer2.listener.onStatementStart(
- statementId, parentSession.getSessionHandle.getSessionId.toString, statement, statementId)
- hiveContext.sparkContext.setJobGroup(statementId, statement)
- sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
- hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
- }
- try {
- result = hiveContext.sql(statement)
- logDebug(result.queryExecution.toString())
- result.queryExecution.logical match {
- case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
- sessionToActivePool(parentSession.getSessionHandle) = value
- logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
- case _ =>
- }
- HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
- iter = {
- val useIncrementalCollect =
- hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
- if (useIncrementalCollect) {
- result.rdd.toLocalIterator
- } else {
- result.collect().iterator
- }
- }
- dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
- setHasResultSet(true)
- } catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, e.getStackTraceString)
- logError("Error executing query:",e)
- throw new HiveSQLException(e.toString)
- }
- setState(OperationState.FINISHED)
- HiveThriftServer2.listener.onStatementFinish(statementId)
- }
-}
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
- extends SessionManager
- with ReflectedCompositeService {
-
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
- override def init(hiveConf: HiveConf) {
- setSuperField(this, "hiveConf", hiveConf)
-
- val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
- setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
- getAncestorField[Log](this, 3, "LOG").info(
- s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
- setSuperField(this, "operationManager", sparkSqlOperationManager)
- addService(sparkSqlOperationManager)
-
- initCompositeService(hiveConf)
- }
-
- override def openSession(
- username: String,
- passwd: String,
- sessionConf: java.util.Map[String, String],
- withImpersonation: Boolean,
- delegationToken: String): SessionHandle = {
- hiveContext.openSession()
- val sessionHandle = super.openSession(
- username, passwd, sessionConf, withImpersonation, delegationToken)
- HiveThriftServer2.listener.onSessionCreated("UNKNOWN", sessionHandle.getSessionId.toString)
- sessionHandle
- }
-
- override def closeSession(sessionHandle: SessionHandle) {
- HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
- super.closeSession(sessionHandle)
- sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
- hiveContext.detachSession()
- }
-}
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index e322340094..615b07e74d 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -136,16 +136,6 @@
</plugins>
</build>
</profile>
- <profile>
- <id>hive-0.12.0</id>
- <dependencies>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-hive-bundle</artifactId>
- <version>1.5.0</version>
- </dependency>
- </dependencies>
- </profile>
</profiles>
<build>
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
deleted file mode 100644
index 33e96eaabf..0000000000
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-import java.util.{ArrayList => JArrayList, Properties}
-
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-
-import org.apache.hadoop.{io => hadoopIo}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.processors._
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst
-import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, ObjectInspector, PrimitiveObjectInspector}
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
-import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.InputFormat
-
-import org.apache.spark.sql.types.{UTF8String, Decimal, DecimalType}
-
-private[hive] case class HiveFunctionWrapper(functionClassName: String)
- extends java.io.Serializable {
-
- // for Serialization
- def this() = this(null)
-
- import org.apache.spark.util.Utils._
- def createFunction[UDFType <: AnyRef](): UDFType = {
- getContextOrSparkClassLoader
- .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
- }
-}
-
-/**
- * A compatibility layer for interacting with Hive version 0.12.0.
- */
-private[hive] object HiveShim {
- val version = "0.12.0"
-
- def getTableDesc(
- serdeClass: Class[_ <: Deserializer],
- inputFormatClass: Class[_ <: InputFormat[_, _]],
- outputFormatClass: Class[_],
- properties: Properties) = {
- new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties)
- }
-
- def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.STRING,
- getStringWritable(value))
-
- def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.INT,
- getIntWritable(value))
-
- def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.DOUBLE,
- getDoubleWritable(value))
-
- def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.BOOLEAN,
- getBooleanWritable(value))
-
- def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.LONG,
- getLongWritable(value))
-
- def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.FLOAT,
- getFloatWritable(value))
-
- def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.SHORT,
- getShortWritable(value))
-
- def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.BYTE,
- getByteWritable(value))
-
- def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.BINARY,
- getBinaryWritable(value))
-
- def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.DATE,
- getDateWritable(value))
-
- def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.TIMESTAMP,
- getTimestampWritable(value))
-
- def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.DECIMAL,
- getDecimalWritable(value))
-
- def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
- PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
- PrimitiveCategory.VOID, null)
-
- def getStringWritable(value: Any): hadoopIo.Text =
- if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
-
- def getIntWritable(value: Any): hadoopIo.IntWritable =
- if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
-
- def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
- if (value == null) null else new hiveIo.DoubleWritable(value.asInstanceOf[Double])
-
- def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
- if (value == null) null else new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
-
- def getLongWritable(value: Any): hadoopIo.LongWritable =
- if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
-
- def getFloatWritable(value: Any): hadoopIo.FloatWritable =
- if (value == null) null else new hadoopIo.FloatWritable(value.asInstanceOf[Float])
-
- def getShortWritable(value: Any): hiveIo.ShortWritable =
- if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
-
- def getByteWritable(value: Any): hiveIo.ByteWritable =
- if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
-
- def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
- if (value == null) null else new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
-
- def getDateWritable(value: Any): hiveIo.DateWritable =
- if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
-
- def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
- if (value == null) {
- null
- } else {
- new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
- }
-
- def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
- if (value == null) {
- null
- } else {
- new hiveIo.HiveDecimalWritable(
- HiveShim.createDecimal(value.asInstanceOf[Decimal].toJavaBigDecimal))
- }
-
- def getPrimitiveNullWritable: NullWritable = NullWritable.get()
-
- def createDriverResultsArray = new JArrayList[String]
-
- def processResults(results: JArrayList[String]) = results
-
- def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE
-
- def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE
-
- def createDefaultDBIfNeeded(context: HiveContext) = { }
-
- def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
- CommandProcessorFactory.get(cmd(0), conf)
- }
-
- def createDecimal(bd: java.math.BigDecimal): HiveDecimal = {
- new HiveDecimal(bd)
- }
-
- def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
- ColumnProjectionUtils.appendReadColumnIDs(conf, ids)
- ColumnProjectionUtils.appendReadColumnNames(conf, names)
- }
-
- def getExternalTmpPath(context: Context, uri: URI) = {
- context.getExternalTmpFileURI(uri)
- }
-
- def getDataLocationPath(p: Partition) = p.getPartitionPath
-
- def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
-
- def compatibilityBlackList = Seq(
- "decimal_.*",
- "udf7",
- "drop_partitions_filter2",
- "show_.*",
- "serde_regex",
- "udf_to_date",
- "udaf_collect_set",
- "udf_concat"
- )
-
- def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
- tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri())
- }
-
- def decimalMetastoreString(decimalType: DecimalType): String = "decimal"
-
- def decimalTypeInfo(decimalType: DecimalType): TypeInfo =
- TypeInfoFactory.decimalTypeInfo
-
- def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
- DecimalType.Unlimited
- }
-
- def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
- if (hdoi.preferWritable()) {
- Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue)
- } else {
- Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue())
- }
- }
-
- def getConvertedOI(
- inputOI: ObjectInspector,
- outputOI: ObjectInspector): ObjectInspector = {
- ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
- }
-
- def prepareWritable(w: Writable): Writable = {
- w
- }
-
- def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {}
-}
-
-private[hive] class ShimFileSinkDesc(
- var dir: String,
- var tableInfo: TableDesc,
- var compressed: Boolean)
- extends FileSinkDesc(dir, tableInfo, compressed) {
-}