aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichelangelo D'Agostino <mdagostino@civisanalytics.com>2015-04-08 16:48:45 -0400
committerSean Owen <sowen@cloudera.com>2015-04-08 16:48:45 -0400
commit86403f5525782bc9656ab11790f7020baa6b2c1f (patch)
tree1b0489274f5978a58e979cdffa6ea3413edeee1e
parent2f482d706b9d38820472c3152dbd1612c98729bd (diff)
downloadspark-86403f5525782bc9656ab11790f7020baa6b2c1f.tar.gz
spark-86403f5525782bc9656ab11790f7020baa6b2c1f.tar.bz2
spark-86403f5525782bc9656ab11790f7020baa6b2c1f.zip
[SPARK-5242]: Add --private-ips flag to EC2 script
The `spark_ec2.py` script currently references the `ip_address` and `public_dns_name` attributes of an instance. On private networks, these fields aren't set, so we have problems.

This PR introduces a `--private-ips` flag that instead refers to the `private_ip_address` attribute in both cases.

Author: Michelangelo D'Agostino <mdagostino@civisanalytics.com>

Closes #5244 from mdagost/ec2_private_nets and squashes the following commits:

b684c67 [Michelangelo D'Agostino] STY: A few python lint changes.
a4a2eac [Michelangelo D'Agostino] ENH: Fix IP's typo and refactor conditional logic into functions.
c004604 [Michelangelo D'Agostino] ENH: Add --private-ips flag.
-rwxr-xr-x  ec2/spark_ec2.py  64
1 files changed, 47 insertions, 17 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 879a52cef8..0c1f24761d 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -282,6 +282,10 @@ def parse_args():
parser.add_option(
"--vpc-id", default=None,
help="VPC to launch instances in")
+ parser.add_option(
+ "--private-ips", action="store_true", default=False,
+ help="Use private IPs for instances rather than public if VPC/subnet " +
+ "requires that.")
(opts, args) = parser.parse_args()
if len(args) != 2:
@@ -707,7 +711,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
- master = master_nodes[0].public_dns_name
+ master = get_dns_name(master_nodes[0], opts.private_ips)
if deploy_ssh_key:
print "Generating cluster's SSH key on master..."
key_setup = """
@@ -719,8 +723,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
print "Transferring cluster's SSH key to slaves..."
for slave in slave_nodes:
- print slave.public_dns_name
- ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+ slave_address = get_dns_name(slave, opts.private_ips)
+ print slave_address
+ ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone', 'tachyon']
@@ -809,7 +814,8 @@ def is_cluster_ssh_available(cluster_instances, opts):
Check if SSH is available on all the instances in a cluster.
"""
for i in cluster_instances:
- if not is_ssh_available(host=i.public_dns_name, opts=opts):
+ dns_name = get_dns_name(i, opts.private_ips)
+ if not is_ssh_available(host=dns_name, opts=opts):
return False
else:
return True
@@ -923,7 +929,7 @@ def get_num_disks(instance_type):
#
# root_dir should be an absolute path to the directory with the files we want to deploy.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
- active_master = master_nodes[0].public_dns_name
+ active_master = get_dns_name(master_nodes[0], opts.private_ips)
num_disks = get_num_disks(opts.instance_type)
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -948,10 +954,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
print "Deploying Spark via git hash; Tachyon won't be set up"
modules = filter(lambda x: x != "tachyon", modules)
+ master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
+ slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
template_vars = {
- "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
+ "master_list": '\n'.join(master_addresses),
"active_master": active_master,
- "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+ "slave_list": '\n'.join(slave_addresses),
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
@@ -1011,7 +1019,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
#
# root_dir should be an absolute path.
def deploy_user_files(root_dir, opts, master_nodes):
- active_master = master_nodes[0].public_dns_name
+ active_master = get_dns_name(master_nodes[0], opts.private_ips)
command = [
'rsync', '-rv',
'-e', stringify_command(ssh_command(opts)),
@@ -1122,6 +1130,20 @@ def get_partition(total, num_partitions, current_partitions):
return num_slaves_this_zone
+# Gets the IP address, taking into account the --private-ips flag
+def get_ip_address(instance, private_ips=False):
+ ip = instance.ip_address if not private_ips else \
+ instance.private_ip_address
+ return ip
+
+
+# Gets the DNS name, taking into account the --private-ips flag
+def get_dns_name(instance, private_ips=False):
+ dns = instance.public_dns_name if not private_ips else \
+ instance.private_ip_address
+ return dns
+
+
def real_main():
(opts, action, cluster_name) = parse_args()
@@ -1230,7 +1252,7 @@ def real_main():
if any(master_nodes + slave_nodes):
print "The following instances will be terminated:"
for inst in master_nodes + slave_nodes:
- print "> %s" % inst.public_dns_name
+ print "> %s" % get_dns_name(inst, opts.private_ips)
print "ALL DATA ON ALL NODES WILL BE LOST!!"
msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
@@ -1294,13 +1316,17 @@ def real_main():
elif action == "login":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- master = master_nodes[0].public_dns_name
- print "Logging into master " + master + "..."
- proxy_opt = []
- if opts.proxy_port is not None:
- proxy_opt = ['-D', opts.proxy_port]
- subprocess.check_call(
- ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
+ if not master_nodes[0].public_dns_name and not opts.private_ips:
+ print "Master has no public DNS name. Maybe you meant to specify " \
+ "--private-ips?"
+ else:
+ master = get_dns_name(master_nodes[0], opts.private_ips)
+ print "Logging into master " + master + "..."
+ proxy_opt = []
+ if opts.proxy_port is not None:
+ proxy_opt = ['-D', opts.proxy_port]
+ subprocess.check_call(
+ ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
elif action == "reboot-slaves":
response = raw_input(
@@ -1318,7 +1344,11 @@ def real_main():
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- print master_nodes[0].public_dns_name
+ if not master_nodes[0].public_dns_name and not opts.private_ips:
+ print "Master has no public DNS name. Maybe you meant to specify " \
+ "--private-ips?"
+ else:
+ print get_dns_name(master_nodes[0], opts.private_ips)
elif action == "stop":
response = raw_input(