Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 64 +++++++++++---
1 file changed, 61 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 4250a9d02f..41ae0fec82 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,6 +17,9 @@
package org.apache.spark.rdd
+import java.io.File
+import java.io.FilenameFilter
+import java.io.IOException
import java.io.PrintWriter
import java.util.StringTokenizer
@@ -27,6 +30,7 @@ import scala.io.Source
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
/**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
command: Seq[String],
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
- printRDDElement: (T, String => Unit) => Unit)
+ printRDDElement: (T, String => Unit) => Unit,
+ separateWorkingDir: Boolean)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null) =
- this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+ printRDDElement: (T, String => Unit) => Unit = null,
+ separateWorkingDir: Boolean = false) =
+ this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+ separateWorkingDir)
override def getPartitions: Array[Partition] = firstParent[T].partitions
+ /**
+ * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+ * @param filterName the name of the file or directory to leave out
+ */
+ class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+ def accept(dir: File, name: String): Boolean = {
+ !name.equals(filterName)
+ }
+ }
+
override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
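
The hunk above threads the new separateWorkingDir flag through both PipedRDD constructors, defaulting to false. A minimal sketch of how a caller might opt in through RDD.pipe, assuming the matching pipe overload updated in the same change set; the app name, master, and sample data are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object PipeWorkingDirExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pipe-example").setMaster("local[2]"))
    val lines = sc.parallelize(Seq("a", "b", "c"))
    // separateWorkingDir = true makes each task run the command inside its
    // own ./tasks/<uuid> directory instead of the shared worker directory.
    val piped = lines.pipe(command = Seq("cat"), separateWorkingDir = true)
    piped.collect().foreach(println)
    sc.stop()
  }
}
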
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
}
+ // When the spark.worker.separated.working.directory option is turned on, each
+ // task is run in a separate directory. This should resolve file access
+ // conflict issues.
+ val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+ var workInTaskDirectory = false
+ logDebug("taskDirectory = " + taskDirectory)
+ if (separateWorkingDir) {
+ val currentDir = new File(".")
+ logDebug("currentDir = " + currentDir.getAbsolutePath())
+ val taskDirFile = new File(taskDirectory)
+ taskDirFile.mkdirs()
+
+ try {
+ val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+ // Need to add symlinks to jars, files, and directories. On YARN we could have
+ // directories and other files not known to the SparkContext that were added via the
+ // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
+ // are creating here.
+ for (file <- currentDir.list(tasksDirFilter)) {
+ val fileWithDir = new File(currentDir, file)
+ Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+ new File(taskDirectory + "/" + fileWithDir.getName()))
+ }
+ pb.directory(taskDirFile)
+ workInTaskDirectory = true
+ } catch {
+ case e: Exception => logError("Unable to set up task working directory: " + e.getMessage +
+ " (" + taskDirectory + ")")
+ }
+ }
+
val proc = pb.start()
val env = SparkEnv.get
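
The setup block above links everything in the worker's current directory into the fresh task directory via Spark's internal Utils.symlink, skipping only the tasks tree itself. A standalone sketch of the same idea using only the JDK's java.nio.file API; the directory layout mirrors the patch, but the filter and object names here are illustrative:

import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}

// Same idea as NotEqualsFileNameFilter above: accept every entry except one name.
class ExcludeNameFilter(excluded: String) extends FilenameFilter {
  def accept(dir: File, name: String): Boolean = !name.equals(excluded)
}

object LinkIntoTaskDir {
  def main(args: Array[String]): Unit = {
    val currentDir = new File(".")
    val taskDir = new File("./tasks/" + java.util.UUID.randomUUID.toString)
    taskDir.mkdirs()
    // Symlink jars, files, and directories into the task directory so the
    // piped command sees them without copying, excluding the tasks/ tree itself.
    for (name <- currentDir.list(new ExcludeNameFilter("tasks"))) {
      Files.createSymbolicLink(
        Paths.get(taskDir.getPath, name),  // the link to create
        Paths.get(name).toAbsolutePath)    // what it points at
    }
  }
}
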
@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}
+
+ // Clean up the task working directory if one was used
+ if (workInTaskDirectory) {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ Utils.deleteRecursively(new File(taskDirectory))
+ }
+ logDebug("Removed task working directory " + taskDirectory)
+ }
+
false
}
}
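
The cleanup relies on scala.util.control.Exception.ignoring, which builds a Catch that runs a block and swallows the listed exception types. A small self-contained illustration; the thrown exception is contrived to show the behavior:

import java.io.IOException
import scala.util.control.Exception.ignoring

object BestEffortCleanup {
  def main(args: Array[String]): Unit = {
    // Run the block and silently discard any IOException, mirroring the
    // best-effort directory removal in compute() above.
    ignoring(classOf[IOException]) {
      throw new IOException("delete failed") // swallowed; execution continues
    }
    println("cleanup attempted")
  }
}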