aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/epydoc.conf17
-rwxr-xr-xpython/examples/als.py17
-rw-r--r--python/examples/kmeans.py17
-rwxr-xr-xpython/examples/logistic_regression.py17
-rw-r--r--python/examples/pi.py17
-rw-r--r--python/examples/transitive_closure.py17
-rw-r--r--python/examples/wordcount.py17
-rw-r--r--python/pyspark/accumulators.py17
-rw-r--r--python/pyspark/broadcast.py17
-rw-r--r--python/pyspark/context.py17
-rw-r--r--python/pyspark/daemon.py181
-rw-r--r--python/pyspark/files.py17
-rw-r--r--python/pyspark/java_gateway.py17
-rw-r--r--python/pyspark/rdd.py17
-rw-r--r--python/pyspark/serializers.py21
-rw-r--r--python/pyspark/shell.py17
-rw-r--r--python/pyspark/tests.py60
-rw-r--r--python/pyspark/worker.py72
-rwxr-xr-xpython/run-tests20
19 files changed, 570 insertions, 22 deletions
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 45102cd9fe..d5d5aa5454 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -1,5 +1,22 @@
[epydoc] # Epydoc section marker (required by ConfigParser)
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Information about the project.
name: PySpark
url: http://spark-project.org
diff --git a/python/examples/als.py b/python/examples/als.py
index 010f80097f..f2b2eee64c 100755
--- a/python/examples/als.py
+++ b/python/examples/als.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
This example requires numpy (http://www.numpy.org/)
"""
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
index 72cf9f88c6..c670556f2b 100644
--- a/python/examples/kmeans.py
+++ b/python/examples/kmeans.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
This example requires numpy (http://www.numpy.org/)
"""
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index f13698a86f..54d227d0d3 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
This example requires numpy (http://www.numpy.org/)
"""
diff --git a/python/examples/pi.py b/python/examples/pi.py
index 127cba029b..33c026e824 100644
--- a/python/examples/pi.py
+++ b/python/examples/pi.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import sys
from random import random
from operator import add
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 7f85a1008e..40be3b5000 100644
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import sys
from random import Random
diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py
index 857160624b..41c846ba79 100644
--- a/python/examples/wordcount.py
+++ b/python/examples/wordcount.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import sys
from operator import add
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 3e9d7d36da..d367f91967 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index def810dd46..dfdaba274f 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 657fe6f989..2f741cb345 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import os
import shutil
import sys
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
new file mode 100644
index 0000000000..eb18ec08c9
--- /dev/null
+++ b/python/pyspark/daemon.py
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import signal
+import socket
+import sys
+import traceback
+import multiprocessing
+from ctypes import c_bool
+from errno import EINTR, ECHILD
+from socket import AF_INET, SOCK_STREAM, SOMAXCONN
+from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
+from pyspark.worker import main as worker_main
+from pyspark.serializers import write_int
+
+try:
+ POOLSIZE = multiprocessing.cpu_count()
+except NotImplementedError:
+ POOLSIZE = 4
+
+exit_flag = multiprocessing.Value(c_bool, False)
+
+
+def should_exit():
+ global exit_flag
+ return exit_flag.value
+
+
+def compute_real_exit_code(exit_code):
+ # SystemExit's code can be integer or string, but os._exit only accepts integers
+ import numbers
+ if isinstance(exit_code, numbers.Integral):
+ return exit_code
+ else:
+ return 1
+
+
+def worker(listen_sock):
+ # Redirect stdout to stderr
+ os.dup2(2, 1)
+ sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
+
+ # Manager sends SIGHUP to request termination of workers in the pool
+ def handle_sighup(*args):
+ assert should_exit()
+ signal.signal(SIGHUP, handle_sighup)
+
+ # Cleanup zombie children
+ def handle_sigchld(*args):
+ pid = status = None
+ try:
+ while (pid, status) != (0, 0):
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except EnvironmentError as err:
+ if err.errno == EINTR:
+ # retry
+ handle_sigchld()
+ elif err.errno != ECHILD:
+ raise
+ signal.signal(SIGCHLD, handle_sigchld)
+
+ # Handle clients
+ while not should_exit():
+ # Wait until a client arrives or we have to exit
+ sock = None
+ while not should_exit() and sock is None:
+ try:
+ sock, addr = listen_sock.accept()
+ except EnvironmentError as err:
+ if err.errno != EINTR:
+ raise
+
+ if sock is not None:
+ # Fork a child to handle the client.
+ # The client is handled in the child so that the manager
+ # never receives SIGCHLD unless a worker crashes.
+ if os.fork() == 0:
+ # Leave the worker pool
+ signal.signal(SIGHUP, SIG_DFL)
+ listen_sock.close()
+ # Read the socket using fdopen instead of socket.makefile() because the latter
+ # seems to be very slow; note that we need to dup() the file descriptor because
+ # otherwise writes also cause a seek that makes us miss data on the read side.
+ infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ exit_code = 0
+ try:
+ worker_main(infile, outfile)
+ except SystemExit as exc:
+ exit_code = exc.code
+ finally:
+ outfile.flush()
+ sock.close()
+ os._exit(compute_real_exit_code(exit_code))
+ else:
+ sock.close()
+
+
+def launch_worker(listen_sock):
+ if os.fork() == 0:
+ try:
+ worker(listen_sock)
+ except Exception as err:
+ traceback.print_exc()
+ os._exit(1)
+ else:
+ assert should_exit()
+ os._exit(0)
+
+
+def manager():
+ # Create a new process group to corral our children
+ os.setpgid(0, 0)
+
+ # Create a listening socket on the AF_INET loopback interface
+ listen_sock = socket.socket(AF_INET, SOCK_STREAM)
+ listen_sock.bind(('127.0.0.1', 0))
+ listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
+ listen_host, listen_port = listen_sock.getsockname()
+ write_int(listen_port, sys.stdout)
+
+ # Launch initial worker pool
+ for idx in range(POOLSIZE):
+ launch_worker(listen_sock)
+ listen_sock.close()
+
+ def shutdown():
+ global exit_flag
+ exit_flag.value = True
+
+ # Gracefully exit on SIGTERM, don't die on SIGHUP
+ signal.signal(SIGTERM, lambda signum, frame: shutdown())
+ signal.signal(SIGHUP, SIG_IGN)
+
+ # Cleanup zombie children
+ def handle_sigchld(*args):
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if status != 0 and not should_exit():
+ raise RuntimeError("worker crashed: %s, %s" % (pid, status))
+ except EnvironmentError as err:
+ if err.errno not in (ECHILD, EINTR):
+ raise
+ signal.signal(SIGCHLD, handle_sigchld)
+
+ # Initialization complete
+ sys.stdout.close()
+ try:
+ while not should_exit():
+ try:
+ # Spark tells us to exit by closing stdin
+ if os.read(0, 512) == '':
+ shutdown()
+ except EnvironmentError as err:
+ if err.errno != EINTR:
+ shutdown()
+ raise
+ finally:
+ signal.signal(SIGTERM, SIG_DFL)
+ exit_flag.value = True
+ # Send SIGHUP to notify workers of shutdown
+ os.kill(0, SIGHUP)
+
+
+if __name__ == '__main__':
+ manager()
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 001b7a28b6..89bcbcfe06 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import os
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2329e536cc..e503fb7621 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import os
import sys
from subprocess import Popen, PIPE
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a9fec17a9d..c6a6b24c5a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 115cf28cc2..fecacd1241 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import struct
import cPickle
@@ -46,6 +63,10 @@ def read_long(stream):
return struct.unpack("!q", length)[0]
+def write_long(value, stream):
+ stream.write(struct.pack("!q", value))
+
+
def read_int(stream):
length = stream.read(4)
if length == "":
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 54ff1bf8e7..cc8cd9e3c4 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
An interactive shell.
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 6a1962d267..dfd841b10a 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
@@ -12,6 +29,7 @@ import unittest
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
+from pyspark.serializers import read_int
class PySparkTestCase(unittest.TestCase):
@@ -117,5 +135,47 @@ class TestIO(PySparkTestCase):
self.sc.parallelize([1]).foreach(func)
+class TestDaemon(unittest.TestCase):
+ def connect(self, port):
+ from socket import socket, AF_INET, SOCK_STREAM
+ sock = socket(AF_INET, SOCK_STREAM)
+ sock.connect(('127.0.0.1', port))
+ # send a split index of -1 to shutdown the worker
+ sock.send("\xFF\xFF\xFF\xFF")
+ sock.close()
+ return True
+
+ def do_termination_test(self, terminator):
+ from subprocess import Popen, PIPE
+ from errno import ECONNREFUSED
+
+ # start daemon
+ daemon_path = os.path.join(os.path.dirname(__file__), "daemon.py")
+ daemon = Popen([sys.executable, daemon_path], stdin=PIPE, stdout=PIPE)
+
+ # read the port number
+ port = read_int(daemon.stdout)
+
+ # daemon should accept connections
+ self.assertTrue(self.connect(port))
+
+ # request shutdown
+ terminator(daemon)
+ time.sleep(1)
+
+ # daemon should no longer accept connections
+ with self.assertRaises(EnvironmentError) as trap:
+ self.connect(port)
+ self.assertEqual(trap.exception.errno, ECONNREFUSED)
+
+ def test_termination_stdin(self):
+ """Ensure that daemon and workers terminate when stdin is closed."""
+ self.do_termination_test(lambda daemon: daemon.stdin.close())
+
+ def test_termination_sigterm(self):
+ """Ensure that daemon and workers terminate on SIGTERM."""
+ from signal import SIGTERM
+ self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
if __name__ == "__main__":
unittest.main()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 812e7a9da5..75d692beeb 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,8 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
Worker that receives input from Piped RDD.
"""
import os
import sys
+import time
import traceback
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
@@ -12,48 +30,60 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, read_with_length, write_int, \
- read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+ read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
-# Redirect stdout to stderr so that users must return values from functions.
-old_stdout = os.fdopen(os.dup(1), 'w')
-os.dup2(2, 1)
+def load_obj(infile):
+ return load_pickle(standard_b64decode(infile.readline().strip()))
-def load_obj():
- return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+def report_times(outfile, boot, init, finish):
+ write_int(-3, outfile)
+ write_long(1000 * boot, outfile)
+ write_long(1000 * init, outfile)
+ write_long(1000 * finish, outfile)
-def main():
- split_index = read_int(sys.stdin)
- spark_files_dir = load_pickle(read_with_length(sys.stdin))
+def main(infile, outfile):
+ boot_time = time.time()
+ split_index = read_int(infile)
+ if split_index == -1: # for unit tests
+ return
+ spark_files_dir = load_pickle(read_with_length(infile))
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
sys.path.append(spark_files_dir)
- num_broadcast_variables = read_int(sys.stdin)
+ num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
- bid = read_long(sys.stdin)
- value = read_with_length(sys.stdin)
+ bid = read_long(infile)
+ value = read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
- func = load_obj()
- bypassSerializer = load_obj()
+ func = load_obj(infile)
+ bypassSerializer = load_obj(infile)
if bypassSerializer:
dumps = lambda x: x
else:
dumps = dump_pickle
- iterator = read_from_pickle_file(sys.stdin)
+ init_time = time.time()
+ iterator = read_from_pickle_file(infile)
try:
for obj in func(split_index, iterator):
- write_with_length(dumps(obj), old_stdout)
+ write_with_length(dumps(obj), outfile)
except Exception as e:
- write_int(-2, old_stdout)
- write_with_length(traceback.format_exc(), old_stdout)
+ write_int(-2, outfile)
+ write_with_length(traceback.format_exc(), outfile)
sys.exit(-1)
+ finish_time = time.time()
+ report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output
- write_int(-1, old_stdout)
+ write_int(-1, outfile)
for aid, accum in _accumulatorRegistry.items():
- write_with_length(dump_pickle((aid, accum._value)), old_stdout)
+ write_with_length(dump_pickle((aid, accum._value)), outfile)
+ write_int(-1, outfile)
if __name__ == '__main__':
- main()
+ # Redirect stdout to stderr so that users must return values from functions.
+ old_stdout = os.fdopen(os.dup(1), 'w')
+ os.dup2(2, 1)
+ main(sys.stdin, old_stdout)
diff --git a/python/run-tests b/python/run-tests
index a3a9ff5dcb..1ee947d414 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -1,6 +1,24 @@
#!/usr/bin/env bash
-# Figure out where the Scala framework is installed
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# Figure out where the Spark framework is installed
FWDIR="$(cd `dirname $0`; cd ../; pwd)"
FAILED=0