diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-02-17 07:48:27 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-17 07:49:02 -0800 |
commit | ee6e3eff02e9e08b1113ba6faf3397d7e7775087 (patch) | |
tree | d5dc43f81dfa66db59436df0f1932f25d36a763a /core | |
parent | a65766bf0244a41b793b9dc5fbdd2882664ad00e (diff) | |
download | spark-ee6e3eff02e9e08b1113ba6faf3397d7e7775087.tar.gz spark-ee6e3eff02e9e08b1113ba6faf3397d7e7775087.tar.bz2 spark-ee6e3eff02e9e08b1113ba6faf3397d7e7775087.zip |
Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 21 |
1 files changed, 4 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index e94c390df8..2527211929 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -144,24 +144,11 @@ private[spark] class PythonRDD( stream.readFully(update) accumulator += Collections.singletonList(update) } - // Check whether the worker is ready to be re-used. - if (reuse_worker) { - // It has a high possibility that the ending mark is already available, - // And current task should not be blocked by checking it - - if (stream.available() >= 4) { - val ending = stream.readInt() - if (ending == SpecialLengths.END_OF_STREAM) { - env.releasePythonWorker(pythonExec, envVars.toMap, worker) - released = true - logInfo(s"Communication with worker ended cleanly, re-use it: $worker") - } else { - logInfo(s"Communication with worker did not end cleanly " + - s"(ending with $ending), close it: $worker") - } - } else { - logInfo(s"The ending mark from worker is not available, close it: $worker") + if (stream.readInt() == SpecialLengths.END_OF_STREAM) { + if (reuse_worker) { + env.releasePythonWorker(pythonExec, envVars.toMap, worker) + released = true } } null |