From 0c99c23b7d9f0c3538cd2b062d551411712a2bcc Mon Sep 17 00:00:00 2001 From: "Oscar D. Lara Yejas" Date: Tue, 26 Apr 2016 15:34:30 -0700 Subject: [SPARK-13734][SPARKR] Added histogram function ## What changes were proposed in this pull request? Added method histogram() to compute the histogram of a Column Usage: ``` ## Create a DataFrame from the Iris dataset irisDF <- createDataFrame(sqlContext, iris) ## Render a histogram for the Sepal_Length column histogram(irisDF, "Sepal_Length", nbins=12) ``` ![histogram](https://cloud.githubusercontent.com/assets/13985649/13588486/e1e751c6-e484-11e5-85db-2fc2115c4bb2.png) Note: Usage will change once SPARK-9325 is figured out so that histogram() only takes a Column as a parameter, as opposed to a DataFrame and a name ## How was this patch tested? All unit tests pass. I added specific unit cases for different scenarios. Author: Oscar D. Lara Yejas Author: Oscar D. Lara Yejas Closes #11569 from olarayej/SPARK-13734. --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 120 ++++++++++++++++++++++++++++++ R/pkg/R/generics.R | 4 + R/pkg/inst/tests/testthat/test_sparkSQL.R | 45 +++++++++++ 4 files changed, 170 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index c0a63d6b3e..ea31baed3d 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -173,6 +173,7 @@ exportMethods("%in%", "getItem", "greatest", "hex", + "histogram", "hour", "hypot", "ifelse", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 890d15dfee..36aedfae86 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2469,6 +2469,126 @@ setMethod("drop", base::drop(x) }) +#' This function computes a histogram for a given SparkR Column. +#' +#' @name histogram +#' @title Histogram +#' @param nbins the number of bins (optional). Default value is 10. +#' @param df the SparkDataFrame containing the Column to build the histogram from. +#' @param colname the name of the column to build the histogram from. +#' @return a data.frame with the histogram statistics, i.e., counts and centroids. +#' @rdname histogram +#' @family SparkDataFrame functions +#' @export +#' @examples +#' \dontrun{ +#' +#' # Create a SparkDataFrame from the Iris dataset +#' irisDF <- createDataFrame(sqlContext, iris) +#' +#' # Compute histogram statistics +#' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12) +#' +#' # Once SparkR has computed the histogram statistics, the histogram can be +#' # rendered using the ggplot2 library: +#' +#' require(ggplot2) +#' plot <- ggplot(histStats, aes(x = centroids, y = counts)) + +#' geom_bar(stat = "identity") + +#' xlab("Sepal_Length") + ylab("Frequency") +#' } +setMethod("histogram", + signature(df = "SparkDataFrame", col = "characterOrColumn"), + function(df, col, nbins = 10) { + # Validate nbins + if (nbins < 2) { + stop("The number of bins must be a positive integer number greater than 1.") + } + + # Round nbins to the smallest integer + nbins <- floor(nbins) + + # Validate col + if (is.null(col)) { + stop("col must be specified.") + } + + colname <- col + x <- if (class(col) == "character") { + if (!colname %in% names(df)) { + stop("Specified colname does not belong to the given SparkDataFrame.") + } + + # Filter NA values in the target column and remove all other columns + df <- na.omit(df[, colname]) + getColumn(df, colname) + + } else if (class(col) == "Column") { + + # The given column needs to be appended to the SparkDataFrame so that we can + # use method describe() to compute statistics in one single pass. The new + # column must have a name that doesn't exist in the dataset. + # To do so, we generate a random column name with more characters than the + # longest colname in the dataset, but no more than 100 (think of a UUID). + # This column name will never be visible to the user, so the name is irrelevant. + # Limiting the colname length to 100 makes debugging easier and it does + # introduce a negligible probability of collision: assuming the user has 1 million + # columns AND all of them have names 100 characters long (which is very unlikely), + # AND they run 1 billion histograms, the probability of collision will roughly be + # 1 in 4.4 x 10 ^ 96 + colname <- paste(base:::sample(c(letters, LETTERS), + size = min(max(nchar(colnames(df))) + 1, 100), + replace = TRUE), + collapse = "") + + # Append the given column to the dataset. This is to support Columns that + # don't belong to the SparkDataFrame but are rather expressions + df <- withColumn(df, colname, col) + + # Filter NA values in the target column. Cannot remove all other columns + # since given Column may be an expression on one or more existing columns + df <- na.omit(df) + + col + } + + # At this point, df only has one column: the one to compute the histogram from + stats <- collect(describe(df[, colname])) + min <- as.numeric(stats[4, 2]) + max <- as.numeric(stats[5, 2]) + + # Normalize the data + xnorm <- (x - min) / (max - min) + + # Round the data to 4 significant digits. This is to avoid rounding issues. + xnorm <- cast(xnorm * 10000, "integer") / 10000.0 + + # Since min = 0, max = 1 (data is already normalized) + normBinSize <- 1 / nbins + binsize <- (max - min) / nbins + approxBins <- xnorm / normBinSize + + # Adjust values that are equal to the upper bound of each bin + bins <- cast(approxBins - + ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0), + "integer") + + df$bins <- bins + histStats <- collect(count(groupBy(df, "bins"))) + names(histStats) <- c("bins", "counts") + + # Fill bins with zero counts + y <- data.frame("bins" = seq(0, nbins - 1)) + histStats <- merge(histStats, y, all.x = T, all.y = T) + histStats[is.na(histStats$count), 2] <- 0 + + # Compute centroids + histStats$centroids <- histStats$bins * binsize + min + binsize / 2 + + # Return the statistics + return(histStats) + }) + #' Saves the content of the SparkDataFrame to an external database table via JDBC #' #' Additional JDBC database connection properties can be set (...) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index f654d8330c..62907118ef 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -106,6 +106,10 @@ setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") }) # @export setGeneric("glom", function(x) { standardGeneric("glom") }) +# @rdname histogram +# @export +setGeneric("histogram", function(df, col, nbins=10) { standardGeneric("histogram") }) + # @rdname keyBy # @export setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 9244c5621b..336068035e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1972,6 +1972,51 @@ test_that("Method str()", { expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris))) }) +test_that("Histogram", { + + # Basic histogram test with colname + expect_equal( + all(histogram(irisDF, "Petal_Width", 8) == + data.frame(bins = seq(0, 7), + counts = c(48, 2, 7, 21, 24, 19, 15, 14), + centroids = seq(0, 7) * 0.3 + 0.25)), + TRUE) + + # Basic histogram test with Column + expect_equal( + all(histogram(irisDF, irisDF$Petal_Width, 8) == + data.frame(bins = seq(0, 7), + counts = c(48, 2, 7, 21, 24, 19, 15, 14), + centroids = seq(0, 7) * 0.3 + 0.25)), + TRUE) + + # Basic histogram test with derived column + expect_equal( + all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) == + data.frame(bins = seq(0, 7), + counts = c(48, 2, 7, 21, 24, 19, 15, 14), + centroids = seq(0, 7) * 0.3 + 1.25)), + TRUE) + + # Missing nbins + expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10) + + # Wrong colname + expect_error(histogram(irisDF, "xxx"), + "Specified colname does not belong to the given SparkDataFrame.") + + # Invalid nbins + expect_error(histogram(irisDF, "Petal_Width", nbins = 0), + "The number of bins must be a positive integer number greater than 1.") + + # Test against R's hist + expect_equal(all(hist(iris$Sepal.Width)$counts == + histogram(irisDF, "Sepal_Width", 12)$counts), T) + + # Test when there are zero counts + df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100))) + expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1)) +}) unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) -- cgit v1.2.3