aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main')
-rw-r--r--examples/src/main/r/kmeans.R93
-rw-r--r--examples/src/main/r/linear_solver_mnist.R107
-rw-r--r--examples/src/main/r/logistic_regression.R62
-rw-r--r--examples/src/main/r/pi.R46
-rw-r--r--examples/src/main/r/wordcount.R42
5 files changed, 350 insertions, 0 deletions
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
new file mode 100644
index 0000000000..6e6b5cb937
--- /dev/null
+++ b/examples/src/main/r/kmeans.R
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# K-means clustering in Spark.
+# Note: unlike the example in Scala, a point here is represented as a vector of
+# doubles.
+
+parseVectors <- function(lines) {  # text lines of one partition -> list(numeric matrix)
+  fields <- strsplit(as.character(lines), " ", fixed = TRUE)  # one character vector per line
+  list(matrix(as.numeric(unlist(fields)), ncol = length(fields[[1]]), byrow = TRUE))  # row = line
+}
+
+# Squared Euclidean distance between every row of P (points) and every row of
+# C (centers). Always returns an nrow(P) x nrow(C) matrix.
+dist.fun <- function(P, C) {
+  tP <- t(P)  # x (one center) recycles down each column of tP, i.e. each point
+  d <- apply(C, 1, function(x) colSums((tP - x)^2))
+  # apply() collapses to a plain vector when P has a single row; restore the shape.
+  if (is.null(dim(d))) dim(d) <- c(nrow(P), nrow(C))
+  d
+}
+
+closestPoint <- function(P, C) {  # for each row of P: index of the nearest row of C
+  max.col(-1 * dist.fun(P, C))  # largest negated distance == smallest distance
+}
+# Main program
+
+args <- commandArgs(trailingOnly = TRUE)
+if (length(args) != 3) {
+  print("Usage: kmeans <file> <K> <convergeDist>")
+  q("no")
+}
+
+sc <- sparkR.init(appName = "RKMeans")
+K <- as.integer(args[[2]])
+convergeDist <- as.double(args[[3]])
+lines <- textFile(sc, args[[1]])
+points <- cache(lapplyPartition(lines, parseVectors))
+# Every element of points is a whole partition matrix (see parseVectors), so the
+# sample can stack more than K rows; keep only K of them as the initial centers.
+kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
+kPoints <- kPoints[seq_len(min(K, nrow(kPoints))), , drop = FALSE]
+tempDist <- 1.0
+
+# Iterate until the centers move by no more than convergeDist in total.
+while (tempDist > convergeDist) {
+  # Per partition matrix p: emit list(center index, 1 x (d + 1) row of c(count, sums)).
+  closest <- lapplyPartition(
+    lapply(points,
+           function(p) {
+             cp <- closestPoint(p, kPoints)
+             # split() orders its groups by sorted key, so read the keys from its
+             # names; unique(cp) is in order of first appearance and can mismatch.
+             groups <- split.data.frame(cbind(1, p), cp)
+             mapply(list, as.integer(names(groups)),
+                    lapply(groups, function(g) t(colSums(g))), SIMPLIFY = FALSE)
+           }),
+    function(x) do.call(c, x))
+
+  pointStats <- reduceByKey(closest,
+                            function(p1, p2) t(colSums(rbind(p1, p2))),
+                            2L)
+
+  newPoints <- do.call(
+    rbind,
+    collect(lapply(pointStats,
+                   function(tup) tup[[2]][, -1] / tup[[2]][, 1])))  # sums / count
+
+  D <- dist.fun(kPoints, newPoints)
+  # One term per old center (row of D), whatever the number of centers is.
+  tempDist <- sum(D[cbind(seq_len(nrow(D)), max.col(-D))])
+  kPoints <- newPoints
+  cat("Finished iteration (delta = ", tempDist, ")\n")
+}
+
+cat("Final centers:\n")
+writeLines(apply(kPoints, 1, paste, collapse = " "))  # one output line per center (matrix row)
diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R
new file mode 100644
index 0000000000..c864a4232d
--- /dev/null
+++ b/examples/src/main/r/linear_solver_mnist.R
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
+
+library(SparkR)
+library(Matrix)
+
+args <- commandArgs(trailingOnly = TRUE)
+
+# number of random features; defaults to 1100 when no argument is given
+D <- if (length(args) > 0) as.integer(args[[1]]) else 1100
+# number of partitions for the training dataset
+trainParts <- 12
+# dimension of digits
+d <- 784
+# number of training examples
+NTrain <- 60000
+# number of test examples
+NTest <- 10000
+# scale of features
+gamma <- 4e-4
+
+sc <- sparkR.init(appName = "SparkR-LinearSolver")
+
+# You can also use an HDFS path to speed things up:
+# hdfs://<master>/train-mnist-dense-with-labels.data
+file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
+# Random projection: W is D x d Gaussian noise scaled by gamma, b is D x 1 uniform scaled by 2*pi.
+W <- gamma * matrix(rnorm(D * d), nrow = D, ncol = d)
+b <- 2 * pi * matrix(runif(D), nrow = D, ncol = 1)
+broadcastW <- broadcast(sc, W)
+broadcastB <- broadcast(sc, b)
+
+includePackage(sc, Matrix)
+# Parse each partition of comma-separated lines into one sparse Matrix with d + 1 columns.
+numericLines <- lapplyPartitionsWithIndex(file,
+  function(split, part) {  # split is unused (presumably the partition index); part: the lines
+    matList <- lapply(part, function(line) {
+      as.numeric(strsplit(line, ",", fixed = TRUE)[[1]])
+    })
+    # byrow = TRUE because unlist() yields the values line by line.
+    Matrix(ncol = d + 1, data = unlist(matList, FALSE, FALSE), sparse = TRUE, byrow = TRUE)
+  })
+
+# Per partition: feature map cos(X W' + 1 b') plus a column of ones, and -1/+1 label targets.
+featureLabels <- cache(lapplyPartition(
+  numericLines,
+  function(part) {
+    label <- part[, 1]  # first column is used as the label
+    mat <- part[, -1]
+    n <- nrow(mat)
+    ones <- rep(1, n)
+    features <- cos(
+      mat %*% t(value(broadcastW)) + (matrix(ncol = 1, data = ones) %*% t(value(broadcastB))))
+    featuresPlus <- cBind(features, Matrix(ones))  # append the column of ones
+    labels <- matrix(nrow = n, ncol = 10, data = -1)
+    # Set column label[i] of row i to +1 in one step; a no-op for an empty partition.
+    labels[cbind(seq_len(n), label)] <- 1
+    list(label = labels, features = featuresPlus)
+  }))
+
+# Accumulate the normal equations over partitions:
+#   FTF = sum(F'F), FTY = sum(F'Y), with F = part$features and Y = part$label.
+gramParts <- lapplyPartition(featureLabels,
+  function(part) t(part$features) %*% part$features)
+FTF <- Reduce("+", collect(gramParts, flatten = FALSE))
+
+crossParts <- lapplyPartition(featureLabels,
+  function(part) t(part$features) %*% part$label)
+FTY <- Reduce("+", collect(crossParts, flatten = FALSE))
+
+# solve FTF %*% C = FTY for the coefficient matrix C
+C <- solve(FTF, FTY)
+
+test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
+                                  header = FALSE)), sparse = TRUE)  # sparse is Matrix()'s argument
+testData <- test[, -1]  # every column except the first
+testLabels <- matrix(ncol = 1, test[, 1])  # first column, as a one-column matrix
+
+nTestRows <- nrow(testData)  # use the actual row count so the shapes below always conform
+
+# construct the feature maps for all test examples
+featuresTest <- cos(testData %*% t(value(broadcastW)) +
+  (matrix(ncol = 1, data = rep(1, nTestRows)) %*% t(value(broadcastB))))
+featuresTest <- cBind(featuresTest, Matrix(rep(1, nTestRows)))
+
+# extract the one vs. all assignment: the predicted label is the column with the largest score
+results <- featuresTest %*% C
+labelsGot <- apply(results, 1, which.max)
+err <- sum(testLabels != labelsGot) / nrow(testLabels)
+
+cat("\nFinished running. The error rate is: ", err, ".\n")
diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R
new file mode 100644
index 0000000000..2a86aa9816
--- /dev/null
+++ b/examples/src/main/r/logistic_regression.R
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailingOnly = TRUE)
+# Exactly three arguments are required: <file> <iters> <dimension>
+if (length(args) != 3) {
+  print("Usage: logistic_regression <file> <iters> <dimension>")
+  q("no")
+}
+
+# Initialize Spark context and read the numeric arguments
+sc <- sparkR.init(appName = "LogisticRegressionR")
+iterations <- as.integer(args[[2]])
+D <- as.integer(args[[3]])
+
+readPartition <- function(part) {  # text lines of one partition -> list(numeric matrix)
+  part <- strsplit(part, " ", fixed = TRUE)  # one character vector of fields per line
+  list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]]), byrow = TRUE))  # row = line
+}
+
+# Load the input file and turn every partition into a single numeric matrix
+rawLines <- textFile(sc, args[[1]])
+points <- cache(lapplyPartition(rawLines, readPartition))
+# Start from random weights drawn uniformly between -1 and 1
+w <- runif(D, -1, 1)
+cat("Initial w: ", w, "\n")
+
+# Compute logistic regression gradient for a matrix of data points
+gradient <- function(partition) {  # partition: list(matrix); returns list(gradient matrix)
+  pts <- partition[[1]]
+  Y <- pts[, 1]                 # point labels (first column of input file)
+  X <- pts[, -1, drop = FALSE]  # point coordinates; stays a matrix even for a single row
+
+  # Summed over the points (x, y) of this partition: t(X) %*% ((1 / (1 + exp(-y * x.w)) - 1) * y)
+  dot <- X %*% w                # w is read from the enclosing (global) environment
+  logit <- 1 / (1 + exp(-Y * dot))
+  grad <- t(X) %*% ((logit - 1) * Y)
+  list(grad)
+}
+
+# Gradient descent with unit step size; seq_len() runs zero times when iterations is 0.
+for (i in seq_len(iterations)) {
+  cat("On iteration ", i, "\n")
+  w <- w - reduce(lapplyPartition(points, gradient), "+")  # subtract the summed gradients
+}
+cat("Final w: ", w, "\n")
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
new file mode 100644
index 0000000000..aa7a833e14
--- /dev/null
+++ b/examples/src/main/r/pi.R
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailingOnly = TRUE)
+sc <- sparkR.init(appName = "PiR")
+
+# Number of partitions: read from the SECOND argument, default 2.
+# NOTE(review): args[[1]] is never read in this script -- confirm which position is intended.
+slices <- if (length(args) > 1) as.integer(args[[2]]) else 2
+n <- 100000 * slices  # total number of sample points
+
+piFunc <- function(elem) {  # elem is ignored; returns 1 for a hit inside the unit circle, else 0
+  xy <- runif(2, -1, 1)  # one uniform point in the square [-1, 1] x [-1, 1]
+  inside <- (xy[1]^2 + xy[2]^2) < 1
+  if (inside) 1.0 else 0.0
+}
+
+
+piFuncVec <- function(elems) {  # number of random points that fall inside the unit circle
+  m <- length(elems)  # one random point is drawn per input element
+  message(m)
+  xs <- runif(m, -1, 1)
+  ys <- runif(m, -1, 1)
+  sum(ifelse((xs^2 + ys^2) < 1, 1.0, 0.0))
+}
+
+rdd <- parallelize(sc, 1:n, slices)
+hits <- reduce(lapplyPartition(rdd, piFuncVec), sum)  # named hits so it does not shadow count()
+cat("Pi is roughly", 4.0 * hits / n, "\n")
+cat("Num elements in RDD ", count(rdd), "\n")
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
new file mode 100644
index 0000000000..b734cb0ecf
--- /dev/null
+++ b/examples/src/main/r/wordcount.R
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailingOnly = TRUE)
+
+# Exactly one argument is expected: the text file to count
+if (length(args) != 1) {
+  print("Usage: wordcount <file>")
+  q("no")
+}
+# Initialize Spark context and load the input as an RDD of lines
+sc <- sparkR.init(appName = "RwordCount")
+lines <- textFile(sc, args[[1]])
+
+# Split every line on single spaces and flatten the pieces into one RDD of words
+words <- flatMap(lines, function(line) strsplit(line, " ")[[1]])
+# Pair each word with an initial count of 1L
+wordCount <- lapply(words, function(word) list(word, 1L))
+
+# Add up the counts per word, then bring the result back to the driver
+counts <- reduceByKey(wordCount, "+", 2L)
+output <- collect(counts)
+# Each collected element is list(word, count)
+for (pair in output) {
+  cat(pair[[1]], ": ", pair[[2]], "\n")
+}