diff options
4 files changed, 38 insertions, 40 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ec3c007fb..740ec08542 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -9,9 +9,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -53,7 +53,7 @@ def parse_args(): help="Seconds to wait for nodes to start (default: 120)") parser.add_option("-k", "--key-pair", help="Key pair to use on instances") - parser.add_option("-i", "--identity-file", + parser.add_option("-i", "--identity-file", help="SSH private key file to use for logging into instances") parser.add_option("-t", "--instance-type", default="m1.large", help="Type of instance to launch (default: m1.large). " + @@ -69,7 +69,7 @@ def parse_args(): parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + "available AMI (default: latest)") - parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", + parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + "the given local address (for use with login)") parser.add_option("--resume", action="store_true", default=False, @@ -99,7 +99,7 @@ def parse_args(): help="The SSH user you want to connect as (default: root)") parser.add_option("--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created") - + (opts, args) = parser.parse_args() if len(args) != 2: parser.print_help() @@ -112,7 +112,7 @@ def parse_args(): if opts.cluster_type not in ["mesos", "standalone"] and action == "launch": print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type) sys.exit(1) - + # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') @@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') + master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0') if opts.cluster_type == "mesos": master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') if opts.ganglia: @@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name): block_device_map = block_map) my_req_ids += [req.id for req in slave_reqs] i += 1 - + print "Waiting for spot instances to be granted..." try: while True: @@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts): slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "/root/spark/bin/start-all.sh") - + def setup_spark_cluster(master, opts): ssh(master, opts, "chmod u+x spark-ec2/setup.sh") ssh(master, opts, "spark-ec2/setup.sh") @@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes, dest.write(text) dest.close() # rsync the whole directory over to the master machine - command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + + command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) subprocess.check_call(command, shell=True) # Remove the temp directory we created above @@ -557,9 +558,9 @@ def ssh(host, opts, command): print "Error connecting to host {0}, sleeping 30".format(e) time.sleep(30) tries = tries + 1 - - - + + + # Gets a list of zones to launch instances in @@ -618,12 +619,12 @@ def main(): print "Terminating zoo..." for inst in zoo_nodes: inst.terminate() - + # Delete security groups as well if opts.delete_groups: print "Deleting security groups (this will take some time)..." group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"] - + attempt = 1; while attempt <= 3: print "Attempt %d" % attempt @@ -639,7 +640,7 @@ def main(): from_port=rule.from_port, to_port=rule.to_port, src_group=grant) - + # Sleep for AWS eventual-consistency to catch up, and for instances # to terminate time.sleep(30) # Yes, it does have to be this long :-( @@ -650,13 +651,13 @@ def main(): except boto.exception.EC2ResponseError: success = False; print "Failed to delete security group " + group.name - + # Unfortunately, group.revoke() returns True even if a rule was not # deleted, so this needs to be rerun if something fails if success: break; - + attempt += 1 - + if not success: print "Failed to delete all security groups after 3 tries." print "Try re-running in a few minutes." @@ -679,7 +680,7 @@ def main(): elif action == "stop": response = raw_input("Are you sure you want to stop the cluster " + cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + - "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index bc1c327729..bf3b05dedb 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -151,7 +151,6 @@ object LogisticRegressionLocalRandomSGD { input: RDD[(Int, Array[Double])], numIterations: Int, stepSize: Double, - miniBatchFraction: Double, initialWeights: Array[Double]) : LogisticRegressionModel = @@ -174,7 +173,6 @@ object LogisticRegressionLocalRandomSGD { input: RDD[(Int, Array[Double])], numIterations: Int, stepSize: Double, - miniBatchFraction: Double) : LogisticRegressionModel = { @@ -195,8 +193,7 @@ object LogisticRegressionLocalRandomSGD { def train( input: RDD[(Int, Array[Double])], numIterations: Int, - stepSize: Double - ) + stepSize: Double) : LogisticRegressionModel = { train(input, numIterations, stepSize, 1.0) diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index d4b83a1456..19cda26446 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -61,7 +61,7 @@ object GradientDescent { // Initialize weights as a column vector var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) - var reg_val = 0.0 + var regVal = 0.0 for (i <- 1 to numIters) { val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { @@ -71,15 +71,14 @@ object GradientDescent { (grad, loss) }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) - stochasticLossHistory.append(lossSum / miniBatchSize + reg_val) + /** + * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration + * and regVal is the regularization value computed in the previous iteration as well. + */ + stochasticLossHistory.append(lossSum / miniBatchSize + regVal) val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam) weights = update._1 - reg_val = update._2 - stochasticLossHistory.append(lossSum / miniBatchSize + reg_val) - /* - * NOTE(Xinghao): The loss here is sum of lossSum computed using the weights before applying updater, - * and reg_val using weights after applying updater - */ + regVal = update._2 } (weights.toArray, stochasticLossHistory.toArray) diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala index 188fe7d972..e916a92c33 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala @@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix abstract class Updater extends Serializable { /** * Compute an updated value for weights given the gradient, stepSize and iteration number. + * Also returns the regularization value computed using the *updated* weights. * * @param weightsOlds - Column matrix of size nx1 where n is the number of features. * @param gradient - Column matrix of size nx1 where n is the number of features. @@ -31,7 +32,7 @@ abstract class Updater extends Serializable { * @param regParam - Regularization parameter * * @return A tuple of 2 elements. The first element is a column matrix containing updated weights, - * and the second element is the regularization value. + * and the second element is the regularization value computed using updated weights. */ def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) @@ -46,13 +47,13 @@ class SimpleUpdater extends Updater { } /** -* L1 regularization -- corresponding proximal operator is the soft-thresholding function -* That is, each weight component is shrunk towards 0 by shrinkageVal -* If w > shrinkageVal, set weight component to w-shrinkageVal. -* If w < -shrinkageVal, set weight component to w+shrinkageVal. -* If -shrinkageVal < w < shrinkageVal, set weight component to 0. -* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal) -**/ + * L1 regularization -- corresponding proximal operator is the soft-thresholding function + * That is, each weight component is shrunk towards 0 by shrinkageVal + * If w > shrinkageVal, set weight component to w-shrinkageVal. + * If w < -shrinkageVal, set weight component to w+shrinkageVal. + * If -shrinkageVal < w < shrinkageVal, set weight component to 0. + * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal) + */ class L1Updater extends Updater { override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { @@ -76,7 +77,7 @@ class SquaredL2Updater extends Updater { val thisIterStepSize = stepSize / math.sqrt(iter) val normGradient = gradient.mul(thisIterStepSize) val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0) - (newWeights, pow(newWeights.norm2,2.0) * regParam) + (newWeights, pow(newWeights.norm2, 2.0) * regParam) } } |