From 198892fe8d39a2fad585fa2a7579d8b478456c33 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 4 Apr 2014 17:16:31 -0700 Subject: [SPARK-1198] Allow pipes tasks to run in different sub-directories This works as is on Linux/Mac/etc but doesn't cover working on Windows. In here I use ln -sf for symlinks. Putting this up for comments on that. Do we want to create perhaps some classes for doing shell commands - Linux vs Windows. Is there some other way we want to do this? I assume we are still supporting jdk1.6? Also should I update the Java API for pipes to allow this parameter? Author: Thomas Graves Closes #128 from tgravescs/SPARK1198 and squashes the following commits: abc1289 [Thomas Graves] remove extra tag in pom file ba23fc0 [Thomas Graves] Add support for symlink on windows, remove commons-io usage da4b221 [Thomas Graves] Merge branch 'master' of https://github.com/tgravescs/spark into SPARK1198 61be271 [Thomas Graves] Fix file name filter 6b783bd [Thomas Graves] style fixes 1ab49ca [Thomas Graves] Add support for running pipe tasks is separate directories --- .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 64 +++++++++++++++++++++- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 ++- .../main/scala/org/apache/spark/util/Utils.scala | 45 ++++++++++++++- .../scala/org/apache/spark/PipedRDDSuite.scala | 28 +++++++++- 4 files changed, 137 insertions(+), 7 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 4250a9d02f..41ae0fec82 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -17,6 +17,9 @@ package org.apache.spark.rdd +import java.io.File +import java.io.FilenameFilter +import java.io.IOException import java.io.PrintWriter import java.util.StringTokenizer @@ -27,6 +30,7 @@ import scala.io.Source import scala.reflect.ClassTag import 
org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.util.Utils /** @@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag]( command: Seq[String], envVars: Map[String, String], printPipeContext: (String => Unit) => Unit, - printRDDElement: (T, String => Unit) => Unit) + printRDDElement: (T, String => Unit) => Unit, + separateWorkingDir: Boolean) extends RDD[String](prev) { // Similar to Runtime.exec(), if we are given a single string, split it into words @@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag]( command: String, envVars: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, - printRDDElement: (T, String => Unit) => Unit = null) = - this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement) + printRDDElement: (T, String => Unit) => Unit = null, + separateWorkingDir: Boolean = false) = + this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement, + separateWorkingDir) override def getPartitions: Array[Partition] = firstParent[T].partitions + /** + * A FilenameFilter that accepts anything that isn't equal to the name passed in. + * @param name of file or directory to leave out + */ + class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = { + !name.equals(filterName) + } + } + override def compute(split: Partition, context: TaskContext): Iterator[String] = { val pb = new ProcessBuilder(command) // Add the environmental variables to the process. @@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag]( currentEnvVars.putAll(hadoopSplit.getPipeEnvVars()) } + // When spark.worker.separated.working.directory option is turned on, each + // task will be run in separate directory. 
This should resolve file + // access conflict issues + val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString + var workInTaskDirectory = false + logDebug("taskDirectory = " + taskDirectory) + if (separateWorkingDir == true) { + val currentDir = new File(".") + logDebug("currentDir = " + currentDir.getAbsolutePath()) + val taskDirFile = new File(taskDirectory) + taskDirFile.mkdirs() + + try { + val tasksDirFilter = new NotEqualsFileNameFilter("tasks") + + // Need to add symlinks to jars, files, and directories. On Yarn we could have + // directories and other files not known to the SparkContext that were added via the + // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we + // are creating here. + for (file <- currentDir.list(tasksDirFilter)) { + val fileWithDir = new File(currentDir, file) + Utils.symlink(new File(fileWithDir.getAbsolutePath()), + new File(taskDirectory + "/" + fileWithDir.getName())) + } + pb.directory(taskDirFile) + workInTaskDirectory = true + } catch { + case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + + " (" + taskDirectory + ")") + } + } + val proc = pb.start() val env = SparkEnv.get @@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag]( if (exitStatus != 0) { throw new Exception("Subprocess exited with status " + exitStatus) } + + // cleanup task working directory if used + if (workInTaskDirectory == true) { + scala.util.control.Exception.ignoring(classOf[IOException]) { + Utils.deleteRecursively(new File(taskDirectory)) + } + logDebug("Removed task working directory " + taskDirectory) + } + false } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ce2b8ac272..08c42c5ee8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag]( * instead of constructing a huge String 
to concat all the elements: * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) = * for (e <- record._2){f(e)} + * @param separateWorkingDir Use separate working directories for each task. * @return the result RDD */ def pipe( command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, - printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = { + printRDDElement: (T, String => Unit) => Unit = null, + separateWorkingDir: Boolean = false): RDD[String] = { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, - if (printRDDElement ne null) sc.clean(printRDDElement) else null) + if (printRDDElement ne null) sc.clean(printRDDElement) else null, + separateWorkingDir) } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 62ee704d58..737b765e2a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.SortedSet import scala.io.Source import scala.reflect.ClassTag @@ -43,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, */ private[spark] object Utils extends Logging { + val osName = System.getProperty("os.name") + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -521,9 +524,10 @@ private[spark] object Utils extends Logging { /** * Delete a file or directory and its contents recursively. + * Don't follow directories if they are symlinks. 
*/ def deleteRecursively(file: File) { - if (file.isDirectory) { + if ((file.isDirectory) && !isSymlink(file)) { for (child <- listFilesSafely(file)) { deleteRecursively(child) } @@ -536,6 +540,25 @@ private[spark] object Utils extends Logging { } } + /** + * Check to see if file is a symbolic link. + */ + def isSymlink(file: File): Boolean = { + if (file == null) throw new NullPointerException("File must not be null") + if (osName.startsWith("Windows")) return false + val fileInCanonicalDir = if (file.getParent() == null) { + file + } else { + new File(file.getParentFile().getCanonicalFile(), file.getName()) + } + + if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) { + return false; + } else { + return true; + } + } + /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. */ @@ -898,6 +921,26 @@ private[spark] object Utils extends Logging { count } + /** + * Creates a symlink. Note jdk1.7 has Files.createSymbolicLink but not used here + * for jdk1.6 support. Supports windows by doing copy, everything else uses "ln -sf". + * @param src absolute path to the source + * @param dst relative path for the destination + */ + def symlink(src: File, dst: File) { + if (!src.isAbsolute()) { + throw new IOException("Source must be absolute") + } + if (dst.isAbsolute()) { + throw new IOException("Destination must be relative") + } + val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf" + import scala.sys.process._ + (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! 
ProcessLogger(line => + (logInfo(line))) + } + + /** Return the class name of the given object, removing all dollar signs */ def getFormattedClassName(obj: AnyRef) = { obj.getClass.getSimpleName.replace("$", "") diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 6e7fd55fa4..627e9b5cd9 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark -import org.scalatest.FunSuite +import java.io.File + +import com.google.common.io.Files +import org.scalatest.FunSuite import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition} import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit} @@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { } } + test("basic pipe with separate working directory") { + if (testCommandAvailable("cat")) { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") + val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) + val collectPwd = pipedPwd.collect() + assert(collectPwd(0).contains("tasks/")) + val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect() + // make sure symlinks were created + assert(pipedLs.length > 0) + // clean up top level tasks directory + new File("tasks").delete() + } else { + assert(true) + } + } + test("test pipe exports map_input_file") { testExportInputFile("map_input_file") } -- cgit v1.2.3