about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-11-26 16:20:08 -0800
committerYin Huai <yhuai@databricks.com>2015-11-26 16:20:08 -0800
commitad76562390b81207f8f32491c0bd8ad0e020141f (patch)
tree65058912455c4195ac12e389cf6e619f9581ee16 /sql
parentbc16a67562560c732833260cbc34825f7e9dcb8f (diff)
downloadspark-ad76562390b81207f8f32491c0bd8ad0e020141f.tar.gz
spark-ad76562390b81207f8f32491c0bd8ad0e020141f.tar.bz2
spark-ad76562390b81207f8f32491c0bd8ad0e020141f.zip
[SPARK-11998][SQL][TEST-HADOOP2.0] When downloading Hadoop artifacts from maven, we need to try to download the version that is used by Spark
If we need to download Hive/Hadoop artifacts, try to download a Hadoop that matches the Hadoop used by Spark. If the Hadoop artifact cannot be resolved (e.g. Hadoop version is a vendor specific version like 2.0.0-cdh4.1.1), we will use Hadoop 2.4.0 (we used to hard code this version as the hadoop that we will download from maven) and we will not share Hadoop classes. I tested this patch on my laptop with the following confs (these confs are used by our builds). All tests are good. ``` build/sbt -Phadoop-1 -Dhadoop.version=1.2.1 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Phadoop-1 -Dhadoop.version=2.0.0-mr1-cdh4.1.1 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Pyarn -Phadoop-2.2 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive-thriftserver -Phive ``` Author: Yin Huai <yhuai@databricks.com> Closes #9979 from yhuai/versionsSuite.
Diffstat (limited to 'sql')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 4
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 62
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 23
3 files changed, 72 insertions(+), 17 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8a4264194a..e83941c2ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
+import org.apache.hadoop.util.VersionInfo
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLConf.SQLConfEntry
@@ -288,7 +289,8 @@ class HiveContext private[hive](
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(
- version = hiveMetastoreVersion,
+ hiveMetastoreVersion = hiveMetastoreVersion,
+ hadoopVersion = VersionInfo.getVersion,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e041e0d8e5..010051d255 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -34,23 +34,51 @@ import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.{MutableURLClassLoader, Utils}
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
-private[hive] object IsolatedClientLoader {
+private[hive] object IsolatedClientLoader extends Logging {
/**
* Creates isolated Hive client loaders by downloading the requested version from maven.
*/
def forVersion(
- version: String,
+ hiveMetastoreVersion: String,
+ hadoopVersion: String,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
- val resolvedVersion = hiveVersion(version)
- val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
- downloadVersion(resolvedVersion, ivyPath))
+ val resolvedVersion = hiveVersion(hiveMetastoreVersion)
+ // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
+ // with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes.
+ var sharesHadoopClasses = true
+ val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
+ resolvedVersions((resolvedVersion, hadoopVersion))
+ } else {
+ val (downloadedFiles, actualHadoopVersion) =
+ try {
+ (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
+ } catch {
+ case e: RuntimeException if e.getMessage.contains("hadoop") =>
+ // If the error message contains hadoop, it is probably because the hadoop
+ // version cannot be resolved (e.g. it is a vendor specific version like
+ // 2.0.0-cdh4.1.1). If it is the case, we will try just
+ // "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
+ // is used just because we used to hard code it as the hadoop artifact to download.
+ logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
+ s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
+ "Hadoop classes will not be shared between Spark and Hive metastore client. " +
+ "It is recommended to set jars used by Hive metastore client through " +
+ "spark.sql.hive.metastore.jars in the production environment.")
+ sharesHadoopClasses = false
+ (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
+ }
+ resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
+ resolvedVersions((resolvedVersion, actualHadoopVersion))
+ }
+
new IsolatedClientLoader(
- version = hiveVersion(version),
+ version = hiveVersion(hiveMetastoreVersion),
execJars = files,
config = config,
+ sharesHadoopClasses = sharesHadoopClasses,
sharedPrefixes = sharedPrefixes,
barrierPrefixes = barrierPrefixes)
}
@@ -64,12 +92,15 @@ private[hive] object IsolatedClientLoader {
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
}
- private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
+ private def downloadVersion(
+ version: HiveVersion,
+ hadoopVersion: String,
+ ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1",
- "org.apache.hadoop:hadoop-client:2.4.0")
+ s"org.apache.hadoop:hadoop-client:$hadoopVersion")
val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
@@ -86,7 +117,10 @@ private[hive] object IsolatedClientLoader {
tempDir.listFiles().map(_.toURI.toURL)
}
- private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
+ // A map from a given pair of HiveVersion and Hadoop version to jar files.
+ // It is only used by forVersion.
+ private val resolvedVersions =
+ new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
}
/**
@@ -106,6 +140,7 @@ private[hive] object IsolatedClientLoader {
* @param config A set of options that will be added to the HiveConf of the constructed client.
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Sparks classloader.
+ * @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and the Hive metastore client.
* @param rootClassLoader The system root classloader. Must not know about Hive classes.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*/
@@ -114,6 +149,7 @@ private[hive] class IsolatedClientLoader(
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
+ val sharesHadoopClasses: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
val sharedPrefixes: Seq[String] = Seq.empty,
@@ -126,16 +162,20 @@ private[hive] class IsolatedClientLoader(
/** All jars used by the hive specific classloader. */
protected def allJars = execJars.toArray
- protected def isSharedClass(name: String): Boolean =
+ protected def isSharedClass(name: String): Boolean = {
+ val isHadoopClass =
+ name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")
+
name.contains("slf4j") ||
name.contains("log4j") ||
name.startsWith("org.apache.spark.") ||
- (name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")) ||
+ (sharesHadoopClasses && isHadoopClass) ||
name.startsWith("scala.") ||
(name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
name.startsWith("java.lang.") ||
name.startsWith("java.net") ||
sharedPrefixes.exists(name.startsWith)
+ }
/** True if `name` refers to a spark class that must see specific version of Hive. */
protected def isBarrierClass(name: String): Boolean =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7bc13bc60d..502b240f36 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client
import java.io.File
+import org.apache.hadoop.util.VersionInfo
+
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
@@ -53,9 +55,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test("success sanity check") {
- val badClient = IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
- buildConf(),
- ivyPath).createClient()
+ val badClient = IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ config = buildConf(),
+ ivyPath = ivyPath).createClient()
val db = new HiveDatabase("default", "")
badClient.createDatabase(db)
}
@@ -85,7 +89,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
ignore("failure sanity check") {
val e = intercept[Throwable] {
val badClient = quietly {
- IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).createClient()
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = "13",
+ hadoopVersion = VersionInfo.getVersion,
+ config = buildConf(),
+ ivyPath = ivyPath).createClient()
}
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
@@ -99,7 +107,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: create client") {
client = null
System.gc() // Hack to avoid SEGV on some JVM versions.
- client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).createClient()
+ client =
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = version,
+ hadoopVersion = VersionInfo.getVersion,
+ config = buildConf(),
+ ivyPath = ivyPath).createClient()
}
test(s"$version: createDatabase") {