author     Patrick Wendell <pwendell@gmail.com>  2013-08-21 15:34:31 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-08-21 15:34:31 -0700
commit     6be6b71c8cf9f8033e0a5cc8e00fbb2ed4ab778c (patch)
tree       549bcd4d4ed258fb8df4e4b6737620be61d6e734 /ec2/spark_ec2.py
parent     51a1a0c602481620dd914f3024dee52c3312058d (diff)
parent     2905611c139e6e014ee4679a2d59f763c515d6f0 (diff)
Merge branch 'master' into ec2-updates

Conflicts:
	ec2/spark_ec2.py
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x  ec2/spark_ec2.py  36
1 file changed, 17 insertions(+), 19 deletions(-)
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6c48d9765f..30253a94b8 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -9,9 +9,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,7 @@ def parse_args():
help="Seconds to wait for nodes to start (default: 120)")
parser.add_option("-k", "--key-pair",
help="Key pair to use on instances")
- parser.add_option("-i", "--identity-file",
+ parser.add_option("-i", "--identity-file",
help="SSH private key file to use for logging into instances")
parser.add_option("-t", "--instance-type", default="m1.large",
help="Type of instance to launch (default: m1.large). " +
@@ -65,9 +65,7 @@ def parse_args():
help="Availability zone to launch instances in, or 'all' to spread " +
"slaves across multiple (an additional $0.01/Gb for bandwidth" +
"between zones applies)")
- parser.add_option("-a", "--ami",
- help="Amazon Machine Image ID to use")
-
+ parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
parser.add_option("-v", "--spark-version", default="0.7.3",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
@@ -75,7 +73,6 @@ def parse_args():
help="Github repo from which to checkout supplied commit hash")
parser.add_option("--hadoop-major-version", default="1",
help="Major version of Hadoop (default: 1)")
-
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
@@ -100,7 +97,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created")
-
+
(opts, args) = parser.parse_args()
if len(args) != 2:
parser.print_help()
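
The three hunks above are pure cleanup of parse_args: trailing whitespace and stray blank lines go, and the -a/--ami option collapses onto one line. For reference, the optparse pattern the function is built on looks roughly like this minimal sketch (the usage string and return shape are assumptions, not shown in the diff):

import sys
from optparse import OptionParser

def parse_args():
    parser = OptionParser(usage="spark-ec2 [options] <action> <cluster_name>")
    parser.add_option("-k", "--key-pair", help="Key pair to use on instances")
    parser.add_option("-i", "--identity-file",
        help="SSH private key file to use for logging into instances")
    parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
    (opts, args) = parser.parse_args()
    if len(args) != 2:  # expects exactly: <action> <cluster_name>
        parser.print_help()
        sys.exit(1)
    return (opts, args[0], args[1])
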
@@ -222,6 +219,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+ master_group.authorize('tcp', 3030, 3035, '0.0.0.0/0')
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
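
The single functional change in this hunk opens TCP ports 3030-3035 on the master security group. With boto (the EC2 library this script drives), authorizing a port range follows the same call shape; the region and group name below are placeholders for illustration, not values from the diff:

import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1")  # placeholder region
master_group = conn.create_security_group(
    "test-cluster-master", "Spark EC2 master group")  # placeholder name
# Positional args match the new rule above:
# (ip_protocol, from_port, to_port, cidr_ip)
master_group.authorize('tcp', 3030, 3035, '0.0.0.0/0')
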
@@ -284,7 +282,7 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map)
my_req_ids += [req.id for req in slave_reqs]
i += 1
-
+
print "Waiting for spot instances to be granted..."
try:
while True:
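
Only trailing whitespace changes here, but the surrounding code is the spot-instance path: it accumulates request IDs, then polls until every request is granted. A hedged sketch of that polling loop (price, AMI, and count are illustrative; the real script takes them from command-line options):

import time
import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1")  # placeholder region
reqs = conn.request_spot_instances(price="0.08", image_id="ami-12345678",
                                   count=2, instance_type="m1.large")
my_req_ids = [req.id for req in reqs]

print "Waiting for spot instances to be granted..."
while True:
    time.sleep(10)
    reqs = conn.get_all_spot_instance_requests(request_ids=my_req_ids)
    if all(r.state == "active" for r in reqs):
        break  # each request now carries the launched r.instance_id
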
@@ -422,7 +420,7 @@ def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
-
+
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
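
setup_standalone_cluster writes the slave list into spark/conf/slaves and starts the cluster, all through the script's ssh() helper. The helper itself is not shown in this diff; a minimal sketch of what such a wrapper amounts to, assuming the same ssh flags used by the rsync call later in this diff:

import subprocess

def ssh(host, opts, command):
    # Non-interactive remote command using -i/--identity-file.
    subprocess.check_call(
        "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
        (opts.identity_file, opts.user, host, command),
        shell=True)
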
@@ -538,7 +536,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
- command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
+ command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True)
# Remove the temp directory we created above
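
The rsync line above (whitespace-only change) is the whole deploy step: a directory of rendered templates is pushed to / on the master, then the temp directory is removed. Wrapped in a hypothetical helper for clarity:

import shutil
import subprocess

def deploy_dir(tmp_dir, opts, active_master):
    # Push the rendered templates to the master's filesystem root.
    command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
                "'%s/' '%s@%s:/'") %
               (opts.identity_file, tmp_dir, opts.user, active_master))
    subprocess.check_call(command, shell=True)
    shutil.rmtree(tmp_dir)  # the temp directory created for templating
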
@@ -567,9 +565,9 @@ def ssh(host, opts, command):
print "Error connecting to host {0}, sleeping 30".format(e)
time.sleep(30)
tries = tries + 1
-
-
-
+
+
+
# Gets a list of zones to launch instances in
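
The hunk above trims whitespace inside ssh()'s retry loop: on a connection error the script prints the error, sleeps 30 seconds, and tries again. As a standalone sketch of that pattern (the retry cap is an assumption; this hunk shows only the error branch):

import subprocess
import time

def ssh_with_retries(host, opts, command, max_tries=3):
    # Hypothetical self-contained version of the retry loop.
    tries = 0
    while True:
        try:
            return subprocess.check_call(
                "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
                (opts.identity_file, opts.user, host, command),
                shell=True)
        except subprocess.CalledProcessError, e:
            if tries >= max_tries:
                raise
            print "Error connecting to host {0}, sleeping 30".format(e)
            time.sleep(30)
            tries = tries + 1
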
@@ -645,7 +643,7 @@ def main():
from_port=rule.from_port,
to_port=rule.to_port,
src_group=grant)
-
+
# Sleep for AWS eventual-consistency to catch up, and for instances
# to terminate
time.sleep(30) # Yes, it does have to be this long :-(
@@ -656,13 +654,13 @@ def main():
except boto.exception.EC2ResponseError:
success = False;
print "Failed to delete security group " + group.name
-
+
# Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails
if success: break;
-
+
attempt += 1
-
+
if not success:
print "Failed to delete all security groups after 3 tries."
print "Try re-running in a few minutes."
@@ -685,7 +683,7 @@ def main():
elif action == "stop":
response = raw_input("Are you sure you want to stop the cluster " +
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
- "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
+ "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":