aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-08-05 20:03:54 +0800
committerCheng Lian <lian@databricks.com>2015-08-05 20:03:54 +0800
commit70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3 (patch)
tree92df84b9b14043e9e712edcd373fadb2d30da994 /sql
parente27a8c4cb3564f1b2d1ee5445dff341c8e0087b0 (diff)
downloadspark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.tar.gz
spark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.tar.bz2
spark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.zip
[SPARK-9593] [SQL] Fixes Hadoop shims loading
This PR is used to workaround CDH Hadoop versions like 2.0.0-mr1-cdh4.1.1. Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking version information gathered from Hadoop jar files. If the major version number is 1, `Hadoop20SShims` will be loaded. Otherwise, if the major version number is 2, `Hadoop23Shims` will be chosen. However, CDH Hadoop versions like 2.0.0-mr1-cdh4.1.1 have 2 as major version number, but contain Hadoop 1 code. This confuses Hive `ShimLoader` and loads wrong version of shims. In this PR we check for existence of the `Path.getPathWithoutSchemeAndAuthority` method, which doesn't exist in Hadoop 1 (it's also the method that reveals this shims loading issue), and load `Hadoop20SShims` when it doesn't exist. Author: Cheng Lian <lian@databricks.com> Closes #7929 from liancheng/spark-9593/fix-hadoop-shims-loading and squashes the following commits: c99b497 [Cheng Lian] Narrows down the fix to handle "2.0.0-*cdh4*" Hadoop versions only b17e955 [Cheng Lian] Updates comments 490d8f2 [Cheng Lian] Fixes Scala style issue 9c6c12d [Cheng Lian] Fixes Hadoop shims loading
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala48
1 files changed, 48 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index dc372be0e5..211a3b879c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.util.VersionInfo
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -62,6 +64,52 @@ private[hive] class ClientWrapper(
extends ClientInterface
with Logging {
+ overrideHadoopShims()
+
+ // !! HACK ALERT !!
+ //
+ // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
+ // scripts. We should remove this after upgrading Spark EC2 scripts to some more recent Hadoop
+ // version in the future.
+ //
+ // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
+ // version information gathered from Hadoop jar files. If the major version number is 1,
+ // `Hadoop20SShims` will be loaded. Otherwise, if the major version number is 2, `Hadoop23Shims`
+ // will be chosen.
+ //
+ // However, some of the APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical
+ // reasons. So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
+ // `Hadoop20SShims`, but `Hadoop23Shims` is chosen because the major version number here is 2.
+ //
+ // Here we check for this specific version and load `Hadoop20SShims` via reflection. Note that
+ // we can't check for string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
+ // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
+ // the "mr1" tag in its version string.
+ private def overrideHadoopShims(): Unit = {
+ val VersionPattern = """2\.0\.0.*cdh4.*""".r
+
+ VersionInfo.getVersion match {
+ case VersionPattern() =>
+ val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+ logInfo(s"Loading Hadoop shims $shimClassName")
+
+ try {
+ val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+ // scalastyle:off classforname
+ val shimsClass = Class.forName(shimClassName)
+ // scalastyle:on classforname
+ val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+ shimsField.setAccessible(true)
+ shimsField.set(null, shims)
+ } catch { case cause: Throwable =>
+ logError(s"Failed to load $shimClassName")
+ // Falls back to normal Hive `ShimLoader` logic
+ }
+
+ case _ =>
+ }
+ }
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new CircularBuffer()