author     Jacek Lewandowski <lewandowski.jacek@gmail.com>    2015-07-21 09:53:33 -0700
committer  Andrew Or <andrew@databricks.com>                  2015-07-21 09:53:33 -0700
commit     31954910d67c29874d2af22ee30590a7346a464c (patch)
tree       4264d50c14b4c5f0e176e1574b16e627fda84c20 /core
parent     9a4fd875b39b6a1ef7038823d1c49b0826110fbc (diff)
[SPARK-7171] Added a method to retrieve metrics sources in TaskContext
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #5805 from jacek-lewandowski/SPARK-7171 and squashes the following commits:

ed20bda [Jacek Lewandowski] SPARK-7171: Added a method to retrieve metrics sources in TaskContext
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala                              |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala                          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                        |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala                    |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                   |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                           |  8
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                               |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala                        |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala                        |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala               | 22
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala |  6
12 files changed, 59 insertions, 15 deletions
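
As a usage sketch (illustrative, not part of the patch): with this change, task code can look up registered metrics sources by name through the TaskContext. Assuming the built-in "jvm" source has been enabled via the metrics configuration, a job could do:

    import org.apache.spark.TaskContext

    val rdd = sc.parallelize(1 to 10, 2)
    rdd.foreachPartition { _ =>
      // getMetricsSources returns every registered source whose sourceName
      // matches; the Seq is empty if no such source is configured on this
      // executor.
      val sources = TaskContext.get().getMetricsSources("jvm")
      sources.foreach(s => println(s.sourceName))
    }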
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e93eb93124..b48836d5c8 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,7 @@ import java.io.Serializable
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener
@@ -149,6 +150,14 @@ abstract class TaskContext extends Serializable {
def taskMetrics(): TaskMetrics
/**
+ * ::DeveloperApi::
+ * Returns all metrics sources with the given name which are associated with the instance
+ * which runs the task. For more information see [[org.apache.spark.metrics.MetricsSystem!]].
+ */
+ @DeveloperApi
+ def getMetricsSources(sourceName: String): Seq[Source]
+
+ /**
* Returns the manager for this task's managed memory.
*/
private[spark] def taskMemoryManager(): TaskMemoryManager
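
For context, and not shown in this diff: Source is the small trait in org.apache.spark.metrics.source that pairs a name with a Codahale MetricRegistry. A hypothetical custom source is sketched below; every name in it is a placeholder. Since Source is private[spark] at this point, such a class would need to be compiled into a Spark package, and it would be registered through the metrics configuration (spark.metrics.conf) so that tasks can later retrieve it by its sourceName:

    // Illustrative placement only: Source is private[spark], so this sketch
    // lives in a Spark package. All names below are placeholders.
    package org.apache.spark.metrics.source

    import com.codahale.metrics.{Counter, MetricRegistry}

    class MyAppSource extends Source {
      override val sourceName: String = "myapp"
      override val metricRegistry: MetricRegistry = new MetricRegistry()

      // A counter that task code could read after calling
      // TaskContext.getMetricsSources("myapp").
      val recordsProcessed: Counter =
        metricRegistry.counter(MetricRegistry.name("recordsProcessed"))
    }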
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 6e394f1b12..9ee168ae01 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -20,6 +20,8 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
@@ -29,6 +31,7 @@ private[spark] class TaskContextImpl(
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
+ @transient private val metricsSystem: MetricsSystem,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
@@ -95,6 +98,9 @@ private[spark] class TaskContextImpl(
override def isInterrupted(): Boolean = interrupted
+ override def getMetricsSources(sourceName: String): Seq[Source] =
+ metricsSystem.getSourcesByName(sourceName)
+
@transient private val accumulators = new HashMap[Long, Accumulable[_, _]]
private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9087debde8..66624ffbe4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -210,7 +210,10 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val (value, accumUpdates) = try {
- task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+ task.run(
+ taskAttemptId = taskId,
+ attemptNumber = attemptNumber,
+ metricsSystem = env.metricsSystem)
} finally {
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
// when changing this, make sure to update both copies.
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 67f64d5e27..4517f465eb 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -142,6 +142,9 @@ private[spark] class MetricsSystem private (
} else { defaultName }
}
+ def getSourcesByName(sourceName: String): Seq[Source] =
+ sources.filter(_.sourceName == sourceName)
+
def registerSource(source: Source) {
sources += source
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 71a219a4f3..b829d06923 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -682,6 +682,7 @@ class DAGScheduler(
taskAttemptId = 0,
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
+ metricsSystem = env.metricsSystem,
runningLocally = true)
TaskContext.setTaskContext(taskContext)
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 76a19aeac4..d11a00956a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
+import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.{TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -61,13 +62,18 @@ private[spark] abstract class Task[T](
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task along with updates of Accumulators.
*/
- final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
+ final def run(
+ taskAttemptId: Long,
+ attemptNumber: Int,
+ metricsSystem: MetricsSystem)
+ : (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
stageId = stageId,
partitionId = partitionId,
taskAttemptId = taskAttemptId,
attemptNumber = attemptNumber,
taskMemoryManager = taskMemoryManager,
+ metricsSystem = metricsSystem,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index dfd86d3e51..1b04a3b1cf 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1011,7 +1011,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics());
+ TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, null, false, new TaskMetrics());
Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
}
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index af81e46a65..618a5fb247 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -65,7 +65,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
// in blockManager.put is a losing battle. You have been warned.
blockManager = sc.env.blockManager
cacheManager = sc.env.cacheManager
- val context = new TaskContextImpl(0, 0, 0, 0, null)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, null)
val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
assert(computeValue.toList === List(1, 2, 3, 4))
@@ -77,7 +77,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
- val context = new TaskContextImpl(0, 0, 0, 0, null)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -86,14 +86,14 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
- val context = new TaskContextImpl(0, 0, 0, 0, null, true)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, null, true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
- val context = new TaskContextImpl(0, 0, 0, 0, null)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, null)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 32f04d54ef..3e8816a4c6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -175,7 +175,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
- val tContext = new TaskContextImpl(0, 0, 0, 0, null)
+ val tContext = new TaskContextImpl(0, 0, 0, 0, null, null)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index b9b0eccb0d..9201d1e1f3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -24,11 +24,27 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
+import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
+import org.apache.spark.metrics.source.JvmSource
class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
+ test("provide metrics sources") {
+ val filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile
+ val conf = new SparkConf(loadDefaults = false)
+ .set("spark.metrics.conf", filePath)
+ sc = new SparkContext("local", "test", conf)
+ val rdd = sc.makeRDD(1 to 1)
+ val result = sc.runJob(rdd, (tc: TaskContext, it: Iterator[Int]) => {
+ tc.getMetricsSources("jvm").count {
+ case source: JvmSource => true
+ case _ => false
+ }
+ }).sum
+ assert(result > 0)
+ }
+
test("calls TaskCompletionListener after failure") {
TaskContextSuite.completed = false
sc = new SparkContext("local", "test")
@@ -44,13 +60,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val task = new ResultTask[String, String](0, 0,
sc.broadcast(closureSerializer.serialize((rdd, func)).array), rdd.partitions(0), Seq(), 0)
intercept[RuntimeException] {
- task.run(0, 0)
+ task.run(0, 0, null)
}
assert(TaskContextSuite.completed === true)
}
test("all TaskCompletionListeners should be called even if some fail") {
- val context = new TaskContextImpl(0, 0, 0, 0, null)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, null)
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
context.addTaskCompletionListener(listener)
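
The new test above reads its metrics configuration from test_metrics_config.properties, a resource file that is not part of this diff. Its exact contents are not shown here; based on Spark's standard metrics.properties format, a minimal configuration that registers the JVM source for all instances would contain a line such as:

    *.source.jvm.class=org.apache.spark.metrics.source.JvmSource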
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala
index 6c9cb448e7..db718ecabb 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala
@@ -138,7 +138,7 @@ class HashShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
shuffleHandle,
reduceId,
reduceId + 1,
- new TaskContextImpl(0, 0, 0, 0, null),
+ new TaskContextImpl(0, 0, 0, 0, null, null),
blockManager,
mapOutputTracker)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 64f3fbdceb..cf8bd8ae69 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -95,7 +95,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
)
val iterator = new ShuffleBlockFetcherIterator(
- new TaskContextImpl(0, 0, 0, 0, null),
+ new TaskContextImpl(0, 0, 0, 0, null, null),
transfer,
blockManager,
blocksByAddress,
@@ -165,7 +165,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
- val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
@@ -227,7 +227,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
- val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,