aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWeiqing Yang <yangweiqing001@gmail.com>2016-09-27 08:10:38 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-09-27 08:10:38 -0500
commit6a68c5d7b4eb07e4ed6b702dd1536cd08d9bba7d (patch)
tree574c59538b955bc0a11c28242ffd6135cd95c257
parent7f16affa262b059580ed2775a7b05a767aa72315 (diff)
downloadspark-6a68c5d7b4eb07e4ed6b702dd1536cd08d9bba7d.tar.gz
spark-6a68c5d7b4eb07e4ed6b702dd1536cd08d9bba7d.tar.bz2
spark-6a68c5d7b4eb07e4ed6b702dd1536cd08d9bba7d.zip
[SPARK-16757] Set up Spark caller context to HDFS and YARN
## What changes were proposed in this pull request? 1. Pass `jobId` to Task. 2. Invoke Hadoop APIs. * A new function `setCallerContext` is added in `Utils`. `setCallerContext` function invokes APIs of `org.apache.hadoop.ipc.CallerContext` to set up spark caller contexts, which will be written into `hdfs-audit.log` and Yarn RM audit log. * For HDFS: Spark sets up its caller context by invoking`org.apache.hadoop.ipc.CallerContext` in `Task` and Yarn `Client` and `ApplicationMaster`. * For Yarn: Spark sets up its caller context by invoking `org.apache.hadoop.ipc.CallerContext` in Yarn `Client`. ## How was this patch tested? Manual Tests against some Spark applications in Yarn client mode and Yarn cluster mode. Need to check if spark caller contexts are written into HDFS hdfs-audit.log and Yarn RM audit log successfully. For example, run SparkKmeans in Yarn client mode: ``` ./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5 ``` **Before**: There will be no Spark caller context in records of `hdfs-audit.log` and Yarn RM audit log. **After**: Spark caller contexts will be written in records of `hdfs-audit.log` and Yarn RM audit log. These are records in `hdfs-audit.log`: ``` 2016-09-20 11:54:24,116 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_CLIENT_AppId_application_1474394339641_0005 2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0 2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0 2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0 ``` ``` 2016-09-20 11:59:33,868 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=mkdirs src=/private/tmp/hadoop-wyang/nm-local-dir/usercache/wyang/appcache/application_1474394339641_0006/container_1474394339641_0006_01_000001/spark-warehouse dst=null perm=wyang:supergroup:rwxr-xr-x proto=rpc callerContext=SPARK_APPLICATION_MASTER_AppId_application_1474394339641_0006_AttemptId_1 2016-09-20 11:59:37,214 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0 2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0 2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0 2016-09-20 11:59:42,391 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_3_AttemptNum_0 ``` This is a record in Yarn RM log: ``` 2016-09-20 11:59:24,050 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=wyang IP=127.0.0.1 OPERATION=Submit Application Request TARGET=ClientRMService RESULT=SUCCESS APPID=application_1474394339641_0006 CALLERCONTEXT=SPARK_CLIENT_AppId_application_1474394339641_0006 ``` Author: Weiqing Yang <yangweiqing001@gmail.com> Closes #14659 from Sherry302/callercontextSubmit.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala62
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala12
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala7
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala4
8 files changed, 126 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd47c1dbbe..5ea0b48f6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1015,7 +1015,8 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
+ taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+ Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
@@ -1024,7 +1025,8 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
+ taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+ Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 609f10aee9..1e7c63af2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -43,7 +43,12 @@ import org.apache.spark.rdd.RDD
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
- */
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+ */
private[spark] class ResultTask[T, U](
stageId: Int,
stageAttemptId: Int,
@@ -52,8 +57,12 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
- metrics: TaskMetrics)
- extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
+ metrics: TaskMetrics,
+ jobId: Option[Int] = None,
+ appId: Option[String] = None,
+ appAttemptId: Option[String] = None)
+ extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+ appId, appAttemptId)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 448fe02084..66d6790e16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -44,6 +44,11 @@ import org.apache.spark.shuffle.ShuffleWriter
* @param locs preferred task execution locations for locality scheduling
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] class ShuffleMapTask(
stageId: Int,
@@ -52,8 +57,12 @@ private[spark] class ShuffleMapTask(
partition: Partition,
@transient private var locs: Seq[TaskLocation],
metrics: TaskMetrics,
- localProperties: Properties)
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
+ localProperties: Properties,
+ jobId: Option[Int] = None,
+ appId: Option[String] = None,
+ appAttemptId: Option[String] = None)
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+ appId, appAttemptId)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 48daa344f3..9385e3c31e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util._
/**
* A unit of execution. We have two kinds of Task's in Spark:
@@ -47,6 +47,11 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOu
* @param partitionId index of the number in the RDD
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] abstract class Task[T](
val stageId: Int,
@@ -54,7 +59,10 @@ private[spark] abstract class Task[T](
val partitionId: Int,
// The default value is only used in tests.
val metrics: TaskMetrics = TaskMetrics.registered,
- @transient var localProperties: Properties = new Properties) extends Serializable {
+ @transient var localProperties: Properties = new Properties,
+ val jobId: Option[Int] = None,
+ val appId: Option[String] = None,
+ val appAttemptId: Option[String] = None) extends Serializable {
/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -79,9 +87,14 @@ private[spark] abstract class Task[T](
metrics)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()
+
if (_killed) {
kill(interruptThread = false)
}
+
+ new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
+ Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
+
try {
runTask(context)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e09666c610..caa768cfbd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2441,6 +2441,68 @@ private[spark] object Utils extends Logging {
}
/**
+ * An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be
+ * constructed by parameters passed in.
+ * When Spark applications run on Yarn and HDFS, its caller contexts will be written into Yarn RM
+ * audit log and hdfs-audit.log. That can help users to better diagnose and understand how
+ * specific applications impacting parts of the Hadoop system and potential problems they may be
+ * creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
+ * very helpful to track which upper level job issues it.
+ *
+ * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
+ *
+ * The parameters below are optional:
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+ * @param jobId id of the job this task belongs to
+ * @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
+ * @param taskId task id
+ * @param taskAttemptNumber task attempt id
+ */
+private[spark] class CallerContext(
+ from: String,
+ appId: Option[String] = None,
+ appAttemptId: Option[String] = None,
+ jobId: Option[Int] = None,
+ stageId: Option[Int] = None,
+ stageAttemptId: Option[Int] = None,
+ taskId: Option[Long] = None,
+ taskAttemptNumber: Option[Int] = None) extends Logging {
+
+ val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
+ val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
+ val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
+ val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
+ val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
+ val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
+ val taskAttemptNumberStr =
+ if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
+
+ val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
+ jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
+
+ /**
+ * Set up the caller context [[context]] by invoking Hadoop CallerContext API of
+ * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
+ */
+ def setCurrentContext(): Boolean = {
+ var succeed = false
+ try {
+ val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+ val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
+ val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
+ val hdfsContext = Builder.getMethod("build").invoke(builderInst)
+ callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
+ succeed = true
+ } catch {
+ case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+ }
+ succeed
+ }
+}
+
+/**
* A utility class to redirect the child process's stdout or stderr.
*/
private[spark] class RedirectThread(
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4715fd2937..bc28b2d9cb 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -788,6 +788,18 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
.set("spark.executor.instances", "1")) === 3)
}
+ test("Set Spark CallerContext") {
+ val context = "test"
+ try {
+ val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+ assert(new CallerContext(context).setCurrentContext())
+ assert(s"SPARK_$context" ===
+ callerContext.getMethod("getCurrent").invoke(null).toString)
+ } catch {
+ case e: ClassNotFoundException =>
+ assert(!new CallerContext(context).setCurrentContext())
+ }
+ }
test("encodeFileNameToURIRawPath") {
assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ad50ea789a..aabae140af 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,8 @@ private[spark] class ApplicationMaster(
try {
val appAttemptId = client.getAttemptId()
+ var attemptID: Option[String] = None
+
if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
@@ -196,8 +198,13 @@ private[spark] class ApplicationMaster(
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+ attemptID = Option(appAttemptId.getAttemptId.toString)
}
+ new CallerContext("APPMASTER",
+ Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
+
logInfo("ApplicationAttemptId: " + appAttemptId)
val fs = FileSystem.get(yarnConf)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2398f0aea3..ea4e1160b7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallerContext, Utils}
private[spark] class Client(
val args: ClientArguments,
@@ -161,6 +161,8 @@ private[spark] class Client(
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
+ new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)