aboutsummaryrefslogtreecommitdiff
path: root/ec2
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-10-30 23:32:38 -0700
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-10-30 23:32:38 -0700
commit96c9bcfd8d3f45fe43b3857a80fa1a42f983970b (patch)
tree178f4e4078ea91461b5e72eeeab10f57090c570e /ec2
parent51477e88743ac03578ebadba578dff85f45ceca6 (diff)
downloadspark-96c9bcfd8d3f45fe43b3857a80fa1a42f983970b.tar.gz
spark-96c9bcfd8d3f45fe43b3857a80fa1a42f983970b.tar.bz2
spark-96c9bcfd8d3f45fe43b3857a80fa1a42f983970b.zip
Cancel spot instance requests when exiting spark-ec2.
Diffstat (limited to 'ec2')
-rwxr-xr-xec2/spark_ec2.py52
1 files changed, 31 insertions, 21 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6a3647b218..c0926e214f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map)
my_req_ids = [req.id for req in slave_reqs]
print "Waiting for spot instances to be granted..."
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active = 0
- instance_ids = []
- for i in my_req_ids:
- if id_to_req[i].state == "active":
- active += 1
- instance_ids.append(id_to_req[i].instance_id)
- if active == opts.slaves:
- print "All %d slaves granted" % opts.slaves
- reservations = conn.get_all_instances(instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+ try:
+ while True:
+ time.sleep(10)
+ reqs = conn.get_all_spot_instance_requests()
+ id_to_req = {}
+ for r in reqs:
+ id_to_req[r.id] = r
+ active_instance_ids = []
+ for i in my_req_ids:
+ if i in id_to_req and id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ if len(active_instance_ids) == opts.slaves:
+ print "All %d slaves granted" % opts.slaves
+ reservations = conn.get_all_instances(active_instance_ids)
+ slave_nodes = []
+ for r in reservations:
+ slave_nodes += r.instances
+ break
+ else:
+ print "%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves)
+ except:
+ print "Canceling spot instance requests"
+ conn.cancel_spot_instance_requests(my_req_ids)
+ # Log a warning if any of these requests actually launched instances:
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name, die_on_error=False)
+ running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ if running:
+ print >> stderr, ("WARNING: %d instances are still running" % running)
+ sys.exit(0)
else:
# Launch non-spot instances
slave_res = image.run(key_name = opts.key_pair,