about summary refs log tree commit diff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-02-17 07:48:27 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-17 07:51:05 -0800
commitaeb85cdeea39ace5a41e5196663d5aa3ebf1517f (patch)
tree5f6e14b4d8385d38dacc0658b81549874a89edcb
parentb8da5c390b7272ded5476d4531dcd757c5431fab (diff)
downloadspark-aeb85cdeea39ace5a41e5196663d5aa3ebf1517f.tar.gz
spark-aeb85cdeea39ace5a41e5196663d5aa3ebf1517f.tar.bz2
spark-aeb85cdeea39ace5a41e5196663d5aa3ebf1517f.zip
Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala21
-rw-r--r--python/pyspark/worker.py1
2 files changed, 4 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94c390df8..2527211929 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
-
// Check whether the worker is ready to be re-used.
- if (reuse_worker) {
- // It has a high possibility that the ending mark is already available,
- // And current task should not be blocked by checking it
-
- if (stream.available() >= 4) {
- val ending = stream.readInt()
- if (ending == SpecialLengths.END_OF_STREAM) {
- env.releasePythonWorker(pythonExec, envVars.toMap, worker)
- released = true
- logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
- } else {
- logInfo(s"Communication with worker did not end cleanly " +
- s"(ending with $ending), close it: $worker")
- }
- } else {
- logInfo(s"The ending mark from worker is not available, close it: $worker")
+ if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+ if (reuse_worker) {
+ env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+ released = true
}
}
null
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 180bdbb4c2..8a93c320ec 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,7 +121,6 @@ def main(infile, outfile):
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
pickleSer._write_with_length((aid, accum._value), outfile)
- outfile.flush()
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM: