diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-09-25 22:57:31 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-09-25 22:57:31 -0700 |
commit | 59d87d24079bc633e63ce032f0a5ddd18a3b02cb (patch) | |
tree | aa630bb12d2aa88ea56e13370e92784834127434 /core/src/main | |
parent | de333d121da4cb80d45819cbcf8b4246e48ec4d0 (diff) | |
download | spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.tar.gz spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.tar.bz2 spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.zip |
[SPARK-17650] malformed url's throw exceptions before bricking Executors
## What changes were proposed in this pull request?
When a malformed URL was sent to Executors through `sc.addJar` and `sc.addFile`, the executors become unusable, because they constantly throw `MalformedURLException`s and can never acknowledge that the file or jar is just bad input.
This PR tries to fix that problem by making sure MalformedURLs can never be submitted through `sc.addJar` and `sc.addFile`. Another solution would be to blacklist bad files and jars on Executors. Maybe fail the first time, and then ignore the second time (but print a warning message).
## How was this patch tested?
Unit tests in SparkContextSuite
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #15224 from brkyvz/SPARK-17650.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 20 |
2 files changed, 29 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f58037e100..4694790c72 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -19,7 +19,7 @@ package org.apache.spark import java.io._ import java.lang.reflect.Constructor -import java.net.URI +import java.net.{MalformedURLException, URI} import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID} import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} @@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, - FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} -import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, - TextInputFormat} +import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} +import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, - WholeTextFileInputFormat} +import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec @@ -1452,6 +1449,9 @@ class SparkContext(config: SparkConf) extends Logging { throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " + "turned on.") } + } else { + // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies + Utils.validateURL(uri) } val key = if (!isLocal && scheme == "file") { @@ -1711,6 +1711,8 @@ class SparkContext(config: SparkConf) extends Logging { key = env.rpcEnv.fileServer.addJar(new File(path)) } else { val uri = new URI(path) + // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies + Utils.validateURL(uri) key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 09896c4e2f..e09666c610 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -698,6 +698,26 @@ private[spark] object Utils extends Logging { } /** + * Validate that a given URI is actually a valid URL as well. + * @param uri The URI to validate + */ + @throws[MalformedURLException]("when the URI is an invalid URL") + def validateURL(uri: URI): Unit = { + Option(uri.getScheme).getOrElse("file") match { + case "http" | "https" | "ftp" => + try { + uri.toURL + } catch { + case e: MalformedURLException => + val ex = new MalformedURLException(s"URI (${uri.toString}) is not a valid URL.") + ex.initCause(e) + throw ex + } + case _ => // will not be turned into a URL anyway + } + } + + /** * Get the path of a temporary directory. Spark's local directories can be configured through * multiple settings, which are used with the following precedence: * |