author     Xusen Yin <yinxusen@gmail.com>            2016-08-18 05:33:52 -0700
committer  Felix Cheung <felixcheung@apache.org>     2016-08-18 05:33:52 -0700
commit     b72bb62d421840f82d663c6b8e3922bd14383fbb (patch)
tree       1445a4e605794d84a606661dcfbd68decb3df657 /R
parent     68f5087d2107d6afec5d5745f0cb0e9e3bdd6a0b (diff)
[SPARK-16447][ML][SPARKR] LDA wrapper in SparkR
## What changes were proposed in this pull request?

Add an LDA wrapper in SparkR with the following interfaces:

- spark.lda(data, ...)
- spark.posterior(object, newData, ...)
- spark.perplexity(object, ...)
- summary(object)
- write.ml(object)
- read.ml(path)

## How was this patch tested?

Tested with SparkR unit tests.

Author: Xusen Yin <yinxusen@gmail.com>

Closes #14229 from yinxusen/SPARK-16447.
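As a quick orientation before the diff, here is a minimal end-to-end sketch of the API this patch adds, reusing the sample data file referenced by the new tests (the model path is a placeholder):

df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
model <- spark.lda(df, k = 10, maxIter = 20, optimizer = "online")
summary(model)                           # effective alpha/beta, log likelihood, log perplexity, top terms
posterior <- spark.posterior(model, df)  # per-document topic distributions in "topicDistribution"
lp <- spark.perplexity(model, df)        # log perplexity of df under the fitted model
write.ml(model, "path/to/model")         # persist the fitted model
saved <- read.ml("path/to/model")        # load it back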
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                         |   3
-rw-r--r--  R/pkg/R/generics.R                      |  14
-rw-r--r--  R/pkg/R/mllib.R                         | 166
-rw-r--r--  R/pkg/inst/tests/testthat/test_mllib.R  |  87
4 files changed, 268 insertions(+), 2 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index c71eec5ce0..4404cffc29 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -25,6 +25,9 @@ exportMethods("glm",
"fitted",
"spark.naiveBayes",
"spark.survreg",
+ "spark.lda",
+ "spark.posterior",
+ "spark.perplexity",
"spark.isoreg",
"spark.gaussianMixture")
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 06bb25d62d..fe04bcfc7d 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1304,6 +1304,19 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s
#' @export
setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
+#' @rdname spark.lda
+#' @param ... Additional parameters to tune LDA.
+#' @export
+setGeneric("spark.lda", function(data, ...) { standardGeneric("spark.lda") })
+
+#' @rdname spark.lda
+#' @export
+setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") })
+
+#' @rdname spark.lda
+#' @export
+setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.perplexity") })
+
#' @rdname spark.isoreg
#' @export
setGeneric("spark.isoreg", function(data, formula, ...) { standardGeneric("spark.isoreg") })
@@ -1315,6 +1328,7 @@ setGeneric("spark.gaussianMixture",
standardGeneric("spark.gaussianMixture")
})
+#' write.ml
#' @rdname write.ml
#' @export
setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index db74046056..b9527410a9 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -39,6 +39,13 @@ setClass("GeneralizedLinearRegressionModel", representation(jobj = "jobj"))
#' @note NaiveBayesModel since 2.0.0
setClass("NaiveBayesModel", representation(jobj = "jobj"))
+#' S4 class that represents an LDAModel
+#'
+#' @param jobj a Java object reference to the backing Scala LDAWrapper
+#' @export
+#' @note LDAModel since 2.1.0
+setClass("LDAModel", representation(jobj = "jobj"))
+
#' S4 class that represents a AFTSurvivalRegressionModel
#'
#' @param jobj a Java object reference to the backing Scala AFTSurvivalRegressionWrapper
@@ -75,7 +82,7 @@ setClass("GaussianMixtureModel", representation(jobj = "jobj"))
#' @name write.ml
#' @export
#' @seealso \link{spark.glm}, \link{glm}, \link{spark.gaussianMixture}
-#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.lda}
#' @seealso \link{spark.isoreg}
#' @seealso \link{read.ml}
NULL
@@ -315,6 +322,94 @@ setMethod("summary", signature(object = "NaiveBayesModel"),
return(list(apriori = apriori, tables = tables))
})
+# Returns posterior probabilities from a Latent Dirichlet Allocation model produced by spark.lda()
+
+#' @param newData A SparkDataFrame for testing
+#' @return \code{spark.posterior} returns a SparkDataFrame containing posterior probabilities
+#' vectors named "topicDistribution"
+#' @rdname spark.lda
+#' @aliases spark.posterior,LDAModel,SparkDataFrame-method
+#' @export
+#' @note spark.posterior(LDAModel) since 2.1.0
+setMethod("spark.posterior", signature(object = "LDAModel", newData = "SparkDataFrame"),
+ function(object, newData) {
+ return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
+ })
+
+# Returns the summary of a Latent Dirichlet Allocation model produced by \code{spark.lda}
+
+#' @param object A Latent Dirichlet Allocation model fitted by \code{spark.lda}.
+#' @param maxTermsPerTopic Maximum number of terms to collect for each topic. Default value of 10.
+#' @return \code{summary} returns a list containing
+#' \item{\code{docConcentration}}{concentration parameter commonly named \code{alpha} for
+#' the prior placed on documents distributions over topics \code{theta}}
+#' \item{\code{topicConcentration}}{concentration parameter commonly named \code{beta} or
+#' \code{eta} for the prior placed on topic distributions over terms}
+#' \item{\code{logLikelihood}}{log likelihood of the entire corpus}
+#' \item{\code{logPerplexity}}{log perplexity}
+#' \item{\code{isDistributed}}{TRUE for distributed model while FALSE for local model}
+#' \item{\code{vocabSize}}{number of terms in the corpus}
+#' \item{\code{topics}}{top 10 terms and their weights of all topics}
+#' \item{\code{vocabulary}}{whole terms of the training corpus, NULL if libsvm format file
+#' used as training set}
+#' @rdname spark.lda
+#' @aliases summary,LDAModel-method
+#' @export
+#' @note summary(LDAModel) since 2.1.0
+setMethod("summary", signature(object = "LDAModel"),
+ function(object, maxTermsPerTopic) {
+ maxTermsPerTopic <- as.integer(ifelse(missing(maxTermsPerTopic), 10, maxTermsPerTopic))
+ jobj <- object@jobj
+ docConcentration <- callJMethod(jobj, "docConcentration")
+ topicConcentration <- callJMethod(jobj, "topicConcentration")
+ logLikelihood <- callJMethod(jobj, "logLikelihood")
+ logPerplexity <- callJMethod(jobj, "logPerplexity")
+ isDistributed <- callJMethod(jobj, "isDistributed")
+ vocabSize <- callJMethod(jobj, "vocabSize")
+ topics <- dataFrame(callJMethod(jobj, "topics", maxTermsPerTopic))
+ vocabulary <- callJMethod(jobj, "vocabulary")
+ return(list(docConcentration = unlist(docConcentration),
+ topicConcentration = topicConcentration,
+ logLikelihood = logLikelihood, logPerplexity = logPerplexity,
+ isDistributed = isDistributed, vocabSize = vocabSize,
+ topics = topics,
+ vocabulary = unlist(vocabulary)))
+ })
+
+# Returns the log perplexity of a Latent Dirichlet Allocation model produced by \code{spark.lda}
+
+#' @return \code{spark.perplexity} returns the log perplexity of given SparkDataFrame, or the log
+#' perplexity of the training data if missing argument "data".
+#' @rdname spark.lda
+#' @aliases spark.perplexity,LDAModel-method
+#' @export
+#' @note spark.perplexity(LDAModel) since 2.1.0
+setMethod("spark.perplexity", signature(object = "LDAModel", data = "SparkDataFrame"),
+ function(object, data) {
+ return(ifelse(missing(data), callJMethod(object@jobj, "logPerplexity"),
+ callJMethod(object@jobj, "computeLogPerplexity", data@sdf)))
+ })
+
+# Saves the Latent Dirichlet Allocation model to the input path.
+
+#' @param path The directory where the model is saved
+#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
+#' which means throw exception if the output path exists.
+#'
+#' @rdname spark.lda
+#' @aliases write.ml,LDAModel,character-method
+#' @export
+#' @seealso \link{read.ml}
+#' @note write.ml(LDAModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "LDAModel", path = "character"),
+ function(object, path, overwrite = FALSE) {
+ writer <- callJMethod(object@jobj, "write")
+ if (overwrite) {
+ writer <- callJMethod(writer, "overwrite")
+ }
+ invisible(callJMethod(writer, "save", path))
+ })
+
#' Isotonic Regression Model
#'
#' Fits an Isotonic Regression model against a Spark DataFrame, similarly to R's isoreg().
@@ -700,6 +795,8 @@ read.ml <- function(path) {
return(new("GeneralizedLinearRegressionModel", jobj = jobj))
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) {
return(new("KMeansModel", jobj = jobj))
+ } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
+ return(new("LDAModel", jobj = jobj))
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
return(new("IsotonicRegressionModel", jobj = jobj))
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) {
@@ -751,6 +848,71 @@ setMethod("spark.survreg", signature(data = "SparkDataFrame", formula = "formula
return(new("AFTSurvivalRegressionModel", jobj = jobj))
})
+#' Latent Dirichlet Allocation
+#'
+#' \code{spark.lda} fits a Latent Dirichlet Allocation model on a SparkDataFrame. Users can call
+#' \code{summary} to get a summary of the fitted LDA model, \code{spark.posterior} to compute
+#' posterior probabilities on new data, \code{spark.perplexity} to compute log perplexity on new
+#' data and \code{write.ml}/\code{read.ml} to save/load fitted models.
+#'
+#' @param data A SparkDataFrame for training
+#' @param features Features column name, default "features". Either libSVM-format column or
+#' character-format column is valid.
+#' @param k Number of topics, default 10
+#' @param maxIter Maximum iterations, default 20
+#' @param optimizer Optimizer to train an LDA model, "online" or "em", default "online"
+#' @param subsamplingRate (For online optimizer) Fraction of the corpus to be sampled and used in
+#' each iteration of mini-batch gradient descent, in range (0, 1], default 0.05
+#' @param topicConcentration concentration parameter (commonly named \code{beta} or \code{eta}) for
+#' the prior placed on topic distributions over terms, default -1 to set automatically on the
+#' Spark side. Use \code{summary} to retrieve the effective topicConcentration. Only 1-size
+#' numeric is accepted.
+#' @param docConcentration concentration parameter (commonly named \code{alpha}) for the
+#' prior placed on documents distributions over topics (\code{theta}), default -1 to set
+#' automatically on the Spark side. Use \code{summary} to retrieve the effective
+#' docConcentration. Only 1-size or \code{k}-size numeric is accepted.
+#' @param customizedStopWords stopwords that need to be removed from the given corpus. Ignore the
+#' parameter if libSVM-format column is used as the features column.
+#' @param maxVocabSize maximum vocabulary size, default 1 << 18
+#' @return \code{spark.lda} returns a fitted Latent Dirichlet Allocation model
+#' @rdname spark.lda
+#' @aliases spark.lda,SparkDataFrame-method
+#' @seealso topicmodels: \url{https://cran.r-project.org/web/packages/topicmodels/}
+#' @export
+#' @examples
+#' \dontrun{
+#' text <- read.df("path/to/data", source = "libsvm")
+#' model <- spark.lda(data = text, optimizer = "em")
+#'
+#' # get a summary of the model
+#' summary(model)
+#'
+#' # compute posterior probabilities
+#' posterior <- spark.posterior(model, df)
+#' showDF(posterior)
+#'
+#' # compute perplexity
+#' perplexity <- spark.perplexity(model, df)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.lda since 2.1.0
+setMethod("spark.lda", signature(data = "SparkDataFrame"),
+ function(data, features = "features", k = 10, maxIter = 20, optimizer = c("online", "em"),
+ subsamplingRate = 0.05, topicConcentration = -1, docConcentration = -1,
+ customizedStopWords = "", maxVocabSize = bitwShiftL(1, 18)) {
+ optimizer <- match.arg(optimizer)
+ jobj <- callJStatic("org.apache.spark.ml.r.LDAWrapper", "fit", data@sdf, features,
+ as.integer(k), as.integer(maxIter), optimizer,
+ as.numeric(subsamplingRate), topicConcentration,
+ as.array(docConcentration), as.array(customizedStopWords),
+ maxVocabSize)
+ return(new("LDAModel", jobj = jobj))
+ })
# Returns a summary of the AFT survival regression model produced by spark.survreg,
# similarly to R's summary().
@@ -891,4 +1053,4 @@ setMethod("summary", signature(object = "GaussianMixtureModel"),
setMethod("predict", signature(object = "GaussianMixtureModel"),
function(object, newData) {
return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
- })
+ })
\ No newline at end of file
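For reference, the list returned by the summary method defined above can be consumed as plain R values once a model has been fitted; a short sketch, reusing the sample data path from the new tests:

text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
model <- spark.lda(text, optimizer = "em")
stats <- summary(model, maxTermsPerTopic = 5)

stats$docConcentration       # effective alpha (per-topic concentration)
stats$topicConcentration     # effective beta/eta
stats$logLikelihood          # log likelihood of the training corpus
stats$logPerplexity          # log perplexity of the training corpus
head(collect(stats$topics))  # top terms and weights per topic, collected locally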
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 96179864a8..8c380fbf15 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -570,4 +570,91 @@ test_that("spark.gaussianMixture", {
unlink(modelPath)
})
+test_that("spark.lda with libsvm", {
+ text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
+ model <- spark.lda(text, optimizer = "em")
+
+ stats <- summary(model, 10)
+ isDistributed <- stats$isDistributed
+ logLikelihood <- stats$logLikelihood
+ logPerplexity <- stats$logPerplexity
+ vocabSize <- stats$vocabSize
+ topics <- stats$topicTopTerms
+ weights <- stats$topicTopTermsWeights
+ vocabulary <- stats$vocabulary
+
+ expect_false(isDistributed)
+ expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+ expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+ expect_equal(vocabSize, 11)
+ expect_true(is.null(vocabulary))
+
+ # Test model save/load
+ modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+
+ expect_false(stats2$isDistributed)
+ expect_equal(logLikelihood, stats2$logLikelihood)
+ expect_equal(logPerplexity, stats2$logPerplexity)
+ expect_equal(vocabSize, stats2$vocabSize)
+ expect_equal(vocabulary, stats2$vocabulary)
+
+ unlink(modelPath)
+})
+
+test_that("spark.lda with text input", {
+ text <- read.text("data/mllib/sample_lda_data.txt")
+ model <- spark.lda(text, optimizer = "online", features = "value")
+
+ stats <- summary(model)
+ isDistributed <- stats$isDistributed
+ logLikelihood <- stats$logLikelihood
+ logPerplexity <- stats$logPerplexity
+ vocabSize <- stats$vocabSize
+ topics <- stats$topicTopTerms
+ weights <- stats$topicTopTermsWeights
+ vocabulary <- stats$vocabulary
+
+ expect_false(isDistributed)
+ expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+ expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+ expect_equal(vocabSize, 10)
+ expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
+
+ # Test model save/load
+ modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+
+ expect_false(stats2$isDistributed)
+ expect_equal(logLikelihood, stats2$logLikelihood)
+ expect_equal(logPerplexity, stats2$logPerplexity)
+ expect_equal(vocabSize, stats2$vocabSize)
+ expect_true(all.equal(vocabulary, stats2$vocabulary))
+
+ unlink(modelPath)
+})
+
+test_that("spark.posterior and spark.perplexity", {
+ text <- read.text("data/mllib/sample_lda_data.txt")
+ model <- spark.lda(text, features = "value", k = 3)
+
+ # Assert perplexities are equal
+ stats <- summary(model)
+ logPerplexity <- spark.perplexity(model, text)
+ expect_equal(logPerplexity, stats$logPerplexity)
+
+ # Assert the sum of every topic distribution is equal to 1
+ posterior <- spark.posterior(model, text)
+ local.posterior <- collect(posterior)$topicDistribution
+ expect_equal(length(local.posterior), sum(unlist(local.posterior)))
+})
+
sparkR.session.stop()
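Note that the last test above verifies the posterior only in aggregate (the sum over all documents equals the number of documents); an equivalent per-document check under the same setup could look like this sketch:

text <- read.text("data/mllib/sample_lda_data.txt")
model <- spark.lda(text, features = "value", k = 3)
probs <- collect(spark.posterior(model, text))$topicDistribution
# each per-document topic distribution should sum to (approximately) 1
stopifnot(all(abs(vapply(probs, sum, numeric(1)) - 1) < 1e-6))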