aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-11-16 16:59:16 -0800
committerAndrew Or <andrew@databricks.com>2015-11-16 16:59:16 -0800
commit30f3cfda1cce8760f15c0a48a8c47f09a5167fca (patch)
tree54f8837277cdb423ee45b8b6b3022073e107af1c
parentea6f53e48a911b49dc175ccaac8c80e0a1d97a09 (diff)
downloadspark-30f3cfda1cce8760f15c0a48a8c47f09a5167fca.tar.gz
spark-30f3cfda1cce8760f15c0a48a8c47f09a5167fca.tar.bz2
spark-30f3cfda1cce8760f15c0a48a8c47f09a5167fca.zip
[SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync
When we call AsyncRDDActions#takeAsync, DAGScheduler#runJob is actually called from a different thread, so we cannot get the proper callsite information. The following screenshots show the state before and after this patch is applied. Before: <img width="1268" alt="2015-11-04 1 26 40" src="https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png"> <img width="1258" alt="2015-11-04 1 26 52" src="https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png"> After: <img width="1268" alt="2015-11-04 0 48 07" src="https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png"> <img width="1269" alt="2015-11-04 0 48 26" src="https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png"> Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #9437 from sarutak/SPARK-11480.
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ca1eb1f4e4..d5e853613b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
val f = new ComplexFutureAction[Seq[T]]
+ val callSite = self.context.getCallSite
f.run {
// This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
@@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
val results = new ArrayBuffer[T](num)
val totalParts = self.partitions.length
var partsScanned = 0
+ self.context.setCallSite(callSite)
while (results.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.