From 96c9bcfd8d3f45fe43b3857a80fa1a42f983970b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 30 Oct 2012 23:32:38 -0700 Subject: Cancel spot instance requests when exiting spark-ec2. --- ec2/spark_ec2.py | 52 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 21 deletions(-) (limited to 'ec2') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6a3647b218..c0926e214f 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name): block_device_map = block_map) my_req_ids = [req.id for req in slave_reqs] print "Waiting for spot instances to be granted..." - while True: - time.sleep(10) - reqs = conn.get_all_spot_instance_requests() - id_to_req = {} - for r in reqs: - id_to_req[r.id] = r - active = 0 - instance_ids = [] - for i in my_req_ids: - if id_to_req[i].state == "active": - active += 1 - instance_ids.append(id_to_req[i].instance_id) - if active == opts.slaves: - print "All %d slaves granted" % opts.slaves - reservations = conn.get_all_instances(instance_ids) - slave_nodes = [] - for r in reservations: - slave_nodes += r.instances - break - else: - print "%d of %d slaves granted, waiting longer" % (active, opts.slaves) + try: + while True: + time.sleep(10) + reqs = conn.get_all_spot_instance_requests() + id_to_req = {} + for r in reqs: + id_to_req[r.id] = r + active_instance_ids = [] + for i in my_req_ids: + if i in id_to_req and id_to_req[i].state == "active": + active_instance_ids.append(id_to_req[i].instance_id) + if len(active_instance_ids) == opts.slaves: + print "All %d slaves granted" % opts.slaves + reservations = conn.get_all_instances(active_instance_ids) + slave_nodes = [] + for r in reservations: + slave_nodes += r.instances + break + else: + print "%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves) + except: + print "Canceling spot instance requests" + conn.cancel_spot_instance_requests(my_req_ids) + # Log a warning if any of these requests actually launched instances: + (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes) + if running: + print >> stderr, ("WARNING: %d instances are still running" % running) + sys.exit(0) else: # Launch non-spot instances slave_res = image.run(key_name = opts.key_pair, -- cgit v1.2.3