aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-07-03 23:02:36 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-07-03 23:02:47 -0700
commit313f202e27878bb9a1ec425defd248203bc73c5f (patch)
tree4e8312fa6fd0a547cb34de43f4c6578c2df2488d /core
parentcf1d46e46518c818d20f07cdaabbd8069d877ca8 (diff)
downloadspark-313f202e27878bb9a1ec425defd248203bc73c5f.tar.gz
spark-313f202e27878bb9a1ec425defd248203bc73c5f.tar.bz2
spark-313f202e27878bb9a1ec425defd248203bc73c5f.zip
SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
JIRA: https://issues.apache.org/jira/browse/SPARK-2282 This issue is caused by a buildup of sockets in the TIME_WAIT stage of TCP, which is a stage that lasts for some period of time after the communication closes. This solution simply allows us to reuse sockets that are in TIME_WAIT, to avoid issues with the buildup of the rapid creation of these sockets. Author: Aaron Davidson <aaron@databricks.com> Closes #1220 from aarondav/SPARK-2282 and squashes the following commits: 2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark (cherry picked from commit 97a0bfe1c0261384f09d53f9350de52fb6446d59) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57b28b9972..0217a58e08 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -402,6 +402,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
} else {
// This happens on the master, where we pass the updates to Python through a socket
val socket = new Socket(serverHost, serverPort)
+ // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+ socket.setReuseAddress(true)
val in = socket.getInputStream
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size)