aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala21
1 files changed, 17 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2527211929..c3c8336a43 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
+
// Check whether the worker is ready to be re-used.
- if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
- if (reuse_worker) {
- env.releasePythonWorker(pythonExec, envVars.toMap, worker)
- released = true
+ if (reuse_worker) {
+ // It has a high possibility that the ending mark is already available,
+ // And current task should not be blocked by checking it
+
+ if (stream.available() >= 4) {
+ val ending = stream.readInt()
+ if (ending == SpecialLengths.END_OF_STREAM) {
+ env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+ released = true
+ logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
+ } else {
+ logInfo(s"Communication with worker did not end cleanly (ending with $ending), " +
+ s"close it: $worker")
+ }
+ } else {
+ logInfo(s"The ending mark from worker is not available, close it: $worker")
}
}
null