author     Shivaram Venkataraman <shivaram@cs.berkeley.edu>   2015-05-06 17:28:11 -0700
committer  Reynold Xin <rxin@databricks.com>                  2015-05-06 17:28:11 -0700
commit     4e930420c19ae7773b138dfc7db8fc03b4660251 (patch)
tree       7086ab6c3c0dc1599834aab82a863d050ebca0a0 /examples
parent     fbf1f342a02af65f69e0ee770a2b983c69e7c089 (diff)
[SPARK-6799] [SPARKR] Remove SparkR RDD examples, add dataframe examples
This PR also makes some of the DataFrame-to-RDD methods private, as the RDD class is private in 1.4.

cc rxin pwendell

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #5949 from shivaram/sparkr-examples and squashes the following commits:

6c42fdc [Shivaram Venkataraman] Remove SparkR RDD examples, add dataframe examples
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/r/dataframe.R              54
-rw-r--r--  examples/src/main/r/kmeans.R                 93
-rw-r--r--  examples/src/main/r/linear_solver_mnist.R   107
-rw-r--r--  examples/src/main/r/logistic_regression.R    62
-rw-r--r--  examples/src/main/r/pi.R                     46
-rw-r--r--  examples/src/main/r/wordcount.R              42
6 files changed, 54 insertions(+), 350 deletions(-)
diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R
new file mode 100644
index 0000000000..53b817144f
--- /dev/null
+++ b/examples/src/main/r/dataframe.R
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# Initialize SparkContext and SQLContext
+sc <- sparkR.init(appName="SparkR-DataFrame-example")
+sqlContext <- sparkRSQL.init(sc)
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkR DataFrame
+df <- createDataFrame(sqlContext, localDF)
+
+# Print its schema
+printSchema(df)
+# root
+# |-- name: string (nullable = true)
+# |-- age: double (nullable = true)
+
+# Create a DataFrame from a JSON file
+path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
+peopleDF <- jsonFile(sqlContext, path)
+printSchema(peopleDF)
+
+# Register this DataFrame as a table.
+registerTempTable(peopleDF, "people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext
+teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Call collect to get a local data.frame
+teenagersLocalDF <- collect(teenagers)
+
+# Print the teenagers in our dataset
+print(teenagersLocalDF)
+
+# Stop the SparkContext now
+sparkR.stop()
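
For reference, the same teenagers query can be written with SparkR's DataFrame verbs instead of an SQL string. A minimal sketch, not part of the patch, assuming filter accepts an SQL condition string as in the 1.4 DataFrame API:

# Equivalent query through the DataFrame API (sketch, not in the patch)
teenDF <- select(filter(peopleDF, "age >= 13 AND age <= 19"), "name")
print(collect(teenDF))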
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
deleted file mode 100644
index 6e6b5cb937..0000000000
--- a/examples/src/main/r/kmeans.R
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-# K-means clustering in Spark.
-# Note: unlike the example in Scala, a point here is represented as a vector of
-# doubles.
-
-parseVectors <- function(lines) {
- lines <- strsplit(as.character(lines), " ", fixed = TRUE)
- list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
-}
-
-dist.fun <- function(P, C) {
- apply(
- C,
- 1,
- function(x) {
- colSums((t(P) - x)^2)
- }
- )
-}
-
-closestPoint <- function(P, C) {
- max.col(-dist.fun(P, C))
-}
-
-# Main program
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
- print("Usage: kmeans <file> <K> <convergeDist>")
- q("no")
-}
-
-sc <- sparkR.init(appName = "RKMeans")
-K <- as.integer(args[[2]])
-convergeDist <- as.double(args[[3]])
-
-lines <- textFile(sc, args[[1]])
-points <- cache(lapplyPartition(lines, parseVectors))
-# kPoints <- take(points, K)
-kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
-tempDist <- 1.0
-
-while (tempDist > convergeDist) {
- closest <- lapplyPartition(
- lapply(points,
- function(p) {
- cp <- closestPoint(p, kPoints);
- mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE)
- }),
- function(x) {do.call(c, x)
- })
-
- pointStats <- reduceByKey(closest,
- function(p1, p2) {
- t(colSums(rbind(p1, p2)))
- },
- 2L)
-
- newPoints <- do.call(
- rbind,
- collect(lapply(pointStats,
- function(tup) {
- point.sum <- tup[[2]][, -1]
- point.count <- tup[[2]][, 1]
- point.sum/point.count
- })))
-
- D <- dist.fun(kPoints, newPoints)
- tempDist <- sum(D[cbind(1:K, max.col(-D))])
- kPoints <- newPoints
- cat("Finished iteration (delta = ", tempDist, ")\n")
-}
-
-cat("Final centers:\n")
-writeLines(apply(kPoints, 1, paste, collapse = " "))
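
The core of the removed loop is dist.fun, which returns a points-by-centers matrix of squared Euclidean distances, and max.col(-D), which picks each point's nearest center. A minimal local sketch of that step, plain R with no Spark and made-up data:

# dist.fun from the removed example: squared distance from each row of P
# (points) to each row of C (centers)
dist.fun <- function(P, C) {
  apply(C, 1, function(x) colSums((t(P) - x)^2))
}

P <- matrix(c(0, 0, 1, 1, 9, 9), ncol = 2, byrow = TRUE)  # three points
C <- matrix(c(0, 0, 10, 10), ncol = 2, byrow = TRUE)      # two centers
D <- dist.fun(P, C)   # 3 x 2 matrix of squared distances
max.col(-D)           # nearest center per point: 1 1 2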
diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R
deleted file mode 100644
index c864a4232d..0000000000
--- a/examples/src/main/r/linear_solver_mnist.R
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
-
-library(SparkR)
-library(Matrix)
-
-args <- commandArgs(trailing = TRUE)
-
-# number of random features; default to 1100
-D <- ifelse(length(args) > 0, as.integer(args[[1]]), 1100)
-# number of partitions for training dataset
-trainParts <- 12
-# dimension of digits
-d <- 784
-# number of training examples
-NTrain <- 60000
-# number of test examples
-NTest <- 10000
-# scale of features
-gamma <- 4e-4
-
-sc <- sparkR.init(appName = "SparkR-LinearSolver")
-
-# You can also use HDFS path to speed things up:
-# hdfs://<master>/train-mnist-dense-with-labels.data
-file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
-
-W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d))
-b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D))
-broadcastW <- broadcast(sc, W)
-broadcastB <- broadcast(sc, b)
-
-includePackage(sc, Matrix)
-numericLines <- lapplyPartitionsWithIndex(file,
- function(split, part) {
- matList <- sapply(part, function(line) {
- as.numeric(strsplit(line, ",", fixed=TRUE)[[1]])
- }, simplify=FALSE)
- mat <- Matrix(ncol=d+1, data=unlist(matList, F, F),
- sparse=T, byrow=T)
- mat
- })
-
-featureLabels <- cache(lapplyPartition(
- numericLines,
- function(part) {
- label <- part[,1]
- mat <- part[,-1]
- ones <- rep(1, nrow(mat))
- features <- cos(
- mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% t(value(broadcastB))))
- onesMat <- Matrix(ones)
- featuresPlus <- cBind(features, onesMat)
- labels <- matrix(nrow=nrow(mat), ncol=10, data=-1)
- for (i in 1:nrow(mat)) {
- labels[i, label[i]] <- 1
- }
- list(label=labels, features=featuresPlus)
- }))
-
-FTF <- Reduce("+", collect(lapplyPartition(featureLabels,
- function(part) {
- t(part$features) %*% part$features
- }), flatten=F))
-
-FTY <- Reduce("+", collect(lapplyPartition(featureLabels,
- function(part) {
- t(part$features) %*% part$label
- }), flatten=F))
-
-# solve for the coefficient matrix
-C <- solve(FTF, FTY)
-
-test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
- header=F), sparse=T))
-testData <- test[,-1]
-testLabels <- matrix(ncol=1, test[,1])
-
-err <- 0
-
-# construct the feature maps for all test examples
-featuresTest <- cos(testData %*% t(value(broadcastW)) +
- (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB))))
-featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest)))
-
-# extract the one vs. all assignment
-results <- featuresTest %*% C
-labelsGot <- apply(results, 1, which.max)
-err <- sum(testLabels != labelsGot) / nrow(testLabels)
-
-cat("\nFinished running. The error rate is: ", err, ".\n")
diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R
deleted file mode 100644
index 2a86aa9816..0000000000
--- a/examples/src/main/r/logistic_regression.R
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
- print("Usage: logistic_regression <file> <iters> <dimension>")
- q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "LogisticRegressionR")
-iterations <- as.integer(args[[2]])
-D <- as.integer(args[[3]])
-
-readPartition <- function(part) {
- part <- strsplit(part, " ", fixed = TRUE)
- list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
-}
-
-# Read data points and convert each partition to a matrix
-points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition))
-
-# Initialize w to a random value
-w <- runif(n=D, min = -1, max = 1)
-cat("Initial w: ", w, "\n")
-
-# Compute logistic regression gradient for a matrix of data points
-gradient <- function(partition) {
- partition <- partition[[1]]
- Y <- partition[, 1] # point labels (first column of input file)
- X <- partition[, -1] # point coordinates
-
- # For each point (x, y), compute gradient function
- dot <- X %*% w
- logit <- 1 / (1 + exp(-Y * dot))
- grad <- t(X) %*% ((logit - 1) * Y)
- list(grad)
-}
-
-for (i in 1:iterations) {
- cat("On iteration ", i, "\n")
- w <- w - reduce(lapplyPartition(points, gradient), "+")
-}
-
-cat("Final w: ", w, "\n")
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
deleted file mode 100644
index aa7a833e14..0000000000
--- a/examples/src/main/r/pi.R
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-sc <- sparkR.init(appName = "PiR")
-
-slices <- ifelse(length(args) > 1, as.integer(args[[2]]), 2)
-
-n <- 100000 * slices
-
-# Per-element variant (unused; the vectorized piFuncVec below is what runs)
-piFunc <- function(elem) {
- rands <- runif(n = 2, min = -1, max = 1)
- val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
- val
-}
-
-piFuncVec <- function(elems) {
- message(length(elems))
- rands1 <- runif(n = length(elems), min = -1, max = 1)
- rands2 <- runif(n = length(elems), min = -1, max = 1)
- val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
- sum(val)
-}
-
-rdd <- parallelize(sc, 1:n, slices)
-count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
-cat("Pi is roughly", 4.0 * count / n, "\n")
-cat("Num elements in RDD ", count(rdd), "\n")
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
deleted file mode 100644
index b734cb0ecf..0000000000
--- a/examples/src/main/r/wordcount.R
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 1) {
- print("Usage: wordcount <file>")
- q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "RwordCount")
-lines <- textFile(sc, args[[1]])
-
-words <- flatMap(lines,
- function(line) {
- strsplit(line, " ")[[1]]
- })
-wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-counts <- reduceByKey(wordCount, "+", 2L)
-output <- collect(counts)
-
-for (wordcount in output) {
- cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
-}
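
For reference, the same split-and-count logic in plain R without Spark:

# Local word count equivalent to the removed flatMap/reduceByKey pipeline
lines <- c("to be or not to be", "that is the question")
words <- unlist(strsplit(lines, " ", fixed = TRUE))
counts <- table(words)
for (w in names(counts)) {
  cat(w, ": ", counts[[w]], "\n")
}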