author      Burak Yavuz <brkyvz@gmail.com>  2016-09-25 22:57:31 -0700
committer   Shixiong Zhu <shixiong@databricks.com>  2016-09-25 22:57:31 -0700
commit      59d87d24079bc633e63ce032f0a5ddd18a3b02cb (patch)
tree        aa630bb12d2aa88ea56e13370e92784834127434
parent      de333d121da4cb80d45819cbcf8b4246e48ec4d0 (diff)
download    spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.tar.gz
            spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.tar.bz2
            spark-59d87d24079bc633e63ce032f0a5ddd18a3b02cb.zip
[SPARK-17650] malformed url's throw exceptions before bricking Executors
## What changes were proposed in this pull request?

When a malformed URL was sent to Executors through `sc.addJar` and `sc.addFile`, the executors became unusable, because they constantly threw `MalformedURLException`s and could never acknowledge that the file or jar was simply bad input. This PR fixes that problem by making sure malformed URLs can never be submitted through `sc.addJar` and `sc.addFile`. An alternative solution would be to blacklist bad files and jars on Executors: fail the first time, then ignore subsequent attempts (but print a warning message).

## How was this patch tested?

Unit tests in SparkContextSuite.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15224 from brkyvz/SPARK-17650.
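In effect, any URI with an `http`, `https`, or `ftp` scheme is now checked with `Utils.validateURL` on the driver, so a bad URL fails at submission time instead of poisoning every executor. A minimal sketch of the resulting behaviour (the object name and the `local` master are illustrative; it mirrors the new test added to `SparkContextSuite`):

```scala
import java.net.MalformedURLException

import org.apache.spark.{SparkConf, SparkContext}

object FailFastAddJarExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fail-fast-demo").setMaster("local"))
    try {
      // "http://user:pwd/path" parses as a URI, but converting it to a URL fails
      // (the port component "pwd" is not numeric), so the driver now rejects it
      // immediately instead of shipping it to executors.
      sc.addJar("http://user:pwd/path")
    } catch {
      case e: MalformedURLException =>
        println(s"Rejected on the driver: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
```

Before this patch the same call would appear to succeed on the driver, and the executors would then repeatedly hit `MalformedURLException` when trying to fetch the dependency.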
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala       | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala         | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala  | 22
3 files changed, 51 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f58037e100..4694790c72 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
-import java.net.URI
+import java.net.{MalformedURLException, URI}
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
- FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
- TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat,
- WholeTextFileInputFormat}
+import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
@@ -1452,6 +1449,9 @@ class SparkContext(config: SparkConf) extends Logging {
throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
"turned on.")
}
+ } else {
+ // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+ Utils.validateURL(uri)
}
val key = if (!isLocal && scheme == "file") {
@@ -1711,6 +1711,8 @@ class SparkContext(config: SparkConf) extends Logging {
key = env.rpcEnv.fileServer.addJar(new File(path))
} else {
val uri = new URI(path)
+ // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+ Utils.validateURL(uri)
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 09896c4e2f..e09666c610 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -698,6 +698,26 @@ private[spark] object Utils extends Logging {
}
/**
+ * Validate that a given URI is actually a valid URL as well.
+ * @param uri The URI to validate
+ */
+ @throws[MalformedURLException]("when the URI is an invalid URL")
+ def validateURL(uri: URI): Unit = {
+ Option(uri.getScheme).getOrElse("file") match {
+ case "http" | "https" | "ftp" =>
+ try {
+ uri.toURL
+ } catch {
+ case e: MalformedURLException =>
+ val ex = new MalformedURLException(s"URI (${uri.toString}) is not a valid URL.")
+ ex.initCause(e)
+ throw ex
+ }
+ case _ => // will not be turned into a URL anyway
+ }
+ }
+
+ /**
* Get the path of a temporary directory. Spark's local directories can be configured through
* multiple settings, which are used with the following precedence:
*
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f8d143dc61..c451c596b0 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io.File
+import java.net.MalformedURLException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
@@ -173,6 +174,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ Seq("http", "https", "ftp").foreach { scheme =>
+ val badURL = s"$scheme://user:pwd/path"
+ val e1 = intercept[MalformedURLException] {
+ sc.addFile(badURL)
+ }
+ assert(e1.getMessage.contains(badURL))
+ val e2 = intercept[MalformedURLException] {
+ sc.addJar(badURL)
+ }
+ assert(e2.getMessage.contains(badURL))
+ assert(sc.addedFiles.isEmpty)
+ assert(sc.addedJars.isEmpty)
+ }
+ } finally {
+ sc.stop()
+ }
+ }
+
test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)