author    Zhan Zhang <zhazhan@gmail.com>    2014-10-24 11:03:17 -0700
committer Michael Armbrust <michael@databricks.com>    2014-10-24 11:03:17 -0700
commit    7c89a8f0c81ecf91dba34c1f44393f45845d438c (patch)
tree      523149c7dda49cb193680bed53cc1a112c213fcc /sql/hive/v0.13.1
parent    0e886610eedd8ea24761cdcefa25ccedeca72dc8 (diff)
[SPARK-2706][SQL] Enable Spark to support Hive 0.13
Given that many users are trying to use Hive 0.13 in Spark, and given the API-level incompatibility between hive-0.12 and hive-0.13, I want to propose the following approach, which has no or minimal impact on the existing hive-0.12 support but makes it possible to jumpstart development for hive-0.13 and future versions.

Approach: introduce a "hive.version" property and manipulate the pom.xml files to support different Hive versions at compile time through a shim layer, e.g., hive-0.12.0 and hive-0.13.1. More specifically:

1. For each Hive version there is a very light layer of shim code to handle API differences, sitting in sql/hive/<hive-version>, e.g., sql/hive/v0.12.0 or sql/hive/v0.13.1.
2. A new profile, hive-default, is active by default; it picks up all existing configuration and the hive-0.12.0 shim (v0.12.0) if no hive.version is specified.
3. If the user specifies a different version (currently only 0.13.1, via -Dhive.version=0.13.1), the hive-versions profile is activated instead, which picks up the version-specific shim layer and configuration, mainly the Hive jars and the hive-version shim, e.g., v0.13.1.
4. With this approach, nothing changes in the current hive-0.12 support.

No change by default:

    sbt/sbt -Phive

For example:

    sbt/sbt -Phive -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 assembly

To enable hive-0.13:

    sbt/sbt -Dhive.version=0.13.1

For example:

    sbt/sbt -Dhive.version=0.13.1 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 assembly

Note that with hive-0.13, hive-thriftserver is not enabled; that should be fixed in a separate JIRA. Also, -Phive is not needed together with -Dhive.version when building (we should probably switch to -Phive -Dhive.version=xxx once the thrift server is also supported with hive-0.13.1).

Author: Zhan Zhang <zhazhan@gmail.com>
Author: zhzhan <zhazhan@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #2241 from zhzhan/spark-2706 and squashes the following commits:

3ece905 [Zhan Zhang] minor fix
410b668 [Zhan Zhang] solve review comments
cbb4691 [Zhan Zhang] change run-test for new options
0d4d2ed [Zhan Zhang] rebase
497b0f4 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
8fad1cf [Zhan Zhang] change the pom file and make hive-0.13.1 as the default
ab028d1 [Zhan Zhang] rebase
4a2e36d [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
4cb1b93 [zhzhan] Merge pull request #1 from pwendell/pr-2241
b0478c0 [Patrick Wendell] Changes to simplify the build of SPARK-2706
2b50502 [Zhan Zhang] rebase
a72c0d4 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
cb22863 [Zhan Zhang] correct the typo
20f6cf7 [Zhan Zhang] solve compatability issue
f7912a9 [Zhan Zhang] rebase and solve review feedback
301eb4a [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
10c3565 [Zhan Zhang] address review comments
6bc9204 [Zhan Zhang] rebase and remove temparory repo
d3aa3f2 [Zhan Zhang] Merge branch 'master' into spark-2706
cedcc6f [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
3ced0d7 [Zhan Zhang] rebase
d9b981d [Zhan Zhang] rebase and fix error due to rollback
adf4924 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
3dd50e8 [Zhan Zhang] solve conflicts and remove unnecessary implicts
d10bf00 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
dc7bdb3 [Zhan Zhang] solve conflicts
7e0cc36 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
d7c3e1e [Zhan Zhang] Merge branch 'master' into spark-2706
68deb11 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
d48bd18 [Zhan Zhang] address review comments
3ee3b2b [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
57ea52e [Zhan Zhang] Merge branch 'master' into spark-2706
2b0d513 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
9412d24 [Zhan Zhang] address review comments
f4af934 [Zhan Zhang] rebase
1ccd7cc [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
128b60b [Zhan Zhang] ignore 0.12.0 test cases for the time being
af9feb9 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
5f5619f [Zhan Zhang] restructure the directory and different hive version support
05d3683 [Zhan Zhang] solve conflicts
e4c1982 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
94b4fdc [Zhan Zhang] Spark-2706: hive-0.13.1 support on spark
87ebf3b [Zhan Zhang] Merge branch 'master' into spark-2706
921e914 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
f896b2a [Zhan Zhang] Merge branch 'master' into spark-2706
789ea21 [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
cb53a2c [Zhan Zhang] Merge branch 'master' of https://github.com/apache/spark
f6a8a40 [Zhan Zhang] revert
ba14f28 [Zhan Zhang] test
dbedff3 [Zhan Zhang] Merge remote-tracking branch 'upstream/master'
70964fe [Zhan Zhang] revert
fe0f379 [Zhan Zhang] Merge branch 'master' of https://github.com/zhzhan/spark
70ffd93 [Zhan Zhang] revert
42585ec [Zhan Zhang] test
7d5fce2 [Zhan Zhang] test
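To make the shim pattern concrete, here is a minimal sketch (not part of this patch; ShimProbe is an illustrative name, placed in the same package because HiveShim is private[hive]): code compiled against sql/hive stays version-agnostic, and the build links in whichever sql/hive/v* implementation the active profile selects.

    package org.apache.spark.sql.hive

    object ShimProbe {
      // HiveShim.version is "0.12.0" or "0.13.1" depending on which profile was
      // chosen at compile time; this calling code is identical either way.
      def main(args: Array[String]): Unit =
        println(s"Built against Hive ${HiveShim.version}, " +
          s"stats key = ${HiveShim.getStatsSetupConstTotalSize}")
    }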
Diffstat (limited to 'sql/hive/v0.13.1')
-rw-r--r-- sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala | 170
1 file changed, 170 insertions, 0 deletions
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala
new file mode 100644
index 0000000000..8678c0c475
--- /dev/null
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.{ArrayList => JArrayList}
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer}
+import org.apache.hadoop.mapred.InputFormat
+import org.apache.hadoop.{io => hadoopIo}
+
+import org.apache.spark.Logging
+
+/**
+ * A compatibility layer for interacting with Hive version 0.13.1.
+ */
+private[hive] object HiveShim {
+  val version = "0.13.1"
+
+  /*
+   * TODO: hive-0.13 supports DECIMAL(precision, scale), while DECIMAL in hive-0.12
+   * is effectively DECIMAL(38, unbounded). Full support for the new decimal feature
+   * needs to be added in a separate PR.
+   */
+  val metastoreDecimal = "decimal\\((\\d+),(\\d+)\\)".r
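+  // For example, "decimal(10,2)" matches metastoreDecimal, capturing precision "10"
+  // and scale "2".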
+
+  /*
+   * The TableDesc constructor in hive-0.13 no longer takes a serde class; the
+   * serdeClass parameter is accepted but unused so that call sites can stay
+   * version-agnostic across shims.
+   */
+  def getTableDesc(
+      serdeClass: Class[_ <: Deserializer],
+      inputFormatClass: Class[_ <: InputFormat[_, _]],
+      outputFormatClass: Class[_],
+      properties: Properties) = {
+    new TableDesc(inputFormatClass, outputFormatClass, properties)
+  }
+
+  def createDriverResultsArray = new JArrayList[Object]
+
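+  /*
+   * Hive 0.13.1 may return a driver result row either as a String or as an
+   * Array[Object] whose first column holds the result string; normalize both
+   * forms to plain Strings.
+   */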
+  def processResults(results: JArrayList[Object]) = {
+    results.map {
+      case s: String => s
+      case a: Array[Object] => a(0).asInstanceOf[String]
+    }
+  }
+
+  def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE
+
+  def createDefaultDBIfNeeded(context: HiveContext) = {
+    context.runSqlHive("CREATE DATABASE default")
+    context.runSqlHive("USE default")
+  }
+
+  /* The string used to denote an empty comments field in the schema. */
+  def getEmptyCommentsFieldValue = ""
+
+  def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
+    CommandProcessorFactory.get(cmd, conf)
+  }
+
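+  // hive-0.13 constructs HiveDecimal through the HiveDecimal.create factory
+  // method (hive-0.12 exposed a public constructor instead).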
+  def createDecimal(bd: java.math.BigDecimal): HiveDecimal = {
+    HiveDecimal.create(bd)
+  }
+
+  /*
+   * This function became private in hive-0.13, so it is reimplemented here to
+   * work around a Hive bug.
+   */
+  private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) {
+    val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
+    val result: StringBuilder = new StringBuilder(old)
+    var first: Boolean = old.isEmpty
+
+    for (col <- cols) {
+      if (first) {
+        first = false
+      } else {
+        result.append(',')
+      }
+      result.append(col)
+    }
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString)
+  }
+
+  /*
+   * ColumnProjectionUtils.appendReadColumns cannot be used directly, because it
+   * does not handle a null or empty ids list.
+   */
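+  // For example, appendReadColumns(conf, Seq(0, 2), Seq("key", "value")) records
+  // both the column ids and the column names in the job Configuration.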
+  def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
+    if (ids != null && ids.size > 0) {
+      ColumnProjectionUtils.appendReadColumns(conf, ids)
+    }
+    if (names != null && names.size > 0) {
+      appendReadColumnNames(conf, names)
+    }
+  }
+
+  def getExternalTmpPath(context: Context, path: Path) = {
+    context.getExternalTmpPath(path.toUri)
+  }
+
+  def getDataLocationPath(p: Partition) = p.getDataLocation
+
+  def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsOf(tbl)
+
+  /*
+   * A bug introduced in hive-0.13: FileSinkDesc is serializable, but its member
+   * path is not. Work around it through the ShimFileSinkDesc wrapper.
+   */
+  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
+    val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
+    f.setCompressCodec(w.compressCodec)
+    f.setCompressType(w.compressType)
+    f.setTableInfo(w.tableInfo)
+    f.setDestTableId(w.destTableId)
+    f
+  }
+}
+
+/*
+ * A bug introduced in hive-0.13: FileSinkDesc is serializable, but its member
+ * path is not. Work around it through this wrapper, which stores the path as a
+ * plain String.
+ */
+class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
+  extends Serializable with Logging {
+  var compressCodec: String = _
+  var compressType: String = _
+  var destTableId: Int = _
+
+  def setCompressed(compressed: Boolean) {
+    this.compressed = compressed
+  }
+
+  def getDirName = dir
+
+  def setDestTableId(destTableId: Int) {
+    this.destTableId = destTableId
+  }
+
+  def setTableInfo(tableInfo: TableDesc) {
+    this.tableInfo = tableInfo
+  }
+
+  def setCompressCodec(intermediateCompressorCodec: String) {
+    compressCodec = intermediateCompressorCodec
+  }
+
+  def setCompressType(intermediateCompressType: String) {
+    compressType = intermediateCompressType
+  }
+}
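For context, a minimal sketch (not part of this patch; ShimUsageSketch is an illustrative name, placed in the same package because HiveShim is private[hive]) of how version-agnostic code is expected to use the wrapper, letting the implicit conversion produce a real FileSinkDesc where the Hive API requires one:

    package org.apache.spark.sql.hive

    import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
    import HiveShim._  // brings implicit wrapperToFileSinkDesc into scope

    object ShimUsageSketch {
      // Construct the serializable wrapper instead of FileSinkDesc itself; the
      // implicit conversion applies because the declared result type is FileSinkDesc.
      def toSinkDesc(dir: String, tableInfo: TableDesc): FileSinkDesc =
        new ShimFileSinkDesc(dir, tableInfo, compressed = false)
    }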