author     Thomas Graves <tgraves@apache.org>    2014-04-04 17:16:31 -0700
committer  Matei Zaharia <matei@databricks.com>  2014-04-04 17:16:31 -0700
commit     198892fe8d39a2fad585fa2a7579d8b478456c33 (patch)
tree       bdc3fd144584b57fbe85b24f09ee46401180f889 /core
parent     a02b535d5e18e987a4b9c4c352838d294f9e853b (diff)
[SPARK-1198] Allow pipes tasks to run in different sub-directories
This works as-is on Linux/Mac/etc. but doesn't cover Windows. Here I use ln -sf for symlinks. Putting this up for comments on that. Do we perhaps want to create some classes for running shell commands, Linux vs. Windows? Is there some other way we want to do this? I assume we are still supporting jdk1.6? Also, should I update the Java API for pipes to allow this parameter?

Author: Thomas Graves <tgraves@apache.org>

Closes #128 from tgravescs/SPARK1198 and squashes the following commits:

abc1289 [Thomas Graves] remove extra tag in pom file
ba23fc0 [Thomas Graves] Add support for symlink on windows, remove commons-io usage
da4b221 [Thomas Graves] Merge branch 'master' of https://github.com/tgravescs/spark into SPARK1198
61be271 [Thomas Graves] Fix file name filter
6b783bd [Thomas Graves] style fixes
1ab49ca [Thomas Graves] Add support for running pipe tasks is separate directories
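A minimal usage sketch of the new flag (not part of the commit; it mirrors the test added to PipedRDDSuite below and assumes a local SparkContext):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pipe-demo"))
    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    // Each "cat" subprocess runs in its own ./tasks/<uuid> working directory,
    // populated with symlinks (copies on Windows) to the files in the original directory.
    val out = nums.pipe(Seq("cat"), separateWorkingDir = true).collect()
    assert(out.sameElements(Array("1", "2", "3", "4")))
    sc.stop()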
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala   64
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala         7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala     45
-rw-r--r--  core/src/test/scala/org/apache/spark/PipedRDDSuite.scala  28
4 files changed, 137 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 4250a9d02f..41ae0fec82 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,6 +17,9 @@
package org.apache.spark.rdd
+import java.io.File
+import java.io.FilenameFilter
+import java.io.IOException
import java.io.PrintWriter
import java.util.StringTokenizer
@@ -27,6 +30,7 @@ import scala.io.Source
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
/**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
command: Seq[String],
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
- printRDDElement: (T, String => Unit) => Unit)
+ printRDDElement: (T, String => Unit) => Unit,
+ separateWorkingDir: Boolean)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null) =
- this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+ printRDDElement: (T, String => Unit) => Unit = null,
+ separateWorkingDir: Boolean = false) =
+ this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+ separateWorkingDir)
override def getPartitions: Array[Partition] = firstParent[T].partitions
+ /**
+ * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+ * @param filterName name of the file or directory to leave out
+ */
+ class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+ def accept(dir: File, name: String): Boolean = {
+ !name.equals(filterName)
+ }
+ }
+
override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
}
+ // When the separateWorkingDir option is turned on, each task will be run in
+ // its own separate directory. This should resolve file access conflict issues.
+ val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+ var workInTaskDirectory = false
+ logDebug("taskDirectory = " + taskDirectory)
+ if (separateWorkingDir == true) {
+ val currentDir = new File(".")
+ logDebug("currentDir = " + currentDir.getAbsolutePath())
+ val taskDirFile = new File(taskDirectory)
+ taskDirFile.mkdirs()
+
+ try {
+ val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+ // Need to add symlinks to jars, files, and directories. On Yarn we could have
+ // directories and other files not known to the SparkContext that were added via the
+ // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
+ // are creating here.
+ for (file <- currentDir.list(tasksDirFilter)) {
+ val fileWithDir = new File(currentDir, file)
+ Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+ new File(taskDirectory + "/" + fileWithDir.getName()))
+ }
+ pb.directory(taskDirFile)
+ workInTaskDirectory = true
+ } catch {
+ case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
+ " (" + taskDirectory + ")")
+ }
+ }
+
val proc = pb.start()
val env = SparkEnv.get
@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}
+
+ // cleanup task working directory if used
+ if (workInTaskDirectory == true) {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ Utils.deleteRecursively(new File(taskDirectory))
+ }
+ logDebug("Removed task working directory " + taskDirectory)
+ }
+
false
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ce2b8ac272..08c42c5ee8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag](
* instead of constructing a huge String to concat all the elements:
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
+ * @param separateWorkingDir Use separate working directories for each task.
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
+ printRDDElement: (T, String => Unit) => Unit = null,
+ separateWorkingDir: Boolean = false): RDD[String] = {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
- if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+ if (printRDDElement ne null) sc.clean(printRDDElement) else null,
+ separateWorkingDir)
}
/**
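The Scaladoc example above for printRDDElement combines with the new flag as in the following sketch (hypothetical; only separateWorkingDir is introduced by this patch, and the same SparkContext sc is assumed):

    // Pipe an RDD[(String, Seq[String])] through "cat", emitting each element of the
    // value sequence on its own line, with every task confined to its own directory.
    val grouped = sc.parallelize(Seq(("a", Seq("1", "2")), ("b", Seq("3"))), 2)
    val piped = grouped.pipe(
      command = Seq("cat"),
      env = Map("LC_ALL" -> "C"),
      printRDDElement = (record: (String, Seq[String]), printFn: String => Unit) =>
        record._2.foreach(printFn),
      separateWorkingDir = true)
    // piped.collect() should yield Array("1", "2", "3")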
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 62ee704d58..737b765e2a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SortedSet
import scala.io.Source
import scala.reflect.ClassTag
@@ -43,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
*/
private[spark] object Utils extends Logging {
+ val osName = System.getProperty("os.name")
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -521,9 +524,10 @@ private[spark] object Utils extends Logging {
/**
* Delete a file or directory and its contents recursively.
+ * Don't follow directories if they are symlinks.
*/
def deleteRecursively(file: File) {
- if (file.isDirectory) {
+ if ((file.isDirectory) && !isSymlink(file)) {
for (child <- listFilesSafely(file)) {
deleteRecursively(child)
}
@@ -537,6 +541,25 @@ private[spark] object Utils extends Logging {
}
/**
+ * Check to see if file is a symbolic link.
+ */
+ def isSymlink(file: File): Boolean = {
+ if (file == null) throw new NullPointerException("File must not be null")
+ if (osName.startsWith("Windows")) return false
+ val fileInCanonicalDir = if (file.getParent() == null) {
+ file
+ } else {
+ new File(file.getParentFile().getCanonicalFile(), file.getName())
+ }
+
+ if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
def memoryStringToMb(str: String): Int = {
@@ -898,6 +921,26 @@ private[spark] object Utils extends Logging {
count
}
+ /**
+ * Creates a symlink. Note that jdk1.7 has Files.createSymbolicLink, but it is not
+ * used here in order to keep jdk1.6 support. Windows is supported by doing a copy;
+ * everything else uses "ln -sf".
+ * @param src absolute path to the source
+ * @param dst relative path for the destination
+ */
+ def symlink(src: File, dst: File) {
+ if (!src.isAbsolute()) {
+ throw new IOException("Source must be absolute")
+ }
+ if (dst.isAbsolute()) {
+ throw new IOException("Destination must be relative")
+ }
+ val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
+ import scala.sys.process._
+ (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
+ (logInfo(line)))
+ }
+
+
/** Return the class name of the given object, removing all dollar signs */
def getFormattedClassName(obj: AnyRef) = {
obj.getClass.getSimpleName.replace("$", "")
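The doc comment on Utils.symlink above notes that JDK 7's Files.createSymbolicLink is deliberately avoided to keep jdk1.6 support. For comparison only, a java.nio.file-based alternative (not part of this patch) could look like:

    // Hypothetical JDK 7+ replacements for Utils.symlink and Utils.isSymlink,
    // shown only to contrast with the "ln -sf"/copy approach used in the patch.
    import java.io.File
    import java.nio.file.Files

    object SymlinkNio {
      def symlink(src: File, dst: File): Unit = {
        // createSymbolicLink(link, target): the first argument is the link to create.
        Files.createSymbolicLink(dst.toPath, src.toPath)
      }

      def isSymlink(file: File): Boolean = Files.isSymbolicLink(file.toPath)
    }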
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 6e7fd55fa4..627e9b5cd9 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -17,8 +17,11 @@
package org.apache.spark
-import org.scalatest.FunSuite
+import java.io.File
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
@@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
}
+ test("basic pipe with separate working directory") {
+ if (testCommandAvailable("cat")) {
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+ val c = piped.collect()
+ assert(c.size === 4)
+ assert(c(0) === "1")
+ assert(c(1) === "2")
+ assert(c(2) === "3")
+ assert(c(3) === "4")
+ val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+ val collectPwd = pipedPwd.collect()
+ assert(collectPwd(0).contains("tasks/"))
+ val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+ // make sure symlinks were created
+ assert(pipedLs.length > 0)
+ // clean up top level tasks directory
+ new File("tasks").delete()
+ } else {
+ assert(true)
+ }
+ }
+
test("test pipe exports map_input_file") {
testExportInputFile("map_input_file")
}