aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-05-26 23:17:39 -0700
committerAaron Davidson <aaron@databricks.com>2014-05-26 23:17:39 -0700
commit549830b0db2c8b069391224f3a73bb0d7f397f71 (patch)
treedba55b1fb8bc76ddf16d2093805ab27b34775992 /core
parent90e281b55aecbfbe4431ac582311d5790fe7aad3 (diff)
downloadspark-549830b0db2c8b069391224f3a73bb0d7f397f71.tar.gz
spark-549830b0db2c8b069391224f3a73bb0d7f397f71.tar.bz2
spark-549830b0db2c8b069391224f3a73bb0d7f397f71.zip
SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection. There are two place will call `replicate(blockId, bytesAfterPut, level)` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`. As they run in different `Executor`s, this is a race condition which may cause the memory pointed by `cachedPeers` is not correct even if `cachedPeers != null`. The race condition of `onReceiveCallback` is that it's set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`. Author: zsxwing <zsxwing@gmail.com> Closes #887 from zsxwing/SPARK-1932 and squashes the following commits: 524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala2
2 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index dcbbc18531..5dd5fd0047 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
implicit val futureExecContext = ExecutionContext.fromExecutor(
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
- private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+ @volatile
+ private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
private val authEnabled = securityManager.isAuthenticationEnabled()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6534095811..6e450081dc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -772,7 +772,7 @@ private[spark] class BlockManager(
/**
* Replicate block to another node.
*/
- var cachedPeers: Seq[BlockManagerId] = null
+ @volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)