aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/R/DataFrame.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/R/DataFrame.R')
-rw-r--r--R/pkg/R/DataFrame.R104
1 files changed, 101 insertions, 3 deletions
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97e0c9edea..bc81633815 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -133,9 +133,6 @@ setMethod("schema",
#'
#' Print the logical and physical Catalyst plans to the console for debugging.
#'
-#' @param x a SparkDataFrame.
-#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan.
-#' @param ... further arguments to be passed to or from other methods.
#' @family SparkDataFrame functions
#' @aliases explain,SparkDataFrame-method
#' @rdname explain
@@ -3515,3 +3512,104 @@ setMethod("getNumPartitions",
function(x) {
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
})
+
+#' isStreaming
+#'
+#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
+#' as it arrives.
+#'
+#' @param x A SparkDataFrame
+#' @return TRUE if this SparkDataFrame is from a streaming source
+#' @family SparkDataFrame functions
+#' @aliases isStreaming,SparkDataFrame-method
+#' @rdname isStreaming
+#' @name isStreaming
+#' @seealso \link{read.stream} \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' }
+#' @note isStreaming since 2.2.0
+#' @note experimental
+setMethod("isStreaming",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            # Delegate to the JVM-side Dataset: TRUE when any source is streaming.
+            streaming <- callJMethod(x@sdf, "isStreaming")
+            streaming
+          })
+
+#' Write the streaming SparkDataFrame to a data source.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to
+#' an output data source. There are three modes:
+#' \itemize{
+#' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
+#' output mode can only be used in queries that do not contain any aggregation.
+#' \item complete: All the rows in the streaming SparkDataFrame will be written out every time
+#' there are some updates. This output mode can only be used in queries that
+#' contain aggregations.
+#' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
+#' out every time there are some updates. If the query doesn't contain aggregations,
+#' it will be equivalent to \code{append} mode.
+#' }
+#'
+#' @param df a streaming SparkDataFrame.
+#' @param source a name for external data source.
+#' @param outputMode one of 'append', 'complete', 'update'.
+#' @param ... additional argument(s) passed to the method.
+#'
+#' @family SparkDataFrame functions
+#' @seealso \link{read.stream}
+#' @aliases write.stream,SparkDataFrame-method
+#' @rdname write.stream
+#' @name write.stream
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' wordCounts <- count(group_by(df, "value"))
+#'
+#' # console
+#' q <- write.stream(wordCounts, "console", outputMode = "complete")
+#' # text stream
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' # memory stream
+#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
+#' head(sql("SELECT * from outs"))
+#' queryName(q)
+#'
+#' stopQuery(q)
+#' }
+#' @note write.stream since 2.2.0
+#' @note experimental
+setMethod("write.stream",
+          signature(df = "SparkDataFrame"),
+          function(df, source = NULL, outputMode = NULL, ...) {
+            # source is optional; when given it must be a character data source name.
+            if (!is.null(source) && !is.character(source)) {
+              stop("source should be character, NULL or omitted. It is the data source specified ",
+                   "in 'spark.sql.sources.default' configuration by default.")
+            }
+            # outputMode is optional; when given it must be a character string
+            # (e.g. "append", "complete", "update").
+            if (!is.null(outputMode) && !is.character(outputMode)) {
+              stop("outputMode should be character or omitted.")
+            }
+            # Fall back to the session's configured default data source.
+            if (is.null(source)) {
+              source <- getDefaultSqlSource()
+            }
+            # Convert ... (e.g. path, checkpointLocation, queryName) to string options.
+            options <- varargsToStrEnv(...)
+            write <- handledCallJMethod(df@sdf, "writeStream")
+            write <- callJMethod(write, "format", source)
+            if (!is.null(outputMode)) {
+              write <- callJMethod(write, "outputMode", outputMode)
+            }
+            write <- callJMethod(write, "options", options)
+            # start() kicks off the continuous query on the JVM; wrap the
+            # returned StreamingQuery reference for R callers.
+            ssq <- handledCallJMethod(write, "start")
+            streamingQuery(ssq)
+          })