 ec2/spark_ec2.py                                                          | 39
 mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala  |  5
 mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala       | 15
 mllib/src/main/scala/spark/mllib/optimization/Updater.scala               | 19
 4 files changed, 38 insertions(+), 40 deletions(-)
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 2ec3c007fb..740ec08542 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -9,9 +9,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,7 +53,7 @@ def parse_args():
help="Seconds to wait for nodes to start (default: 120)")
parser.add_option("-k", "--key-pair",
help="Key pair to use on instances")
- parser.add_option("-i", "--identity-file",
+ parser.add_option("-i", "--identity-file",
help="SSH private key file to use for logging into instances")
parser.add_option("-t", "--instance-type", default="m1.large",
help="Type of instance to launch (default: m1.large). " +
@@ -69,7 +69,7 @@ def parse_args():
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)")
- parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
+ parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
parser.add_option("--resume", action="store_true", default=False,
@@ -99,7 +99,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created")
-
+
(opts, args) = parser.parse_args()
if len(args) != 2:
parser.print_help()
@@ -112,7 +112,7 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1)
-
+
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+ master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0')
if opts.cluster_type == "mesos":
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
@@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map)
my_req_ids += [req.id for req in slave_reqs]
i += 1
-
+
print "Waiting for spot instances to be granted..."
try:
while True:
@@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
-
+
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
@@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
- command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
+ command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True)
# Remove the temp directory we created above
@@ -557,9 +558,9 @@ def ssh(host, opts, command):
print "Error connecting to host {0}, sleeping 30".format(e)
time.sleep(30)
tries = tries + 1
-
-
-
+
+
+
# Gets a list of zones to launch instances in
@@ -618,12 +619,12 @@ def main():
print "Terminating zoo..."
for inst in zoo_nodes:
inst.terminate()
-
+
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
-
+
attempt = 1;
while attempt <= 3:
print "Attempt %d" % attempt
@@ -639,7 +640,7 @@ def main():
from_port=rule.from_port,
to_port=rule.to_port,
src_group=grant)
-
+
# Sleep for AWS eventual-consistency to catch up, and for instances
# to terminate
time.sleep(30) # Yes, it does have to be this long :-(
@@ -650,13 +651,13 @@ def main():
except boto.exception.EC2ResponseError:
success = False;
print "Failed to delete security group " + group.name
-
+
# Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails
if success: break;
-
+
attempt += 1
-
+
if not success:
print "Failed to delete all security groups after 3 tries."
print "Try re-running in a few minutes."
@@ -679,7 +680,7 @@ def main():
elif action == "stop":
response = raw_input("Are you sure you want to stop the cluster " +
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
- "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
+ "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index bc1c327729..bf3b05dedb 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -151,7 +151,6 @@ object LogisticRegressionLocalRandomSGD {
input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
-
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
@@ -174,7 +173,6 @@ object LogisticRegressionLocalRandomSGD {
input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
-
miniBatchFraction: Double)
: LogisticRegressionModel =
{
@@ -195,8 +193,7 @@ object LogisticRegressionLocalRandomSGD {
def train(
input: RDD[(Int, Array[Double])],
numIterations: Int,
- stepSize: Double
- )
+ stepSize: Double)
: LogisticRegressionModel =
{
train(input, numIterations, stepSize, 1.0)
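
For reference, here is a minimal usage sketch of the three-argument train overload cleaned up above. The SparkContext setup and the toy (label, features) pairs are illustrative assumptions, not part of the patch; only the LogisticRegressionLocalRandomSGD.train signature and the RDD[(Int, Array[Double])] input type come from the code shown.

    // Hypothetical caller of LogisticRegressionLocalRandomSGD.train; the data and
    // SparkContext below are invented for illustration.
    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionLocalRandomSGD

    object LogisticRegressionTrainSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "LogisticRegressionTrainSketch")
        // Labels are Ints, features are Array[Double], matching RDD[(Int, Array[Double])].
        val data = sc.parallelize(Seq(
          (0, Array(0.0, 1.0)),
          (1, Array(1.0, 0.0))))
        // Three-argument overload shown above; it delegates with miniBatchFraction = 1.0.
        val model = LogisticRegressionLocalRandomSGD.train(data, 10, 1.0)
        println("trained: " + model)
        sc.stop()
      }
    }
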
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index d4b83a1456..19cda26446 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -61,7 +61,7 @@ object GradientDescent {
// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
- var reg_val = 0.0
+ var regVal = 0.0
for (i <- 1 to numIters) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
@@ -71,15 +71,14 @@ object GradientDescent {
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
- stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
+ /**
+ * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
+ * and regVal is the regularization value computed in the previous iteration as well.
+ */
+ stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
- reg_val = update._2
- stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
- /*
- * NOTE(Xinghao): The loss here is sum of lossSum computed using the weights before applying updater,
- * and reg_val using weights after applying updater
- */
+ regVal = update._2
}
(weights.toArray, stochasticLossHistory.toArray)
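
To make the bookkeeping order in the NOTE above concrete, here is a small self-contained sketch in plain Scala (no Spark or jblas; the toy loss, step size, and regularization formula are invented for illustration). It shows that the value appended at iteration i combines a loss evaluated at the pre-update weights with the regVal produced by the previous iteration's updater call.

    object LossHistorySketch {
      def main(args: Array[String]): Unit = {
        var weights = 0.0            // stand-in for the weight vector
        var regVal = 0.0             // regularization value from the previous iteration
        val history = scala.collection.mutable.ArrayBuffer.empty[Double]
        for (i <- 1 to 3) {
          val lossSum = (weights - 1.0) * (weights - 1.0)  // toy loss at the current (pre-update) weights
          history.append(lossSum + regVal)                 // regVal still refers to the previous iteration
          weights -= 0.5 * 2.0 * (weights - 1.0)           // toy gradient step (stands in for the updater)
          regVal = 0.1 * weights * weights                 // regVal now reflects the updated weights
        }
        println(history.mkString(", "))
      }
    }
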
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
index 188fe7d972..e916a92c33 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix
abstract class Updater extends Serializable {
/**
* Compute an updated value for weights given the gradient, stepSize and iteration number.
+ * Also returns the regularization value computed using the *updated* weights.
*
* @param weightsOlds - Column matrix of size nx1 where n is the number of features.
* @param gradient - Column matrix of size nx1 where n is the number of features.
@@ -31,7 +32,7 @@ abstract class Updater extends Serializable {
* @param regParam - Regularization parameter
*
* @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
- * and the second element is the regularization value.
+ * and the second element is the regularization value computed using updated weights.
*/
def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double):
(DoubleMatrix, Double)
@@ -46,13 +47,13 @@ class SimpleUpdater extends Updater {
}
/**
-* L1 regularization -- corresponding proximal operator is the soft-thresholding function
-* That is, each weight component is shrunk towards 0 by shrinkageVal
-* If w > shrinkageVal, set weight component to w-shrinkageVal.
-* If w < -shrinkageVal, set weight component to w+shrinkageVal.
-* If -shrinkageVal < w < shrinkageVal, set weight component to 0.
-* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
-**/
+ * L1 regularization -- corresponding proximal operator is the soft-thresholding function
+ * That is, each weight component is shrunk towards 0 by shrinkageVal
+ * If w > shrinkageVal, set weight component to w-shrinkageVal.
+ * If w < -shrinkageVal, set weight component to w+shrinkageVal.
+ * If -shrinkageVal < w < shrinkageVal, set weight component to 0.
+ * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
+ */
class L1Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
@@ -76,7 +77,7 @@ class SquaredL2Updater extends Updater {
val thisIterStepSize = stepSize / math.sqrt(iter)
val normGradient = gradient.mul(thisIterStepSize)
val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0)
- (newWeights, pow(newWeights.norm2,2.0) * regParam)
+ (newWeights, pow(newWeights.norm2, 2.0) * regParam)
}
}
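
The soft-thresholding rule described in the L1Updater comment can be written as a one-liner. The sketch below is standalone; the shrinkageVal constant and the weight values are arbitrary examples, not taken from the updater, where shrinkageVal depends on regParam and the per-iteration step size.

    // Standalone illustration of signum(w) * max(0.0, abs(w) - shrinkageVal).
    object SoftThresholdSketch {
      def softThreshold(w: Double, shrinkageVal: Double): Double =
        math.signum(w) * math.max(0.0, math.abs(w) - shrinkageVal)

      def main(args: Array[String]): Unit = {
        val shrinkageVal = 0.1
        val weights = Array(0.5, -0.05, 0.3, -0.25, 0.08)
        // Components with |w| <= shrinkageVal collapse to zero; the rest move toward zero by shrinkageVal.
        println(weights.map(w => softThreshold(w, shrinkageVal)).mkString(", "))
      }
    }

The SquaredL2Updater follows the same return convention as the fix above: its second element, pow(newWeights.norm2, 2.0) * regParam, is the penalty evaluated at the updated weights.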