author     Mike Jennings <mvj101@gmail.com>       2014-12-16 12:13:21 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2014-12-16 12:13:21 -0800
commit     d12c0711faa3d4333513fcbbbee4868bcb784a26 (patch)
tree       e4fd21418ffa640b9bd0a668d5c098f58c15c512 /ec2
parent     cb484474934d664000df3d63a326bcd6b12f2f09 (diff)
download   spark-d12c0711faa3d4333513fcbbbee4868bcb784a26.tar.gz
           spark-d12c0711faa3d4333513fcbbbee4868bcb784a26.tar.bz2
           spark-d12c0711faa3d4333513fcbbbee4868bcb784a26.zip
[SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py
Based on this gist:
https://gist.github.com/amar-analytx/0b62543621e1f246c0a2

We use security group ids instead of security group names to get around this issue:
https://github.com/boto/boto/issues/350

Author: Mike Jennings <mvj101@gmail.com>
Author: Mike Jennings <mvj@google.com>

Closes #2872 from mvj101/SPARK-3405 and squashes the following commits:

be9cb43 [Mike Jennings] `pep8 spark_ec2.py` runs cleanly.
4dc6756 [Mike Jennings] Remove duplicate comment
731d94c [Mike Jennings] Update for code review.
ad90a36 [Mike Jennings] Merge branch 'master' of https://github.com/apache/spark into SPARK-3405
1ebffa1 [Mike Jennings] Merge branch 'master' into SPARK-3405
52aaeec [Mike Jennings] [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py
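The crux of the boto workaround: EC2-Classic launches accept security group names, but VPC launches must reference groups by id and supply a subnet. A minimal sketch of the distinction, assuming boto 2.x; the region and all ids are placeholders:

import boto.ec2

# Region and all ids below are hypothetical, for illustration only.
conn = boto.ec2.connect_to_region("us-east-1")

# EC2-Classic: plain group names work.
classic_res = conn.run_instances("ami-xxxxxxxx",
                                 key_name="my-key",
                                 security_groups=["my-cluster-slaves"])

# VPC: names are ambiguous across VPCs (boto issue 350), so the patch
# switches to group ids and threads a subnet_id through every launch call.
vpc_res = conn.run_instances("ami-xxxxxxxx",
                             key_name="my-key",
                             security_group_ids=["sg-xxxxxxxx"],
                             subnet_id="subnet-xxxxxxxx")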
Diffstat (limited to 'ec2')
-rwxr-xr-x  ec2/spark_ec2.py  66
1 file changed, 51 insertions(+), 15 deletions(-)
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 5f9e484212..92adfd2d07 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -162,6 +162,10 @@ def parse_args():
parser.add_option(
"--copy-aws-credentials", action="store_true", default=False,
help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
+ parser.add_option(
+ "--subnet-id", default=None, help="VPC subnet to launch instances in")
+ parser.add_option(
+ "--vpc-id", default=None, help="VPC to launch instances in")
(opts, args) = parser.parse_args()
if len(args) != 2:
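The new flags plug into the script's existing optparse setup. A stripped-down sketch of how they surface on opts (option names as in the patch; everything else hypothetical):

from optparse import OptionParser

parser = OptionParser(usage="spark_ec2.py [options] <action> <cluster_name>")
parser.add_option(
    "--subnet-id", default=None, help="VPC subnet to launch instances in")
parser.add_option(
    "--vpc-id", default=None, help="VPC to launch instances in")

# e.g. spark_ec2.py --vpc-id=vpc-xxxxxxxx --subnet-id=subnet-xxxxxxxx launch my-cluster
(opts, args) = parser.parse_args(
    ["--vpc-id=vpc-xxxxxxxx", "--subnet-id=subnet-xxxxxxxx", "launch", "my-cluster"])
print opts.vpc_id, opts.subnet_id  # optparse maps --subnet-id to opts.subnet_id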
@@ -186,14 +190,14 @@ def parse_args():
# Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name):
+def get_or_make_group(conn, name, vpc_id):
groups = conn.get_all_security_groups()
group = [g for g in groups if g.name == name]
if len(group) > 0:
return group[0]
else:
print "Creating security group " + name
- return conn.create_security_group(name, "Spark EC2 group")
+ return conn.create_security_group(name, "Spark EC2 group", vpc_id)
# Check whether a given EC2 instance object is in a state we consider active,
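Consolidated, the helper after this hunk reads roughly as follows (a sketch of the patched function; boto 2.x create_security_group accepts an optional vpc_id):

def get_or_make_group(conn, name, vpc_id):
    # Return the named security group, creating it in vpc_id if missing.
    groups = conn.get_all_security_groups()
    group = [g for g in groups if g.name == name]
    if len(group) > 0:
        return group[0]
    else:
        print "Creating security group " + name
        # vpc_id=None preserves the old EC2-Classic behavior.
        return conn.create_security_group(name, "Spark EC2 group", vpc_id)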
@@ -303,12 +307,26 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read()
print "Setting up security groups..."
- master_group = get_or_make_group(conn, cluster_name + "-master")
- slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
+ slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
authorized_address = opts.authorized_address
if master_group.rules == []: # Group was just now created
- master_group.authorize(src_group=master_group)
- master_group.authorize(src_group=slave_group)
+ if opts.vpc_id is None:
+ master_group.authorize(src_group=master_group)
+ master_group.authorize(src_group=slave_group)
+ else:
+ master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+ src_group=master_group)
+ master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+ src_group=master_group)
+ master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+ src_group=master_group)
+ master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+ src_group=slave_group)
+ master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+ src_group=slave_group)
+ master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+ src_group=slave_group)
master_group.authorize('tcp', 22, 22, authorized_address)
master_group.authorize('tcp', 8080, 8081, authorized_address)
master_group.authorize('tcp', 18080, 18080, authorized_address)
@@ -320,8 +338,22 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, authorized_address)
if slave_group.rules == []: # Group was just now created
- slave_group.authorize(src_group=master_group)
- slave_group.authorize(src_group=slave_group)
+ if opts.vpc_id is None:
+ slave_group.authorize(src_group=master_group)
+ slave_group.authorize(src_group=slave_group)
+ else:
+ slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+ src_group=master_group)
+ slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+ src_group=master_group)
+ slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+ src_group=master_group)
+ slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+ src_group=slave_group)
+ slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+ src_group=slave_group)
+ slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+ src_group=slave_group)
slave_group.authorize('tcp', 22, 22, authorized_address)
slave_group.authorize('tcp', 8080, 8081, authorized_address)
slave_group.authorize('tcp', 50060, 50060, authorized_address)
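The master and slave blocks above repeat the same three rules for every (group, peer) pair because VPC security groups reject the bare src_group shorthand that EC2-Classic allows. A hypothetical refactoring sketch, not part of the patch, expressing the same rules:

def authorize_vpc_pair(group, peer):
    # VPC rules need explicit protocols and port ranges; -1/-1 means all ICMP types.
    group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=peer)
    group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=peer)
    group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=peer)

# Equivalent to the twelve authorize calls added above:
for group in (master_group, slave_group):
    for peer in (master_group, slave_group):
        authorize_vpc_pair(group, peer)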
@@ -341,11 +373,12 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ami is None:
opts.ami = get_spark_ami(opts)
- additional_groups = []
+ # we use group ids to work around https://github.com/boto/boto/issues/350
+ additional_group_ids = []
if opts.additional_security_group:
- additional_groups = [sg
- for sg in conn.get_all_security_groups()
- if opts.additional_security_group in (sg.name, sg.id)]
+ additional_group_ids = [sg.id
+ for sg in conn.get_all_security_groups()
+ if opts.additional_security_group in (sg.name, sg.id)]
print "Launching instances..."
try:
@@ -392,9 +425,10 @@ def launch_cluster(conn, opts, cluster_name):
placement=zone,
count=num_slaves_this_zone,
key_name=opts.key_pair,
- security_groups=[slave_group] + additional_groups,
+ security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
block_device_map=block_map,
+ subnet_id=opts.subnet_id,
user_data=user_data_content)
my_req_ids += [req.id for req in slave_reqs]
i += 1
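For reference, this is the shape of the patched spot request once both changes are threaded through; a minimal sketch assuming boto 2.x request_spot_instances, with all literal values as placeholders:

slave_reqs = conn.request_spot_instances(
    price=0.10,
    image_id="ami-xxxxxxxx",
    count=2,
    key_name="my-key",
    security_group_ids=["sg-xxxxxxxx"],  # slave group id plus any additional ids
    instance_type="m3.large",
    subnet_id="subnet-xxxxxxxx")
my_req_ids = [req.id for req in slave_reqs]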
@@ -441,12 +475,13 @@ def launch_cluster(conn, opts, cluster_name):
num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
if num_slaves_this_zone > 0:
slave_res = image.run(key_name=opts.key_pair,
- security_groups=[slave_group] + additional_groups,
+ security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
placement=zone,
min_count=num_slaves_this_zone,
max_count=num_slaves_this_zone,
block_device_map=block_map,
+ subnet_id=opts.subnet_id,
user_data=user_data_content)
slave_nodes += slave_res.instances
print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
@@ -467,12 +502,13 @@ def launch_cluster(conn, opts, cluster_name):
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name=opts.key_pair,
- security_groups=[master_group] + additional_groups,
+ security_group_ids=[master_group.id] + additional_group_ids,
instance_type=master_type,
placement=opts.zone,
min_count=1,
max_count=1,
block_device_map=block_map,
+ subnet_id=opts.subnet_id,
user_data=user_data_content)
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)