author    Dongjoon Hyun <dongjoon@apache.org>    2016-06-20 13:41:03 -0700
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2016-06-20 13:41:03 -0700
commit    b0f2fb5b9729b38744bf784f2072f5ee52314f87 (patch)
tree      bbe026f28c48cdd9741016d0ac19d6abb1639df8 /R
parent    aee1420eca64dfc145f31b8c653388fafc5ccd8f (diff)
[SPARK-16053][R] Add `spark_partition_id` in SparkR
## What changes were proposed in this pull request?

This PR adds the `spark_partition_id` virtual column function to SparkR for API parity. The following example illustrates its use in SparkR on a partitioned Parquet table created by `spark.range(10).write.mode("overwrite").parquet("/tmp/t1")`.

```r
> collect(select(read.parquet('/tmp/t1'), c('id', spark_partition_id())))
   id SPARK_PARTITION_ID()
1   3                    0
2   4                    0
3   8                    1
4   9                    1
5   0                    2
6   1                    3
7   2                    4
8   5                    5
9   6                    6
10  7                    7
```

## How was this patch tested?

Pass the Jenkins tests (including the new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13768 from dongjoon-hyun/SPARK-16053.
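For readers who want to try the change end to end, here is a minimal, self-contained sketch of the new function from a fresh SparkR session. It assumes a local Spark 2.0+ build with the SparkR package on the library path; the small data frame and the partition count are illustrative choices, not part of this patch, and the id-to-partition mapping will vary between runs.

```r
library(SparkR)
sparkR.session()  # assumes Spark 2.0+ with SparkR on the library path

# A small DataFrame spread over a few partitions (both are illustrative choices).
df <- repartition(createDataFrame(data.frame(id = 0:9)), 4L)

# spark_partition_id() adds a virtual column holding each row's task partition ID.
# The mapping is nondeterministic: it depends on partitioning and task scheduling.
collect(select(df, c("id", spark_partition_id())))

sparkR.session.stop()
```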
Diffstat (limited to 'R')
 R/pkg/NAMESPACE                            |  1 +
 R/pkg/R/functions.R                        | 21 +++++++++++++++++++++
 R/pkg/R/generics.R                         |  4 ++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R  |  1 +
 4 files changed, 27 insertions(+), 0 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index aaeab665a4..45663f4c2c 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -260,6 +260,7 @@ exportMethods("%in%",
"skewness",
"sort_array",
"soundex",
+ "spark_partition_id",
"stddev",
"stddev_pop",
"stddev_samp",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 0fb38bc289..c26f963258 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1206,6 +1206,27 @@ setMethod("soundex",
column(jc)
})
+#' Return the partition ID as a column
+#'
+#' Return the partition ID of the Spark task as a SparkDataFrame column.
+#' Note that this is nondeterministic because it depends on data partitioning and
+#' task scheduling.
+#'
+#' This is equivalent to the SPARK_PARTITION_ID function in SQL.
+#'
+#' @rdname spark_partition_id
+#' @name spark_partition_id
+#' @export
+#' @examples
+#' \dontrun{select(df, spark_partition_id())}
+#' @note spark_partition_id since 2.0.0
+setMethod("spark_partition_id",
+          signature(x = "missing"),
+          function() {
+            jc <- callJStatic("org.apache.spark.sql.functions", "spark_partition_id")
+            column(jc)
+          })
+
#' @rdname sd
#' @name stddev
setMethod("stddev",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index dcc1cf241f..f6b9276d86 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1135,6 +1135,10 @@ setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array")
#' @export
setGeneric("soundex", function(x) { standardGeneric("soundex") })
+#' @rdname spark_partition_id
+#' @export
+setGeneric("spark_partition_id", function(x) { standardGeneric("spark_partition_id") })
+
#' @rdname sd
#' @export
setGeneric("stddev", function(x) { standardGeneric("stddev") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 114fec6e36..d53c40d423 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1059,6 +1059,7 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
+ c19 <- spark_partition_id()
# Test if base::is.nan() is exposed
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
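
The roxygen comment above notes that the new wrapper is equivalent to the SPARK_PARTITION_ID function in SQL. As a companion to the patch, here is a small sketch of reaching the same virtual column through a SQL query from SparkR; the temporary view name "t1" is an arbitrary, illustrative choice.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(id = 0:9))
createOrReplaceTempView(df, "t1")  # "t1" is an illustrative view name

# Same virtual column as spark_partition_id(), reached via the SQL function.
head(sql("SELECT id, SPARK_PARTITION_ID() FROM t1"))

sparkR.session.stop()
```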