aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2016-08-19 10:11:25 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-08-19 10:11:25 -0500
commit071eaaf9d2b63589f2e66e5279a16a5a484de6f5 (patch)
tree5ad7d37247fd2dba3fc89b6e2b468a0ec723a36f
parente98eb2146f1363956bfc3e5adcc11c246182d617 (diff)
downloadspark-071eaaf9d2b63589f2e66e5279a16a5a484de6f5.tar.gz
spark-071eaaf9d2b63589f2e66e5279a16a5a484de6f5.tar.bz2
spark-071eaaf9d2b63589f2e66e5279a16a5a484de6f5.zip
[SPARK-11227][CORE] UnknownHostException can be thrown when NameNode HA is enabled.
## What changes were proposed in this pull request? If the following conditions are satisfied, executors don't load properties in `hdfs-site.xml` and UnknownHostException can be thrown. (1) NameNode HA is enabled (2) spark.eventLogging is disabled or logging path is NOT on HDFS (3) Using Standalone or Mesos for the cluster manager (4) There are no code to load `HdfsCondition` class in the driver regardless of directly or indirectly. (5) The tasks access to HDFS (There might be some more conditions...) For example, following code causes UnknownHostException when the conditions above are satisfied. ``` sc.textFile("<path on HDFS>").collect ``` ``` java.lang.IllegalArgumentException: java.net.UnknownHostException: hacluster at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170) at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:438) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411) at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986) at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177) at scala.Option.map(Option.scala:146) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:177) at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:213) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.UnknownHostException: hacluster ``` But following code doesn't cause the Exception because `textFile` method loads `HdfsConfiguration` indirectly. ``` sc.textFile("<path on HDFS>").collect ``` When a job includes some operations which access to HDFS, the object of `org.apache.hadoop.Configuration` is wrapped by `SerializableConfiguration`, serialized and broadcasted from driver to executors and each executor deserialize the object with `loadDefaults` false so HDFS related properties should be set before broadcasted. ## How was this patch tested? Tested manually on my standalone cluster. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #13738 from sarutak/SPARK-11227.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala22
1 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 60f042f1e0..2eaeab1d80 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,7 +35,7 @@ import scala.util.control.NonFatal
import com.google.common.collect.MapMaker
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
@@ -961,6 +961,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
+
+ // This is a hack to enforce loading hdfs-site.xml.
+ // See SPARK-11227 for details.
+ FileSystem.getLocal(conf)
+
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -981,6 +986,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
+
+ // This is a hack to enforce loading hdfs-site.xml.
+ // See SPARK-11227 for details.
+ FileSystem.get(new URI(path), hadoopConfiguration)
+
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
@@ -1065,6 +1075,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
+
+ // This is a hack to enforce loading hdfs-site.xml.
+ // See SPARK-11227 for details.
+ FileSystem.get(new URI(path), hadoopConfiguration)
+
// The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = NewHadoopJob.getInstance(conf)
@@ -1099,6 +1114,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
+
+ // This is a hack to enforce loading hdfs-site.xml.
+ // See SPARK-11227 for details.
+ FileSystem.getLocal(conf)
+
// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jconf)