aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala48
1 files changed, 48 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index dc372be0e5..211a3b879c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.util.VersionInfo
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -62,6 +64,52 @@ private[hive] class ClientWrapper(
extends ClientInterface
with Logging {
+ overrideHadoopShims()
+
+ // !! HACK ALERT !!
+ //
+ // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
+ // scripts. We should remove this after upgrading Spark EC2 scripts to some more recent Hadoop
+ // version in the future.
+ //
+ // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
+ // version information gathered from Hadoop jar files. If the major version number is 1,
+ // `Hadoop20SShims` will be loaded. Otherwise, if the major version number is 2, `Hadoop23Shims`
+ // will be chosen.
+ //
+ // However, some APIs in Hadoop 2.0.x and 2.1.x were in flux due to historical
+ // reasons. So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
+ // `Hadoop20SShims`, but `Hadoop23Shims` is chosen because the major version number here is 2.
+ //
+ // Here we check for this specific version and load `Hadoop20SShims` via reflection. Note that
+ // we can't check for string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
+ // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
+ // the "mr1" tag in its version string.
+ private def overrideHadoopShims(): Unit = {
+ val VersionPattern = """2\.0\.0.*cdh4.*""".r
+
+ VersionInfo.getVersion match {
+ case VersionPattern() =>
+ val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+ logInfo(s"Loading Hadoop shims $shimClassName")
+
+ try {
+ val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+ // scalastyle:off classforname
+ val shimsClass = Class.forName(shimClassName)
+ // scalastyle:on classforname
+ val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+ shimsField.setAccessible(true)
+ shimsField.set(null, shims)
+ } catch { case cause: Throwable =>
+ logError(s"Failed to load $shimClassName")
+ // Falls back to normal Hive `ShimLoader` logic
+ }
+
+ case _ =>
+ }
+ }
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new CircularBuffer()