From b72bb62d421840f82d663c6b8e3922bd14383fbb Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 18 Aug 2016 05:33:52 -0700
Subject: [SPARK-16447][ML][SPARKR] LDA wrapper in SparkR

## What changes were proposed in this pull request?

Add an LDA wrapper in SparkR with the following interfaces (a usage sketch follows the list):

- spark.lda(data, ...)
- spark.posterior(object, newData, ...)
- spark.perplexity(object, ...)
- summary(object)
- write.ml(object)
- read.ml(path)
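A minimal usage sketch of these interfaces (illustrative, not part of the patch;
it assumes an active SparkR session, uses the sample LIBSVM data shipped with
Spark, and the model output path is hypothetical):

    df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
    model <- spark.lda(df, k = 10, maxIter = 20, optimizer = "online")
    summary(model)                            # effective alpha/beta, log likelihood, top terms
    posterior <- spark.posterior(model, df)   # per-document "topicDistribution" vectors
    spark.perplexity(model, df)               # log perplexity of the given data
    write.ml(model, "/tmp/lda-model")         # hypothetical output path
    sameModel <- read.ml("/tmp/lda-model")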
{ standardGeneric("write.ml") }) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index db74046056..b9527410a9 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -39,6 +39,13 @@ setClass("GeneralizedLinearRegressionModel", representation(jobj = "jobj")) #' @note NaiveBayesModel since 2.0.0 setClass("NaiveBayesModel", representation(jobj = "jobj")) +#' S4 class that represents an LDAModel +#' +#' @param jobj a Java object reference to the backing Scala LDAWrapper +#' @export +#' @note LDAModel since 2.1.0 +setClass("LDAModel", representation(jobj = "jobj")) + #' S4 class that represents a AFTSurvivalRegressionModel #' #' @param jobj a Java object reference to the backing Scala AFTSurvivalRegressionWrapper @@ -75,7 +82,7 @@ setClass("GaussianMixtureModel", representation(jobj = "jobj")) #' @name write.ml #' @export #' @seealso \link{spark.glm}, \link{glm}, \link{spark.gaussianMixture} -#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg} +#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.lda} #' @seealso \link{spark.isoreg} #' @seealso \link{read.ml} NULL @@ -315,6 +322,94 @@ setMethod("summary", signature(object = "NaiveBayesModel"), return(list(apriori = apriori, tables = tables)) }) +# Returns posterior probabilities from a Latent Dirichlet Allocation model produced by spark.lda() + +#' @param newData A SparkDataFrame for testing +#' @return \code{spark.posterior} returns a SparkDataFrame containing posterior probabilities +#' vectors named "topicDistribution" +#' @rdname spark.lda +#' @aliases spark.posterior,LDAModel,SparkDataFrame-method +#' @export +#' @note spark.posterior(LDAModel) since 2.1.0 +setMethod("spark.posterior", signature(object = "LDAModel", newData = "SparkDataFrame"), + function(object, newData) { + return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) + }) + +# Returns the summary of a Latent Dirichlet Allocation model produced by \code{spark.lda} + +#' @param object A Latent Dirichlet Allocation model fitted by \code{spark.lda}. +#' @param maxTermsPerTopic Maximum number of terms to collect for each topic. Default value of 10. 
+#' @return \code{summary} returns a list containing
+#'         \item{\code{docConcentration}}{concentration parameter commonly named \code{alpha} for
+#'               the prior placed on document distributions over topics \code{theta}}
+#'         \item{\code{topicConcentration}}{concentration parameter commonly named \code{beta} or
+#'               \code{eta} for the prior placed on topic distributions over terms}
+#'         \item{\code{logLikelihood}}{log likelihood of the entire corpus}
+#'         \item{\code{logPerplexity}}{log perplexity}
+#'         \item{\code{isDistributed}}{TRUE for distributed model while FALSE for local model}
+#'         \item{\code{vocabSize}}{number of terms in the corpus}
+#'         \item{\code{topics}}{top 10 terms and their weights of all topics}
+#'         \item{\code{vocabulary}}{all terms of the training corpus; NULL if a libsvm-format file
+#'               was used as the training set}
+#' @rdname spark.lda
+#' @aliases summary,LDAModel-method
+#' @export
+#' @note summary(LDAModel) since 2.1.0
+setMethod("summary", signature(object = "LDAModel"),
+          function(object, maxTermsPerTopic) {
+            maxTermsPerTopic <- as.integer(ifelse(missing(maxTermsPerTopic), 10, maxTermsPerTopic))
+            jobj <- object@jobj
+            docConcentration <- callJMethod(jobj, "docConcentration")
+            topicConcentration <- callJMethod(jobj, "topicConcentration")
+            logLikelihood <- callJMethod(jobj, "logLikelihood")
+            logPerplexity <- callJMethod(jobj, "logPerplexity")
+            isDistributed <- callJMethod(jobj, "isDistributed")
+            vocabSize <- callJMethod(jobj, "vocabSize")
+            topics <- dataFrame(callJMethod(jobj, "topics", maxTermsPerTopic))
+            vocabulary <- callJMethod(jobj, "vocabulary")
+            return(list(docConcentration = unlist(docConcentration),
+                        topicConcentration = topicConcentration,
+                        logLikelihood = logLikelihood, logPerplexity = logPerplexity,
+                        isDistributed = isDistributed, vocabSize = vocabSize,
+                        topics = topics,
+                        vocabulary = unlist(vocabulary)))
+          })
+
+# Returns the log perplexity of a Latent Dirichlet Allocation model produced by \code{spark.lda}
+
+#' @return \code{spark.perplexity} returns the log perplexity of the given SparkDataFrame, or the
+#'         log perplexity of the training data if the argument "data" is missing.
+#' @rdname spark.lda
+#' @aliases spark.perplexity,LDAModel-method
+#' @export
+#' @note spark.perplexity(LDAModel) since 2.1.0
+setMethod("spark.perplexity", signature(object = "LDAModel", data = "SparkDataFrame"),
+          function(object, data) {
+            return(if (missing(data)) callJMethod(object@jobj, "logPerplexity")
+                   else callJMethod(object@jobj, "computeLogPerplexity", data@sdf))
+          })
+
+# Saves the Latent Dirichlet Allocation model to the given path.
+
+#' @param path The directory where the model is saved
+#' @param overwrite Whether to overwrite the model if the output path already exists. Default is
+#'        FALSE, which means an exception is thrown if the output path exists.
+#'
+#' @rdname spark.lda
+#' @aliases write.ml,LDAModel,character-method
+#' @export
+#' @seealso \link{read.ml}
+#' @note write.ml(LDAModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "LDAModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            writer <- callJMethod(object@jobj, "write")
+            if (overwrite) {
+              writer <- callJMethod(writer, "overwrite")
+            }
+            invisible(callJMethod(writer, "save", path))
+          })
+
 #' Isotonic Regression Model
 #'
 #' Fits an Isotonic Regression model against a Spark DataFrame, similarly to R's isoreg().
@@ -700,6 +795,8 @@ read.ml <- function(path) {
     return(new("GeneralizedLinearRegressionModel", jobj = jobj))
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) {
     return(new("KMeansModel", jobj = jobj))
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
+    return(new("LDAModel", jobj = jobj))
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
     return(new("IsotonicRegressionModel", jobj = jobj))
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) {
@@ -751,6 +848,71 @@ setMethod("spark.survreg", signature(data = "SparkDataFrame", formula = "formula
             return(new("AFTSurvivalRegressionModel", jobj = jobj))
           })
 
+#' Latent Dirichlet Allocation
+#'
+#' \code{spark.lda} fits a Latent Dirichlet Allocation model on a SparkDataFrame. Users can call
+#' \code{summary} to get a summary of the fitted LDA model, \code{spark.posterior} to compute
+#' posterior probabilities on new data, \code{spark.perplexity} to compute log perplexity on new
+#' data and \code{write.ml}/\code{read.ml} to save/load fitted models.
+#'
+#' @param data A SparkDataFrame for training
+#' @param features Features column name, default "features". Either a libSVM-format column or a
+#'        character-format column is valid.
+#' @param k Number of topics, default 10
+#' @param maxIter Maximum iterations, default 20
+#' @param optimizer Optimizer to train an LDA model, "online" or "em", default "online"
+#' @param subsamplingRate (For online optimizer) Fraction of the corpus to be sampled and used in
+#'        each iteration of mini-batch gradient descent, in range (0, 1], default 0.05
+#' @param topicConcentration concentration parameter (commonly named \code{beta} or \code{eta}) for
+#'        the prior placed on topic distributions over terms, default -1 to set automatically on the
+#'        Spark side. Use \code{summary} to retrieve the effective topicConcentration. Only a
+#'        length-1 numeric is accepted.
+#' @param docConcentration concentration parameter (commonly named \code{alpha}) for the
+#'        prior placed on document distributions over topics (\code{theta}), default -1 to set
+#'        automatically on the Spark side. Use \code{summary} to retrieve the effective
+#'        docConcentration. Only a length-1 or length-\code{k} numeric is accepted.
+#' @param customizedStopWords stopwords that need to be removed from the given corpus. This
+#'        parameter is ignored if a libSVM-format column is used as the features column.
+#' @param maxVocabSize maximum vocabulary size, default 1 << 18
+#' @return \code{spark.lda} returns a fitted Latent Dirichlet Allocation model
+#' @rdname spark.lda
+#' @aliases spark.lda,SparkDataFrame-method
+#' @seealso topicmodels: \url{https://cran.r-project.org/web/packages/topicmodels/}
+#' @export
+#' @examples
+#' \dontrun{
+#' text <- read.df("path/to/data", source = "libsvm")
+#' model <- spark.lda(data = text, optimizer = "em")
+#'
+#' # get a summary of the model
+#' summary(model)
+#'
+#' # compute posterior probabilities
+#' posterior <- spark.posterior(model, text)
+#' showDF(posterior)
+#'
+#' # compute perplexity
+#' perplexity <- spark.perplexity(model, text)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.lda since 2.1.0
+setMethod("spark.lda", signature(data = "SparkDataFrame"),
+          function(data, features = "features", k = 10, maxIter = 20, optimizer = c("online", "em"),
+                   subsamplingRate = 0.05, topicConcentration = -1, docConcentration = -1,
+                   customizedStopWords = "", maxVocabSize = bitwShiftL(1, 18)) {
+            optimizer <- match.arg(optimizer)
+            jobj <- callJStatic("org.apache.spark.ml.r.LDAWrapper", "fit", data@sdf, features,
+                                as.integer(k), as.integer(maxIter), optimizer,
+                                as.numeric(subsamplingRate), topicConcentration,
+                                as.array(docConcentration), as.array(customizedStopWords),
+                                maxVocabSize)
+            return(new("LDAModel", jobj = jobj))
+          })
+
 # Returns a summary of the AFT survival regression model produced by spark.survreg,
 # similarly to R's summary().
@@ -891,4 +1053,4 @@ setMethod("summary", signature(object = "GaussianMixtureModel"),
 setMethod("predict", signature(object = "GaussianMixtureModel"),
           function(object, newData) {
             return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
-          })
+          })
\ No newline at end of file
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 96179864a8..8c380fbf15 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -570,4 +570,91 @@ test_that("spark.gaussianMixture", {
   unlink(modelPath)
 })
 
+test_that("spark.lda with libsvm", {
+  text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
+  model <- spark.lda(text, optimizer = "em")
+
+  stats <- summary(model, 10)
+  isDistributed <- stats$isDistributed
+  logLikelihood <- stats$logLikelihood
+  logPerplexity <- stats$logPerplexity
+  vocabSize <- stats$vocabSize
+  topics <- stats$topicTopTerms
+  weights <- stats$topicTopTermsWeights
+  vocabulary <- stats$vocabulary
+
+  expect_true(isDistributed)
+  expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+  expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+  expect_equal(vocabSize, 11)
+  expect_true(is.null(vocabulary))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  stats2 <- summary(model2)
+
+  expect_true(stats2$isDistributed)
+  expect_equal(logLikelihood, stats2$logLikelihood)
+  expect_equal(logPerplexity, stats2$logPerplexity)
+  expect_equal(vocabSize, stats2$vocabSize)
+  expect_equal(vocabulary, stats2$vocabulary)
+
+  unlink(modelPath)
+})
+
+test_that("spark.lda with text input", {
+  text <- read.text("data/mllib/sample_lda_data.txt")
+  model <- spark.lda(text, optimizer = "online", features = "value")
"value") + + stats <- summary(model) + isDistributed <- stats$isDistributed + logLikelihood <- stats$logLikelihood + logPerplexity <- stats$logPerplexity + vocabSize <- stats$vocabSize + topics <- stats$topicTopTerms + weights <- stats$topicTopTermsWeights + vocabulary <- stats$vocabulary + + expect_false(isDistributed) + expect_true(logLikelihood <= 0 & is.finite(logLikelihood)) + expect_true(logPerplexity >= 0 & is.finite(logPerplexity)) + expect_equal(vocabSize, 10) + expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"))) + + # Test model save/load + modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + stats2 <- summary(model2) + + expect_false(stats2$isDistributed) + expect_equal(logLikelihood, stats2$logLikelihood) + expect_equal(logPerplexity, stats2$logPerplexity) + expect_equal(vocabSize, stats2$vocabSize) + expect_true(all.equal(vocabulary, stats2$vocabulary)) + + unlink(modelPath) +}) + +test_that("spark.posterior and spark.perplexity", { + text <- read.text("data/mllib/sample_lda_data.txt") + model <- spark.lda(text, features = "value", k = 3) + + # Assert perplexities are equal + stats <- summary(model) + logPerplexity <- spark.perplexity(model, text) + expect_equal(logPerplexity, stats$logPerplexity) + + # Assert the sum of every topic distribution is equal to 1 + posterior <- spark.posterior(model, text) + local.posterior <- collect(posterior)$topicDistribution + expect_equal(length(local.posterior), sum(unlist(local.posterior))) +}) + sparkR.session.stop() -- cgit v1.2.3