diff options
author | Cheng Lian <lian@databricks.com> | 2015-08-05 20:03:54 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-08-05 20:03:54 +0800 |
commit | 70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3 (patch) | |
tree | 92df84b9b14043e9e712edcd373fadb2d30da994 /sql | |
parent | e27a8c4cb3564f1b2d1ee5445dff341c8e0087b0 (diff) | |
download | spark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.tar.gz spark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.tar.bz2 spark-70112ff22bd1aee7689c5d3af9b66c9b8ceb3ec3.zip |
[SPARK-9593] [SQL] Fixes Hadoop shims loading
This PR is used to workaround CDH Hadoop versions like 2.0.0-mr1-cdh4.1.1.
Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking version information gathered from Hadoop jar files. If the major version number is 1, `Hadoop20SShims` will be loaded. Otherwise, if the major version number is 2, `Hadoop23Shims` will be chosen. However, CDH Hadoop versions like 2.0.0-mr1-cdh4.1.1 have 2 as the major version number, but contain Hadoop 1 code. This confuses Hive `ShimLoader` and loads the wrong version of shims.
In this PR we check for existence of the `Path.getPathWithoutSchemeAndAuthority` method, which doesn't exist in Hadoop 1 (it's also the method that reveals this shims loading issue), and load `Hadoop20SShims` when it doesn't exist.
Author: Cheng Lian <lian@databricks.com>
Closes #7929 from liancheng/spark-9593/fix-hadoop-shims-loading and squashes the following commits:
c99b497 [Cheng Lian] Narrows down the fix to handle "2.0.0-*cdh4*" Hadoop versions only
b17e955 [Cheng Lian] Updates comments
490d8f2 [Cheng Lian] Fixes Scala style issue
9c6c12d [Cheng Lian] Fixes Hadoop shims loading
Diffstat (limited to 'sql')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala | 48 |
1 file changed, 48 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index dc372be0e5..211a3b879c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.util.VersionInfo

 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -62,6 +64,52 @@ private[hive] class ClientWrapper(
   extends ClientInterface
   with Logging {

+  overrideHadoopShims()
+
+  // !! HACK ALERT !!
+  //
+  // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
+  // scripts.  We should remove this after upgrading Spark EC2 scripts to some more recent Hadoop
+  // version in the future.
+  //
+  // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
+  // version information gathered from Hadoop jar files.  If the major version number is 1,
+  // `Hadoop20SShims` will be loaded.  Otherwise, if the major version number is 2, `Hadoop23Shims`
+  // will be chosen.
+  //
+  // However, part of APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical
+  // reasons.  So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
+  // `Hadoop20SShims`, but `Hadoop23Shims` is chosen because the major version number here is 2.
+  //
+  // Here we check for this specific version and load `Hadoop20SShims` via reflection.  Note that
+  // we can't check for string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
+  // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
+  // the "mr1" tag in its version string.
+  private def overrideHadoopShims(): Unit = {
+    val VersionPattern = """2\.0\.0.*cdh4.*""".r
+
+    VersionInfo.getVersion match {
+      case VersionPattern() =>
+        val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+        logInfo(s"Loading Hadoop shims $shimClassName")
+
+        try {
+          val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+          // scalastyle:off classforname
+          val shimsClass = Class.forName(shimClassName)
+          // scalastyle:on classforname
+          val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+          shimsField.setAccessible(true)
+          shimsField.set(null, shims)
+        } catch { case cause: Throwable =>
+          logError(s"Failed to load $shimClassName")
+          // Falls back to normal Hive `ShimLoader` logic
+        }
+
+      case _ =>
+    }
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   private val outputBuffer = new CircularBuffer()