aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala13
1 files changed, 6 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index dcb6e6313a..b1cec0f647 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
val oldBids = PythonRDD.getWorkerBroadcasts(worker)
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
- val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+ val toRemove = oldBids.diff(newBids)
+ val cnt = toRemove.size + newBids.diff(oldBids).size
dataOut.writeInt(cnt)
- for (bid <- oldBids) {
- if (!newBids.contains(bid)) {
- // remove the broadcast from worker
- dataOut.writeLong(- bid - 1) // bid >= 0
- oldBids.remove(bid)
- }
+ for (bid <- toRemove) {
+ // remove the broadcast from worker
+ dataOut.writeLong(- bid - 1) // bid >= 0
+ oldBids.remove(bid)
}
for (broadcast <- broadcastVars) {
if (!oldBids.contains(broadcast.id)) {