From d12c0711faa3d4333513fcbbbee4868bcb784a26 Mon Sep 17 00:00:00 2001
From: Mike Jennings
Date: Tue, 16 Dec 2014 12:13:21 -0800
Subject: [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py

Based on this gist:
https://gist.github.com/amar-analytx/0b62543621e1f246c0a2

We use security group ids instead of security group names to get around
this issue: https://github.com/boto/boto/issues/350

Author: Mike Jennings
Author: Mike Jennings

Closes #2872 from mvj101/SPARK-3405 and squashes the following commits:

be9cb43 [Mike Jennings] `pep8 spark_ec2.py` runs cleanly.
4dc6756 [Mike Jennings] Remove duplicate comment
731d94c [Mike Jennings] Update for code review.
ad90a36 [Mike Jennings] Merge branch 'master' of https://github.com/apache/spark into SPARK-3405
1ebffa1 [Mike Jennings] Merge branch 'master' into SPARK-3405
52aaeec [Mike Jennings] [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py
---
 ec2/spark_ec2.py | 66 +++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 51 insertions(+), 15 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 5f9e484212..92adfd2d07 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -162,6 +162,10 @@ def parse_args():
     parser.add_option(
         "--copy-aws-credentials", action="store_true", default=False,
         help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
+    parser.add_option(
+        "--subnet-id", default=None, help="VPC subnet to launch instances in")
+    parser.add_option(
+        "--vpc-id", default=None, help="VPC to launch instances in")
 
     (opts, args) = parser.parse_args()
     if len(args) != 2:
@@ -186,14 +190,14 @@ def parse_args():
 
 
 # Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name):
+def get_or_make_group(conn, name, vpc_id):
     groups = conn.get_all_security_groups()
     group = [g for g in groups if g.name == name]
     if len(group) > 0:
         return group[0]
     else:
         print "Creating security group " + name
-        return conn.create_security_group(name, "Spark EC2 group")
+        return conn.create_security_group(name, "Spark EC2 group", vpc_id)
 
 
 # Check whether a given EC2 instance object is in a state we consider active,
@@ -303,12 +307,26 @@ def launch_cluster(conn, opts, cluster_name):
             user_data_content = user_data_file.read()
 
     print "Setting up security groups..."
-    master_group = get_or_make_group(conn, cluster_name + "-master")
-    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
+    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
     authorized_address = opts.authorized_address
     if master_group.rules == []:  # Group was just now created
-        master_group.authorize(src_group=master_group)
-        master_group.authorize(src_group=slave_group)
+        if opts.vpc_id is None:
+            master_group.authorize(src_group=master_group)
+            master_group.authorize(src_group=slave_group)
+        else:
+            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                   src_group=slave_group)
+            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                   src_group=slave_group)
+            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                   src_group=slave_group)
     master_group.authorize('tcp', 22, 22, authorized_address)
     master_group.authorize('tcp', 8080, 8081, authorized_address)
     master_group.authorize('tcp', 18080, 18080, authorized_address)
@@ -320,8 +338,22 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.ganglia:
             master_group.authorize('tcp', 5080, 5080, authorized_address)
     if slave_group.rules == []:  # Group was just now created
-        slave_group.authorize(src_group=master_group)
-        slave_group.authorize(src_group=slave_group)
+        if opts.vpc_id is None:
+            slave_group.authorize(src_group=master_group)
+            slave_group.authorize(src_group=slave_group)
+        else:
+            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                  src_group=slave_group)
+            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                  src_group=slave_group)
+            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                  src_group=slave_group)
     slave_group.authorize('tcp', 22, 22, authorized_address)
     slave_group.authorize('tcp', 8080, 8081, authorized_address)
     slave_group.authorize('tcp', 50060, 50060, authorized_address)
@@ -341,11 +373,12 @@ def launch_cluster(conn, opts, cluster_name):
     if opts.ami is None:
         opts.ami = get_spark_ami(opts)
 
-    additional_groups = []
+    # we use group ids to work around https://github.com/boto/boto/issues/350
+    additional_group_ids = []
     if opts.additional_security_group:
-        additional_groups = [sg
-                             for sg in conn.get_all_security_groups()
-                             if opts.additional_security_group in (sg.name, sg.id)]
+        additional_group_ids = [sg.id
+                                for sg in conn.get_all_security_groups()
+                                if opts.additional_security_group in (sg.name, sg.id)]
     print "Launching instances..."
 
     try:
@@ -392,9 +425,10 @@ def launch_cluster(conn, opts, cluster_name):
                     placement=zone,
                     count=num_slaves_this_zone,
                     key_name=opts.key_pair,
-                    security_groups=[slave_group] + additional_groups,
+                    security_group_ids=[slave_group.id] + additional_group_ids,
                     instance_type=opts.instance_type,
                     block_device_map=block_map,
+                    subnet_id=opts.subnet_id,
                     user_data=user_data_content)
                 my_req_ids += [req.id for req in slave_reqs]
                 i += 1
@@ -441,12 +475,13 @@ def launch_cluster(conn, opts, cluster_name):
             num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
             if num_slaves_this_zone > 0:
                 slave_res = image.run(key_name=opts.key_pair,
-                                      security_groups=[slave_group] + additional_groups,
+                                      security_group_ids=[slave_group.id] + additional_group_ids,
                                       instance_type=opts.instance_type,
                                       placement=zone,
                                       min_count=num_slaves_this_zone,
                                       max_count=num_slaves_this_zone,
                                       block_device_map=block_map,
+                                      subnet_id=opts.subnet_id,
                                       user_data=user_data_content)
                 slave_nodes += slave_res.instances
                 print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
@@ -467,12 +502,13 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.zone == 'all':
             opts.zone = random.choice(conn.get_all_zones()).name
         master_res = image.run(key_name=opts.key_pair,
-                               security_groups=[master_group] + additional_groups,
+                               security_group_ids=[master_group.id] + additional_group_ids,
                                instance_type=master_type,
                                placement=opts.zone,
                                min_count=1,
                                max_count=1,
                                block_device_map=block_map,
+                               subnet_id=opts.subnet_id,
                                user_data=user_data_content)
         master_nodes = master_res.instances
         print "Launched master in %s, regid = %s" % (zone, master_res.id)
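
Note on the security-group hunks above: a VPC security group does not accept boto's bare src_group grant (which, for EC2-Classic groups, opens all traffic from the named group), so the vpc_id branch adds one explicit rule per protocol for each source group. A condensed sketch of that pattern against boto 2, the library spark_ec2.py already depends on; the region, VPC id and group names below are illustrative placeholders, not values taken from the patch:

import boto.ec2

# Placeholders for illustration only
REGION = "us-east-1"
VPC_ID = "vpc-00000000"

conn = boto.ec2.connect_to_region(REGION)


def get_or_make_group(conn, name, vpc_id):
    # Return the security group with this name, creating it inside the VPC if missing.
    groups = [g for g in conn.get_all_security_groups() if g.name == name]
    if groups:
        return groups[0]
    # boto's EC2Connection.create_security_group takes an optional vpc_id argument
    return conn.create_security_group(name, "Spark EC2 group", vpc_id)


master_group = get_or_make_group(conn, "example-master", VPC_ID)
slave_group = get_or_make_group(conn, "example-slaves", VPC_ID)

if master_group.rules == []:  # only add rules to a freshly created group
    if VPC_ID is None:
        # EC2-Classic: a src_group-only grant authorizes all traffic from that group
        master_group.authorize(src_group=master_group)
        master_group.authorize(src_group=slave_group)
    else:
        # VPC: every rule needs an explicit protocol and port range
        for src in (master_group, slave_group):
            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=src)
            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=src)
            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=src)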
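
The launch hunks make the matching change: groups are passed to EC2 by id via security_group_ids (working around boto issue #350) and the new subnet_id option is forwarded so instances land in the requested VPC subnet. A minimal, hypothetical run_instances call showing how the two parameters fit together; the region, AMI, key pair, instance type, group name and subnet id are placeholders:

import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1")   # placeholder region

# Look up group ids by name; ids (not names) are what a VPC launch expects
slave_ids = [g.id for g in conn.get_all_security_groups()
             if g.name == "example-slaves"]      # placeholder group name

reservation = conn.run_instances(
    image_id="ami-00000000",                     # placeholder AMI
    min_count=1,
    max_count=1,
    key_name="example-keypair",                  # placeholder key pair
    instance_type="m3.large",                    # placeholder instance type
    security_group_ids=slave_ids,
    subnet_id="subnet-00000000")                 # placeholder subnet; None keeps the pre-patch behaviour
print "Launched reservation %s" % reservation.id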