aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 7d75929b96..ec711480eb 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -25,13 +25,17 @@ import org.apache.spark.scheduler._
/**
* :: DeveloperApi ::
* A SparkListener that maintains executor storage status.
+ *
+ * This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
class StorageStatusListener extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
- def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq
+ def storageStatusList: Seq[StorageStatus] = synchronized {
+ executorIdToStorageStatus.values.toSeq
+ }
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {