diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-07-03 23:02:36 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-07-03 23:02:36 -0700 |
commit | 97a0bfe1c0261384f09d53f9350de52fb6446d59 (patch) | |
tree | b4e0a124a0e5186a05acb38c20ef101f56818430 | |
parent | 3894a49be9b532cc026d908a0f49bca850504498 (diff) | |
download | spark-97a0bfe1c0261384f09d53f9350de52fb6446d59.tar.gz spark-97a0bfe1c0261384f09d53f9350de52fb6446d59.tar.bz2 spark-97a0bfe1c0261384f09d53f9350de52fb6446d59.zip |
SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
JIRA: https://issues.apache.org/jira/browse/SPARK-2282
This issue is caused by a buildup of sockets in the TIME_WAIT stage of TCP, which is a stage that lasts for some period of time after the communication closes.
This solution simply allows us to reuse sockets that are in TIME_WAIT, to avoid issues with the buildup of the rapid creation of these sockets.
Author: Aaron Davidson <aaron@databricks.com>
Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:
2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 |
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index f6570d3357..462e09466b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: } else { // This happens on the master, where we pass the updates to Python through a socket val socket = new Socket(serverHost, serverPort) + // SPARK-2282: Immediately reuse closed sockets because we create one per task. + socket.setReuseAddress(true) val in = socket.getInputStream val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize)) out.writeInt(val2.size) |