diff options
author | Davies Liu <davies@databricks.com> | 2015-02-26 11:54:17 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-26 11:54:27 -0800 |
commit | 7fa960e653a905fc48d4097b49ce560cff919fa2 (patch) | |
tree | 4d4e68db4e551d11c58b6419ce1a1882899aa116 /core | |
parent | cfff397f0adb27ca102cca43a7696e9fb1819ee0 (diff) | |
download | spark-7fa960e653a905fc48d4097b49ce560cff919fa2.tar.gz spark-7fa960e653a905fc48d4097b49ce560cff919fa2.tar.bz2 spark-7fa960e653a905fc48d4097b49ce560cff919fa2.zip |
[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe
Removing elements from a mutable HashSet while iterating over it can cause the
iteration to incorrectly skip over entries that were not removed. If this
happened, PythonRDD would write fewer broadcast variables than the Python
worker was expecting to read, which would cause the Python worker to hang
indefinitely.
Author: Davies Liu <davies@databricks.com>
Closes #4776 from davies/fix_hang and squashes the following commits:
a4384a5 [Davies Liu] fix bug: remvoe() inside iterator is not safe
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index dcb6e6313a..b1cec0f647 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -219,14 +219,13 @@ private[spark] class PythonRDD( val oldBids = PythonRDD.getWorkerBroadcasts(worker) val newBids = broadcastVars.map(_.id).toSet // number of different broadcasts - val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size + val toRemove = oldBids.diff(newBids) + val cnt = toRemove.size + newBids.diff(oldBids).size dataOut.writeInt(cnt) - for (bid <- oldBids) { - if (!newBids.contains(bid)) { - // remove the broadcast from worker - dataOut.writeLong(- bid - 1) // bid >= 0 - oldBids.remove(bid) - } + for (bid <- toRemove) { + // remove the broadcast from worker + dataOut.writeLong(- bid - 1) // bid >= 0 + oldBids.remove(bid) } for (broadcast <- broadcastVars) { if (!oldBids.contains(broadcast.id)) { |