Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/r/dataframe.R            |  54 +
-rw-r--r--  examples/src/main/r/kmeans.R               |  93 -
-rw-r--r--  examples/src/main/r/linear_solver_mnist.R  | 107 -
-rw-r--r--  examples/src/main/r/logistic_regression.R  |  62 -
-rw-r--r--  examples/src/main/r/pi.R                   |  46 -
-rw-r--r--  examples/src/main/r/wordcount.R            |  42 -
6 files changed, 54 insertions(+), 350 deletions(-)
diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R
new file mode 100644
index 0000000000..53b817144f
--- /dev/null
+++ b/examples/src/main/r/dataframe.R
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# Initialize SparkContext and SQLContext
+sc <- sparkR.init(appName="SparkR-DataFrame-example")
+sqlContext <- sparkRSQL.init(sc)
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkR DataFrame
+df <- createDataFrame(sqlContext, localDF)
+
+# Print its schema
+printSchema(df)
+# root
+# |-- name: string (nullable = true)
+# |-- age: double (nullable = true)
+
+# Create a DataFrame from a JSON file
+path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
+peopleDF <- jsonFile(sqlContext, path)
+printSchema(peopleDF)
+
+# Register this DataFrame as a table.
+registerTempTable(peopleDF, "people")
+
+# SQL statements can be run using the sql method, passing sqlContext as the first argument
+teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Call collect to get a local data.frame
+teenagersLocalDF <- collect(teenagers)
+
+# Print the teenagers in our dataset
+print(teenagersLocalDF)
+
+# Stop the SparkContext now
+sparkR.stop()
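
The teenager query in dataframe.R can also be expressed with DataFrame operations instead of SQL. A minimal sketch against the SparkR DataFrame API of this era (filter and select; the exact operators available depend on the SparkR release):

    library(SparkR)

    sc <- sparkR.init(appName = "SparkR-DataFrame-ops")
    sqlContext <- sparkRSQL.init(sc)

    path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
    peopleDF <- jsonFile(sqlContext, path)

    # Keep rows with age in [13, 19], then project the name column
    teenagers <- select(filter(peopleDF, peopleDF$age >= 13 & peopleDF$age <= 19), "name")
    print(collect(teenagers))

    sparkR.stop()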
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
deleted file mode 100644
index 6e6b5cb937..0000000000
--- a/examples/src/main/r/kmeans.R
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-# K-means clustering in Spark.
-# Note: unlike the example in Scala, a point here is represented as a vector of
-# doubles.
-
-parseVectors <- function(lines) {
- lines <- strsplit(as.character(lines), " ", fixed = TRUE)
- list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
-}
-
-dist.fun <- function(P, C) {
- apply(
- C,
- 1,
- function(x) {
- colSums((t(P) - x)^2)
- }
- )
-}
-
-closestPoint <- function(P, C) {
- max.col(-dist.fun(P, C))
-}
-# Main program
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
- print("Usage: kmeans <file> <K> <convergeDist>")
- q("no")
-}
-
-sc <- sparkR.init(appName = "RKMeans")
-K <- as.integer(args[[2]])
-convergeDist <- as.double(args[[3]])
-
-lines <- textFile(sc, args[[1]])
-points <- cache(lapplyPartition(lines, parseVectors))
-# kPoints <- take(points, K)
-kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
-tempDist <- 1.0
-
-while (tempDist > convergeDist) {
- closest <- lapplyPartition(
- lapply(points,
- function(p) {
- cp <- closestPoint(p, kPoints);
- mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE)
- }),
- function(x) {do.call(c, x)
- })
-
- pointStats <- reduceByKey(closest,
- function(p1, p2) {
- t(colSums(rbind(p1, p2)))
- },
- 2L)
-
- newPoints <- do.call(
- rbind,
- collect(lapply(pointStats,
- function(tup) {
- point.sum <- tup[[2]][, -1]
- point.count <- tup[[2]][, 1]
- point.sum/point.count
- })))
-
- D <- dist.fun(kPoints, newPoints)
- tempDist <- sum(D[cbind(1:K, max.col(-D))])
- kPoints <- newPoints
- cat("Finished iteration (delta = ", tempDist, ")\n")
-}
-
-cat("Final centers:\n")
-writeLines(apply(kPoints, 1, paste, collapse = " "))
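
The dist.fun and closestPoint helpers in the deleted kmeans.R are plain base R and can be checked locally: dist.fun(P, C) returns an nrow(P) x nrow(C) matrix of squared Euclidean distances, and closestPoint takes the argmin per row. A small sketch with made-up points (values are illustrative only):

    # Base R check of the helpers from kmeans.R (toy data, not a Spark job)
    P <- matrix(c(0, 0, 10, 10, 0.5, 0.2), ncol = 2, byrow = TRUE)  # 3 points
    C <- matrix(c(0, 0, 9, 9), ncol = 2, byrow = TRUE)              # 2 centers

    dist.fun <- function(P, C) {
      # squared distance from every row of P to every row of C
      apply(C, 1, function(x) colSums((t(P) - x)^2))
    }
    closestPoint <- function(P, C) {
      max.col(-dist.fun(P, C))  # argmin per point, via max of negated distances
    }

    closestPoint(P, C)  # 1 2 1: the first and third points are nearest center 1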
diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R
deleted file mode 100644
index c864a4232d..0000000000
--- a/examples/src/main/r/linear_solver_mnist.R
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
-
-library(SparkR)
-library(Matrix)
-
-args <- commandArgs(trailing = TRUE)
-
-# number of random features; default to 1100
-D <- ifelse(length(args) > 0, as.integer(args[[1]]), 1100)
-# number of partitions for training dataset
-trainParts <- 12
-# dimension of digits
-d <- 784
-# number of training examples
-NTrain <- 60000
-# number of test examples
-NTest <- 10000
-# scale of features
-gamma <- 4e-4
-
-sc <- sparkR.init(appName = "SparkR-LinearSolver")
-
-# You can also use HDFS path to speed things up:
-# hdfs://<master>/train-mnist-dense-with-labels.data
-file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
-
-W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d))
-b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D))
-broadcastW <- broadcast(sc, W)
-broadcastB <- broadcast(sc, b)
-
-includePackage(sc, Matrix)
-numericLines <- lapplyPartitionsWithIndex(file,
- function(split, part) {
- matList <- sapply(part, function(line) {
- as.numeric(strsplit(line, ",", fixed=TRUE)[[1]])
- }, simplify=FALSE)
- mat <- Matrix(ncol=d+1, data=unlist(matList, F, F),
- sparse=T, byrow=T)
- mat
- })
-
-featureLabels <- cache(lapplyPartition(
- numericLines,
- function(part) {
- label <- part[,1]
- mat <- part[,-1]
- ones <- rep(1, nrow(mat))
- features <- cos(
- mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% t(value(broadcastB))))
- onesMat <- Matrix(ones)
- featuresPlus <- cBind(features, onesMat)
- labels <- matrix(nrow=nrow(mat), ncol=10, data=-1)
- for (i in 1:nrow(mat)) {
- labels[i, label[i]] <- 1
- }
- list(label=labels, features=featuresPlus)
- }))
-
-FTF <- Reduce("+", collect(lapplyPartition(featureLabels,
- function(part) {
- t(part$features) %*% part$features
- }), flatten=F))
-
-FTY <- Reduce("+", collect(lapplyPartition(featureLabels,
- function(part) {
- t(part$features) %*% part$label
- }), flatten=F))
-
-# solve for the coefficient matrix
-C <- solve(FTF, FTY)
-
-test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
- header=F)), sparse=T)
-testData <- test[,-1]
-testLabels <- matrix(ncol=1, test[,1])
-
-err <- 0
-
-# construct the feature maps for all test examples
-featuresTest <- cos(testData %*% t(value(broadcastW)) +
- (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB))))
-featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest)))
-
-# extract the one vs. all assignment
-results <- featuresTest %*% C
-labelsGot <- apply(results, 1, which.max)
-err <- sum(testLabels != labelsGot) / nrow(testLabels)
-
-cat("\nFinished running. The error rate is: ", err, ".\n")
diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R
deleted file mode 100644
index 2a86aa9816..0000000000
--- a/examples/src/main/r/logistic_regression.R
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
- print("Usage: logistic_regression <file> <iters> <dimension>")
- q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "LogisticRegressionR")
-iterations <- as.integer(args[[2]])
-D <- as.integer(args[[3]])
-
-readPartition <- function(part) {
- part <- strsplit(part, " ", fixed = TRUE)
- list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
-}
-
-# Read data points and convert each partition to a matrix
-points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition))
-
-# Initialize w to a random value
-w <- runif(n=D, min = -1, max = 1)
-cat("Initial w: ", w, "\n")
-
-# Compute logistic regression gradient for a matrix of data points
-gradient <- function(partition) {
- partition <- partition[[1]]
- Y <- partition[, 1] # point labels (first column of input file)
- X <- partition[, -1] # point coordinates
-
- # For each point (x, y), compute gradient function
- dot <- X %*% w
- logit <- 1 / (1 + exp(-Y * dot))
- grad <- t(X) %*% ((logit - 1) * Y)
- list(grad)
-}
-
-for (i in 1:iterations) {
- cat("On iteration ", i, "\n")
- w <- w - reduce(lapplyPartition(points, gradient), "+")
-}
-
-cat("Final w: ", w, "\n")
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
deleted file mode 100644
index aa7a833e14..0000000000
--- a/examples/src/main/r/pi.R
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-sc <- sparkR.init(appName = "PiR")
-
-slices <- ifelse(length(args) > 1, as.integer(args[[2]]), 2)
-
-n <- 100000 * slices
-
-piFunc <- function(elem) {
- rands <- runif(n = 2, min = -1, max = 1)
- val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
- val
-}
-
-
-piFuncVec <- function(elems) {
- message(length(elems))
- rands1 <- runif(n = length(elems), min = -1, max = 1)
- rands2 <- runif(n = length(elems), min = -1, max = 1)
- val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
- sum(val)
-}
-
-rdd <- parallelize(sc, 1:n, slices)
-count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
-cat("Pi is roughly", 4.0 * count / n, "\n")
-cat("Num elements in RDD ", count(rdd), "\n")
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
deleted file mode 100644
index b734cb0ecf..0000000000
--- a/examples/src/main/r/wordcount.R
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 1) {
- print("Usage: wordcount <file>")
- q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "RwordCount")
-lines <- textFile(sc, args[[1]])
-
-words <- flatMap(lines,
- function(line) {
- strsplit(line, " ")[[1]]
- })
-wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-counts <- reduceByKey(wordCount, "+", 2L)
-output <- collect(counts)
-
-for (wordcount in output) {
- cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
-}
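
On a single machine, the word count in the deleted wordcount.R reduces to strsplit plus table. A base R sketch, where "input.txt" is a placeholder path rather than anything from the example:

    lines <- readLines("input.txt")        # placeholder local input file
    words <- unlist(strsplit(lines, " ", fixed = TRUE))
    counts <- table(words[nzchar(words)])  # drop empty tokens, then count
    for (w in names(counts)) {
      cat(w, ": ", counts[[w]], "\n")
    }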