aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/Utils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/Utils.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala37
1 files changed, 33 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8e69f1d335..0eb2f78b73 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
import java.io._
-import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL}
+import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
import java.nio.ByteBuffer
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
@@ -33,10 +33,11 @@ import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
+
/**
* Various utility methods used by Spark.
*/
@@ -233,13 +234,29 @@ private[spark] object Utils extends Logging {
}
/**
+ * Construct a URI container information used for authentication.
+ * This also sets the default authenticator to properly negotiation the
+ * user/password based on the URI.
+ *
+ * Note this relies on the Authenticator.setDefault being set properly to decode
+ * the user name and password. This is currently set in the SecurityManager.
+ */
+ def constructURIForAuthentication(uri: URI, securityMgr: SecurityManager): URI = {
+ val userCred = securityMgr.getSecretKey()
+ if (userCred == null) throw new Exception("Secret key is null with authentication on")
+ val userInfo = securityMgr.getHttpUser() + ":" + userCred
+ new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(),
+ uri.getQuery(), uri.getFragment())
+ }
+
+ /**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File, conf: SparkConf) {
+ def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
val filename = url.split("/").last
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -249,7 +266,19 @@ private[spark] object Utils extends Logging {
uri.getScheme match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + tempFile)
- val in = new URL(url).openStream()
+
+ var uc: URLConnection = null
+ if (securityMgr.isAuthenticationEnabled()) {
+ logDebug("fetchFile with security enabled")
+ val newuri = constructURIForAuthentication(uri, securityMgr)
+ uc = newuri.toURL().openConnection()
+ uc.setAllowUserInteraction(false)
+ } else {
+ logDebug("fetchFile not using security")
+ uc = new URL(url).openConnection()
+ }
+
+ val in = uc.getInputStream();
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {