diff options
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x | ec2/spark_ec2.py | 48 |
1 files changed, 37 insertions, 11 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 4e8f5c1f44..556d99d102 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -21,6 +21,7 @@ from __future__ import with_statement +import hashlib import logging import os import pipes @@ -29,6 +30,7 @@ import shutil import string import subprocess import sys +import tarfile import tempfile import time import urllib2 @@ -36,9 +38,6 @@ import warnings from datetime import datetime from optparse import OptionParser from sys import stderr -import boto -from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType -from boto import ec2 DEFAULT_SPARK_VERSION = "1.1.0" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -48,6 +47,39 @@ MESOS_SPARK_EC2_BRANCH = "v4" AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH) +def setup_boto(): + # Download Boto if it's not already present in the SPARK_EC2_DIR/lib folder: + version = "boto-2.34.0" + md5 = "5556223d2d0cc4d06dd4829e671dcecd" + url = "https://pypi.python.org/packages/source/b/boto/%s.tar.gz" % version + lib_dir = os.path.join(SPARK_EC2_DIR, "lib") + if not os.path.exists(lib_dir): + os.mkdir(lib_dir) + boto_lib_dir = os.path.join(lib_dir, version) + if not os.path.isdir(boto_lib_dir): + tgz_file_path = os.path.join(lib_dir, "%s.tar.gz" % version) + print "Downloading Boto from PyPi" + download_stream = urllib2.urlopen(url) + with open(tgz_file_path, "wb") as tgz_file: + tgz_file.write(download_stream.read()) + with open(tgz_file_path) as tar: + if hashlib.md5(tar.read()).hexdigest() != md5: + print >> stderr, "ERROR: Got wrong md5sum for Boto" + sys.exit(1) + tar = tarfile.open(tgz_file_path) + tar.extractall(path=lib_dir) + tar.close() + os.remove(tgz_file_path) + print "Finished downloading Boto" + sys.path.insert(0, boto_lib_dir) + + +setup_boto() +import boto +from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType +from boto import ec2 + + class UsageError(Exception): pass @@ -452,7 +484,7 @@ def launch_cluster(conn, opts, cluster_name): active_instance_ids.append(id_to_req[i].instance_id) if len(active_instance_ids) == opts.slaves: print "All %d slaves granted" % opts.slaves - reservations = conn.get_all_instances(active_instance_ids) + reservations = conn.get_all_reservations(active_instance_ids) slave_nodes = [] for r in reservations: slave_nodes += r.instances @@ -541,7 +573,7 @@ def launch_cluster(conn, opts, cluster_name): def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print "Searching for existing cluster " + cluster_name + "..." - reservations = conn.get_all_instances() + reservations = conn.get_all_reservations() master_nodes = [] slave_nodes = [] for res in reservations: @@ -618,12 +650,6 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): print "Done!" -def setup_standalone_cluster(master, slave_nodes, opts): - slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) - ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) - ssh(master, opts, "/root/spark/sbin/start-all.sh") - - def setup_spark_cluster(master, opts): ssh(master, opts, "chmod u+x spark-ec2/setup.sh") ssh(master, opts, "spark-ec2/setup.sh") |