diff options
Diffstat (limited to 'R/pkg/R/DataFrame.R')
-rw-r--r-- | R/pkg/R/DataFrame.R | 104 |
1 files changed, 101 insertions, 3 deletions
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 97e0c9edea..bc81633815 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -133,9 +133,6 @@ setMethod("schema", #' #' Print the logical and physical Catalyst plans to the console for debugging. #' -#' @param x a SparkDataFrame. -#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan. -#' @param ... further arguments to be passed to or from other methods. #' @family SparkDataFrame functions #' @aliases explain,SparkDataFrame-method #' @rdname explain @@ -3515,3 +3512,104 @@ setMethod("getNumPartitions", function(x) { callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions") }) + +#' isStreaming +#' +#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data +#' as it arrives. +#' +#' @param x A SparkDataFrame +#' @return TRUE if this SparkDataFrame is from a streaming source +#' @family SparkDataFrame functions +#' @aliases isStreaming,SparkDataFrame-method +#' @rdname isStreaming +#' @name isStreaming +#' @seealso \link{read.stream} \link{write.stream} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- read.stream("socket", host = "localhost", port = 9999) +#' isStreaming(df) +#' } +#' @note isStreaming since 2.2.0 +#' @note experimental +setMethod("isStreaming", + signature(x = "SparkDataFrame"), + function(x) { + callJMethod(x@sdf, "isStreaming") + }) + +#' Write the streaming SparkDataFrame to a data source. +#' +#' The data source is specified by the \code{source} and a set of options (...). +#' If \code{source} is not specified, the default data source configured by +#' spark.sql.sources.default will be used. +#' +#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to a +#' output data source. There are three modes: +#' \itemize{ +#' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This +#' output mode can be only be used in queries that do not contain any aggregation. +#' \item complete: All the rows in the streaming SparkDataFrame will be written out every time +#' there are some updates. This output mode can only be used in queries that +#' contain aggregations. +#' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written +#' out every time there are some updates. If the query doesn't contain aggregations, +#' it will be equivalent to \code{append} mode. +#' } +#' +#' @param df a streaming SparkDataFrame. +#' @param source a name for external data source. +#' @param outputMode one of 'append', 'complete', 'update'. +#' @param ... additional argument(s) passed to the method. +#' +#' @family SparkDataFrame functions +#' @seealso \link{read.stream} +#' @aliases write.stream,SparkDataFrame-method +#' @rdname write.stream +#' @name write.stream +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- read.stream("socket", host = "localhost", port = 9999) +#' isStreaming(df) +#' wordCounts <- count(group_by(df, "value")) +#' +#' # console +#' q <- write.stream(wordCounts, "console", outputMode = "complete") +#' # text stream +#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp") +#' # memory stream +#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete") +#' head(sql("SELECT * from outs")) +#' queryName(q) +#' +#' stopQuery(q) +#' } +#' @note write.stream since 2.2.0 +#' @note experimental +setMethod("write.stream", + signature(df = "SparkDataFrame"), + function(df, source = NULL, outputMode = NULL, ...) { + if (!is.null(source) && !is.character(source)) { + stop("source should be character, NULL or omitted. It is the data source specified ", + "in 'spark.sql.sources.default' configuration by default.") + } + if (!is.null(outputMode) && !is.character(outputMode)) { + stop("outputMode should be charactor or omitted.") + } + if (is.null(source)) { + source <- getDefaultSqlSource() + } + options <- varargsToStrEnv(...) + write <- handledCallJMethod(df@sdf, "writeStream") + write <- callJMethod(write, "format", source) + if (!is.null(outputMode)) { + write <- callJMethod(write, "outputMode", outputMode) + } + write <- callJMethod(write, "options", options) + ssq <- handledCallJMethod(write, "start") + streamingQuery(ssq) + }) |