aboutsummaryrefslogtreecommitdiff
path: root/ec2
diff options
context:
space:
mode:
authorNicholas Chammas <nicholas.chammas@gmail.com>2015-03-08 14:01:26 +0000
committerSean Owen <sowen@cloudera.com>2015-03-08 14:01:26 +0000
commit52ed7da12e26c45734ce53a1be19ef148b2b953e (patch)
tree1b8141d40a1b9abe96e2288a02962de7592ccf10 /ec2
parent334c5bd1ae50ac76770e545cab302361673f45de (diff)
downloadspark-52ed7da12e26c45734ce53a1be19ef148b2b953e.tar.gz
spark-52ed7da12e26c45734ce53a1be19ef148b2b953e.tar.bz2
spark-52ed7da12e26c45734ce53a1be19ef148b2b953e.zip
[SPARK-6193] [EC2] Push group filter up to EC2
When looking for a cluster, spark-ec2 currently pulls down [info for all instances](https://github.com/apache/spark/blob/eb48fd6e9d55fb034c00e61374bb9c2a86a82fb8/ec2/spark_ec2.py#L620) and filters locally. When working on an AWS account with hundreds of active instances, this step alone can take over 10 seconds. This PR improves how spark-ec2 searches for clusters by pushing the filter up to EC2. Basically, the problem (and solution) look like this: ```python >>> timeit.timeit('blah = conn.get_all_reservations()', setup='from __main__ import conn', number=10) 116.96390509605408 >>> timeit.timeit('blah = conn.get_all_reservations(filters={"instance.group-name": ["my-cluster-master"]})', setup='from __main__ import conn', number=10) 4.629754066467285 ``` Translated to a user-visible action, this looks like (against an AWS account with ~200 active instances): ```shell # master $ python -m timeit -n 3 --setup 'import subprocess' 'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", shell=True)' ... 3 loops, best of 3: 9.83 sec per loop # this PR $ python -m timeit -n 3 --setup 'import subprocess' 'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", shell=True)' ... 3 loops, best of 3: 1.47 sec per loop ``` This PR also refactors `get_existing_cluster()` to make it, I hope, simpler. Finally, this PR fixes some minor grammar issues related to printing status to the user. :tophat: :clap: Author: Nicholas Chammas <nicholas.chammas@gmail.com> Closes #4922 from nchammas/get-existing-cluster-faster and squashes the following commits: 18802f1 [Nicholas Chammas] ignore shutting-down f2a5b9f [Nicholas Chammas] fix grammar d96a489 [Nicholas Chammas] push group filter up to EC2
Diffstat (limited to 'ec2')
-rwxr-xr-xec2/spark_ec2.py78
1 files changed, 41 insertions, 37 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index b6e7c4c2af..5e636ddd17 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -22,6 +22,7 @@
from __future__ import with_statement
import hashlib
+import itertools
import logging
import os
import os.path
@@ -299,13 +300,6 @@ def get_validate_spark_version(version, repo):
return version
-# Check whether a given EC2 instance object is in a state we consider active,
-# i.e. not terminating or terminated. We count both stopping and stopped as
-# active since we can restart stopped clusters.
-def is_active(instance):
- return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
-
-
# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2014-06-20
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
@@ -573,8 +567,11 @@ def launch_cluster(conn, opts, cluster_name):
placement_group=opts.placement_group,
user_data=user_data_content)
slave_nodes += slave_res.instances
- print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
- zone, slave_res.id)
+ print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
+ s=num_slaves_this_zone,
+ plural_s=('' if num_slaves_this_zone == 1 else 's'),
+ z=zone,
+ r=slave_res.id)
i += 1
# Launch or resume masters
@@ -621,40 +618,47 @@ def launch_cluster(conn, opts, cluster_name):
return (master_nodes, slave_nodes)
-# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
- print "Searching for existing cluster " + cluster_name + " in region " \
- + opts.region + "..."
- reservations = conn.get_all_reservations()
- master_nodes = []
- slave_nodes = []
- for res in reservations:
- active = [i for i in res.instances if is_active(i)]
- for inst in active:
- group_names = [g.name for g in inst.groups]
- if (cluster_name + "-master") in group_names:
- master_nodes.append(inst)
- elif (cluster_name + "-slaves") in group_names:
- slave_nodes.append(inst)
- if any((master_nodes, slave_nodes)):
- print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
- if master_nodes != [] or not die_on_error:
- return (master_nodes, slave_nodes)
- else:
- if master_nodes == [] and slave_nodes != []:
- print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name \
- + "-master" + " in region " + opts.region
- else:
- print >> sys.stderr, "ERROR: Could not find any existing cluster" \
- + " in region " + opts.region
+ """
+ Get the EC2 instances in an existing cluster if available.
+ Returns a tuple of lists of EC2 instance objects for the masters and slaves.
+ """
+ print "Searching for existing cluster {c} in region {r}...".format(
+ c=cluster_name, r=opts.region)
+
+ def get_instances(group_names):
+ """
+ Get all non-terminated instances that belong to any of the provided security groups.
+
+ EC2 reservation filters and instance states are documented here:
+ http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
+ """
+ reservations = conn.get_all_reservations(
+ filters={"instance.group-name": group_names})
+ instances = itertools.chain.from_iterable(r.instances for r in reservations)
+ return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
+
+ master_instances = get_instances([cluster_name + "-master"])
+ slave_instances = get_instances([cluster_name + "-slaves"])
+
+ if any((master_instances, slave_instances)):
+ print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+ m=len(master_instances),
+ plural_m=('' if len(master_instances) == 1 else 's'),
+ s=len(slave_instances),
+ plural_s=('' if len(slave_instances) == 1 else 's'))
+
+ if not master_instances and die_on_error:
+ print >> sys.stderr, \
+ "ERROR: Could not find a master for cluster {c} in region {r}.".format(
+ c=cluster_name, r=opts.region)
sys.exit(1)
+ return (master_instances, slave_instances)
+
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-
-
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key: