path: root/R/pkg/R/DataFrame.R
Diffstat (limited to 'R/pkg/R/DataFrame.R')
-rw-r--r--  R/pkg/R/DataFrame.R  120
1 file changed, 120 insertions(+), 0 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 890d15dfee..36aedfae86 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2469,6 +2469,126 @@ setMethod("drop",
base::drop(x)
})
+#' This function computes a histogram for a given SparkR Column.
+#'
+#' @name histogram
+#' @title Histogram
+#' @param df the SparkDataFrame containing the Column to build the histogram from.
+#' @param col the column to build the histogram from, specified either by name
+#'            (character) or as a Column object.
+#' @param nbins the number of bins (optional). Default value is 10.
+#' @return a data.frame with the histogram statistics, i.e., counts and centroids.
+#' @rdname histogram
+#' @family SparkDataFrame functions
+#' @export
+#' @examples
+#' \dontrun{
+#'
+#' # Create a SparkDataFrame from the Iris dataset
+#' irisDF <- createDataFrame(sqlContext, iris)
+#'
+#' # Compute histogram statistics
+#' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12)
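+#'
+#' # The column can also be passed by name
+#' histStats <- histogram(irisDF, "Sepal_Length", nbins = 12)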
+#'
+#' # Once SparkR has computed the histogram statistics, the histogram can be
+#' # rendered using the ggplot2 library:
+#'
+#' require(ggplot2)
+#' plot <- ggplot(histStats, aes(x = centroids, y = counts)) +
+#' geom_bar(stat = "identity") +
+#' xlab("Sepal_Length") + ylab("Frequency")
+#' }
+setMethod("histogram",
+ signature(df = "SparkDataFrame", col = "characterOrColumn"),
+ function(df, col, nbins = 10) {
+ # Validate nbins
+ if (nbins < 2) {
+ stop("The number of bins must be a positive integer number greater than 1.")
+ }
+
+ # Round nbins down to the nearest integer
+ nbins <- floor(nbins)
+
+ # Validate col
+ if (is.null(col)) {
+ stop("col must be specified.")
+ }
+
+ colname <- col
+ x <- if (is.character(col)) {
+ if (!colname %in% names(df)) {
+ stop("Specified colname does not belong to the given SparkDataFrame.")
+ }
+
+ # Filter NA values in the target column and remove all other columns
+ df <- na.omit(df[, colname])
+ getColumn(df, colname)
+
+ } else if (is(col, "Column")) {
+
+ # The given column needs to be appended to the SparkDataFrame so that we can
+ # use describe() to compute the statistics in one single pass. The new
+ # column must have a name that doesn't already exist in the dataset.
+ # To do so, we generate a random column name longer than the longest
+ # colname in the dataset, capped at 100 characters.
+ # This column name is never visible to the user, so the name is irrelevant.
+ # Capping the length at 100 makes debugging easier while introducing only a
+ # negligible probability of collision: even if the user has 1 million columns,
+ # all of them 100 characters long (which is very unlikely), and runs
+ # 1 billion histograms, that is 10^15 draws from 52^100 (about 4 x 10^171)
+ # possible names, so the probability of a collision is roughly 1 in 10^156.
+ colname <- paste(base::sample(c(letters, LETTERS),
+ size = min(max(nchar(colnames(df))) + 1, 100),
+ replace = TRUE),
+ collapse = "")
+
+ # Append the given column to the dataset. This is to support Columns that
+ # don't belong to the SparkDataFrame but are rather expressions
+ df <- withColumn(df, colname, col)
+
+ # Filter NA values in the target column. Cannot remove all other columns
+ # since given Column may be an expression on one or more existing columns
+ df <- na.omit(df)
+
+ col
+ }
+
+ # In both branches df now contains a column named colname holding the target
+ # values; select it and compute its summary statistics in a single pass
+ stats <- collect(describe(df[, colname]))
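+ # describe() returns the statistics count, mean, stddev, min and max, one
+ # per row, so rows 4 and 5 hold the minimum and maximum of the column
+ # (the values come back as strings, hence the as.numeric conversion)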
+ min <- as.numeric(stats[4, 2])
+ max <- as.numeric(stats[5, 2])
+
+ # Normalize the data
+ xnorm <- (x - min) / (max - min)
+
+ # Truncate the normalized data to 4 decimal places to avoid
+ # floating-point rounding issues at the bin boundaries below
+ xnorm <- cast(xnorm * 10000, "integer") / 10000.0
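+ # e.g., xnorm = 0.123456 becomes cast(1234.56, "integer") / 10000 = 0.1234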
+
+ # The normalized data spans [0, 1], so each normalized bin has width 1 / nbins
+ normBinSize <- 1 / nbins
+ # Bin width on the original scale, used below to compute the centroids
+ binsize <- (max - min) / nbins
+ approxBins <- xnorm / normBinSize
+
+ # Adjust values that are equal to the upper bound of each bin
+ bins <- cast(approxBins -
+ ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0),
+ "integer")
+
+ df$bins <- bins
+ histStats <- collect(count(groupBy(df, "bins")))
+ names(histStats) <- c("bins", "counts")
+
+ # Fill empty bins with zero counts: the outer merge reintroduces missing
+ # bins with NA counts, which are then replaced with 0
+ y <- data.frame("bins" = seq(0, nbins - 1))
+ histStats <- merge(histStats, y, all.x = TRUE, all.y = TRUE)
+ histStats[is.na(histStats$counts), 2] <- 0
+
+ # Compute centroids
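+ # centroid of bin i = min + (i + 0.5) * binsize, i.e., the bin's midpoint
+ # on the original scale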
+ histStats$centroids <- histStats$bins * binsize + min + binsize / 2
+
+ # Return the statistics
+ return(histStats)
+ })
+
#' Saves the content of the SparkDataFrame to an external database table via JDBC
#'
#' Additional JDBC database connection properties can be set (...)