about summary refs log tree commit diff
diff options
context:
space:
mode:
authorOscar D. Lara Yejas <odlaraye@oscars-mbp.usca.ibm.com>2016-04-26 15:34:30 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2016-04-26 15:34:30 -0700
commit0c99c23b7d9f0c3538cd2b062d551411712a2bcc (patch)
treefc7d7cc02559756f50b3dd4a5f262e5aa822412e
parent75879ac3c07f3b1a708f4392429335feb06f271b (diff)
downloadspark-0c99c23b7d9f0c3538cd2b062d551411712a2bcc.tar.gz
spark-0c99c23b7d9f0c3538cd2b062d551411712a2bcc.tar.bz2
spark-0c99c23b7d9f0c3538cd2b062d551411712a2bcc.zip
[SPARK-13734][SPARKR] Added histogram function
## What changes were proposed in this pull request? Added method histogram() to compute the histogram of a Column Usage: ``` ## Create a DataFrame from the Iris dataset irisDF <- createDataFrame(sqlContext, iris) ## Render a histogram for the Sepal_Length column histogram(irisDF, "Sepal_Length", nbins=12) ``` ![histogram](https://cloud.githubusercontent.com/assets/13985649/13588486/e1e751c6-e484-11e5-85db-2fc2115c4bb2.png) Note: Usage will change once SPARK-9325 is figured out so that histogram() only takes a Column as a parameter, as opposed to a DataFrame and a name ## How was this patch tested? All unit tests pass. I added specific unit cases for different scenarios. Author: Oscar D. Lara Yejas <odlaraye@oscars-mbp.usca.ibm.com> Author: Oscar D. Lara Yejas <odlaraye@oscars-mbp.attlocal.net> Closes #11569 from olarayej/SPARK-13734.
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/DataFrame.R120
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R45
4 files changed, 170 insertions, 0 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index c0a63d6b3e..ea31baed3d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -173,6 +173,7 @@ exportMethods("%in%",
"getItem",
"greatest",
"hex",
+ "histogram",
"hour",
"hypot",
"ifelse",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 890d15dfee..36aedfae86 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2469,6 +2469,126 @@ setMethod("drop",
base::drop(x)
})
+#' This function computes a histogram for a given SparkR Column.
+#'
+#' @name histogram
+#' @title Histogram
+#' @param df the SparkDataFrame containing the Column to build the histogram from.
+#' @param col the column to build the histogram from, given either as a character
+#'            name or as a Column object.
+#' @param nbins the number of bins (optional). Default value is 10.
+#' @return a data.frame with the histogram statistics, i.e., counts and centroids.
+#' @rdname histogram
+#' @family SparkDataFrame functions
+#' @export
+#' @examples
+#' \dontrun{
+#'
+#' # Create a SparkDataFrame from the Iris dataset
+#' irisDF <- createDataFrame(sqlContext, iris)
+#'
+#' # Compute histogram statistics
+#' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12)
+#'
+#' # Once SparkR has computed the histogram statistics, the histogram can be
+#' # rendered using the ggplot2 library:
+#'
+#' require(ggplot2)
+#' plot <- ggplot(histStats, aes(x = centroids, y = counts)) +
+#' geom_bar(stat = "identity") +
+#' xlab("Sepal_Length") + ylab("Frequency")
+#' }
+setMethod("histogram",
+ signature(df = "SparkDataFrame", col = "characterOrColumn"),
+ function(df, col, nbins = 10) {
+ # Validate nbins
+ if (nbins < 2) {
+ stop("The number of bins must be a positive integer number greater than 1.")
+ }
+
+ # Round nbins to the smallest integer
+ nbins <- floor(nbins)
+
+ # Validate col
+ if (is.null(col)) {
+ stop("col must be specified.")
+ }
+
+ colname <- col
+ x <- if (class(col) == "character") {
+ if (!colname %in% names(df)) {
+ stop("Specified colname does not belong to the given SparkDataFrame.")
+ }
+
+ # Filter NA values in the target column and remove all other columns
+ df <- na.omit(df[, colname])
+ getColumn(df, colname)
+
+ } else if (class(col) == "Column") {
+
+ # The given column needs to be appended to the SparkDataFrame so that we can
+ # use method describe() to compute statistics in one single pass. The new
+ # column must have a name that doesn't exist in the dataset.
+ # To do so, we generate a random column name with more characters than the
+ # longest colname in the dataset, but no more than 100 (think of a UUID).
+ # This column name will never be visible to the user, so the name is irrelevant.
+          # Limiting the colname length to 100 makes debugging easier while
+          # introducing only a negligible probability of collision: assuming the user has 1 million
+ # columns AND all of them have names 100 characters long (which is very unlikely),
+ # AND they run 1 billion histograms, the probability of collision will roughly be
+ # 1 in 4.4 x 10 ^ 96
+ colname <- paste(base:::sample(c(letters, LETTERS),
+ size = min(max(nchar(colnames(df))) + 1, 100),
+ replace = TRUE),
+ collapse = "")
+
+ # Append the given column to the dataset. This is to support Columns that
+ # don't belong to the SparkDataFrame but are rather expressions
+ df <- withColumn(df, colname, col)
+
+ # Filter NA values in the target column. Cannot remove all other columns
+ # since given Column may be an expression on one or more existing columns
+ df <- na.omit(df)
+
+ col
+ }
+
+          # At this point, df contains (possibly among others) the target column, named colname,
+          # from which the histogram is computed
+ stats <- collect(describe(df[, colname]))
+ min <- as.numeric(stats[4, 2])
+ max <- as.numeric(stats[5, 2])
+
+ # Normalize the data
+ xnorm <- (x - min) / (max - min)
+
+ # Round the data to 4 significant digits. This is to avoid rounding issues.
+ xnorm <- cast(xnorm * 10000, "integer") / 10000.0
+
+ # Since min = 0, max = 1 (data is already normalized)
+ normBinSize <- 1 / nbins
+ binsize <- (max - min) / nbins
+ approxBins <- xnorm / normBinSize
+
+ # Adjust values that are equal to the upper bound of each bin
+ bins <- cast(approxBins -
+ ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0),
+ "integer")
+
+ df$bins <- bins
+ histStats <- collect(count(groupBy(df, "bins")))
+ names(histStats) <- c("bins", "counts")
+
+ # Fill bins with zero counts
+ y <- data.frame("bins" = seq(0, nbins - 1))
+ histStats <- merge(histStats, y, all.x = T, all.y = T)
+ histStats[is.na(histStats$count), 2] <- 0
+
+ # Compute centroids
+ histStats$centroids <- histStats$bins * binsize + min + binsize / 2
+
+ # Return the statistics
+ return(histStats)
+ })
+
#' Saves the content of the SparkDataFrame to an external database table via JDBC
#'
#' Additional JDBC database connection properties can be set (...)
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f654d8330c..62907118ef 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -106,6 +106,10 @@ setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
# @export
setGeneric("glom", function(x) { standardGeneric("glom") })
+# @rdname histogram
+# @export
+setGeneric("histogram", function(df, col, nbins=10) { standardGeneric("histogram") })
+
# @rdname keyBy
# @export
setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9244c5621b..336068035e 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1972,6 +1972,51 @@ test_that("Method str()", {
expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
})
+test_that("Histogram", {
+
+ # Basic histogram test with colname
+ expect_equal(
+ all(histogram(irisDF, "Petal_Width", 8) ==
+ data.frame(bins = seq(0, 7),
+ counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+ centroids = seq(0, 7) * 0.3 + 0.25)),
+ TRUE)
+
+ # Basic histogram test with Column
+ expect_equal(
+ all(histogram(irisDF, irisDF$Petal_Width, 8) ==
+ data.frame(bins = seq(0, 7),
+ counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+ centroids = seq(0, 7) * 0.3 + 0.25)),
+ TRUE)
+
+ # Basic histogram test with derived column
+ expect_equal(
+ all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) ==
+ data.frame(bins = seq(0, 7),
+ counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+ centroids = seq(0, 7) * 0.3 + 1.25)),
+ TRUE)
+
+ # Missing nbins
+ expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10)
+
+ # Wrong colname
+ expect_error(histogram(irisDF, "xxx"),
+ "Specified colname does not belong to the given SparkDataFrame.")
+
+ # Invalid nbins
+ expect_error(histogram(irisDF, "Petal_Width", nbins = 0),
+ "The number of bins must be a positive integer number greater than 1.")
+
+ # Test against R's hist
+ expect_equal(all(hist(iris$Sepal.Width)$counts ==
+ histogram(irisDF, "Sepal_Width", 12)$counts), T)
+
+ # Test when there are zero counts
+ df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100)))
+ expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
+})
unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)