author    Felix Cheung <felixcheung_m@hotmail.com>  2017-03-18 16:26:48 -0700
committer Felix Cheung <felixcheung@apache.org>     2017-03-18 16:26:48 -0700
commit    5c165596dac136b9b3a88cfb3578b2423d227eb7 (patch)
tree      1f6d3128c1586fe67fae21a50b9454d2ac4c8ed2 /R
parent    54e61df2634163382c7d01a2ad40ffb5e7270abc (diff)
download  spark-5c165596dac136b9b3a88cfb3578b2423d227eb7.tar.gz
          spark-5c165596dac136b9b3a88cfb3578b2423d227eb7.tar.bz2
          spark-5c165596dac136b9b3a88cfb3578b2423d227eb7.zip
[SPARK-19654][SPARKR][SS] Structured Streaming API for R
## What changes were proposed in this pull request?

Add "experimental" API for SS in R

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16982 from felixcheung/rss.
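For readers skimming the patch, here is a minimal end-to-end sketch of the API it adds, pieced together from the roxygen examples and tests below (the socket host/port and the query name are placeholders):

```r
library(SparkR)
sparkR.session()

# Create a streaming SparkDataFrame from a socket source (host/port are placeholders).
lines <- read.stream("socket", host = "localhost", port = 9999)
isStreaming(lines)                    # TRUE

# Aggregate and start a streaming query against the in-memory sink.
wordCounts <- count(group_by(lines, "value"))
q <- write.stream(wordCounts, "memory", queryName = "counts", outputMode = "complete")

# Inspect and manage the running query.
queryName(q)                          # "counts"
status(q)                             # prints the current status as JSON
lastProgress(q)                       # prints the most recent progress update
awaitTermination(q, 10 * 1000)        # block for up to 10 seconds
stopQuery(q)
```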
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/DESCRIPTION                              1
-rw-r--r--  R/pkg/NAMESPACE                               13
-rw-r--r--  R/pkg/R/DataFrame.R                          104
-rw-r--r--  R/pkg/R/SQLContext.R                          50
-rw-r--r--  R/pkg/R/generics.R                            41
-rw-r--r--  R/pkg/R/streaming.R                          208
-rw-r--r--  R/pkg/R/utils.R                               11
-rw-r--r--  R/pkg/inst/tests/testthat/test_streaming.R   150
8 files changed, 573 insertions, 5 deletions
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index cc471edc37..1635f71489 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -54,5 +54,6 @@ Collate:
'types.R'
'utils.R'
'window.R'
+ 'streaming.R'
RoxygenNote: 5.0.1
VignetteBuilder: knitr
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 871f8e41a0..78344ce9ff 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -121,6 +121,7 @@ exportMethods("arrange",
"insertInto",
"intersect",
"isLocal",
+ "isStreaming",
"join",
"limit",
"merge",
@@ -169,6 +170,7 @@ exportMethods("arrange",
"write.json",
"write.orc",
"write.parquet",
+ "write.stream",
"write.text",
"write.ml")
@@ -365,6 +367,7 @@ export("as.DataFrame",
"read.json",
"read.orc",
"read.parquet",
+ "read.stream",
"read.text",
"spark.lapply",
"spark.addFile",
@@ -402,6 +405,16 @@ export("partitionBy",
export("windowPartitionBy",
"windowOrderBy")
+exportClasses("StreamingQuery")
+
+export("awaitTermination",
+ "isActive",
+ "lastProgress",
+ "queryName",
+ "status",
+ "stopQuery")
+
+
S3method(print, jobj)
S3method(print, structField)
S3method(print, structType)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97e0c9edea..bc81633815 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -133,9 +133,6 @@ setMethod("schema",
#'
#' Print the logical and physical Catalyst plans to the console for debugging.
#'
-#' @param x a SparkDataFrame.
-#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan.
-#' @param ... further arguments to be passed to or from other methods.
#' @family SparkDataFrame functions
#' @aliases explain,SparkDataFrame-method
#' @rdname explain
@@ -3515,3 +3512,104 @@ setMethod("getNumPartitions",
function(x) {
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
})
+
+#' isStreaming
+#'
+#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
+#' as it arrives.
+#'
+#' @param x A SparkDataFrame
+#' @return TRUE if this SparkDataFrame is from a streaming source
+#' @family SparkDataFrame functions
+#' @aliases isStreaming,SparkDataFrame-method
+#' @rdname isStreaming
+#' @name isStreaming
+#' @seealso \link{read.stream} \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' }
+#' @note isStreaming since 2.2.0
+#' @note experimental
+setMethod("isStreaming",
+ signature(x = "SparkDataFrame"),
+ function(x) {
+ callJMethod(x@sdf, "isStreaming")
+ })
+
+#' Write the streaming SparkDataFrame to a data source.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to an
+#' output data source. There are three modes:
+#' \itemize{
+#' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
+#' output mode can only be used in queries that do not contain any aggregation.
+#' \item complete: All the rows in the streaming SparkDataFrame will be written out every time
+#' there are some updates. This output mode can only be used in queries that
+#' contain aggregations.
+#' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
+#' out every time there are some updates. If the query doesn't contain aggregations,
+#' it will be equivalent to \code{append} mode.
+#' }
+#'
+#' @param df a streaming SparkDataFrame.
+#' @param source the name of the external data source.
+#' @param outputMode one of 'append', 'complete', 'update'.
+#' @param ... additional argument(s) passed to the method.
+#'
+#' @family SparkDataFrame functions
+#' @seealso \link{read.stream}
+#' @aliases write.stream,SparkDataFrame-method
+#' @rdname write.stream
+#' @name write.stream
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' wordCounts <- count(group_by(df, "value"))
+#'
+#' # console
+#' q <- write.stream(wordCounts, "console", outputMode = "complete")
+#' # text stream
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' # memory stream
+#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
+#' head(sql("SELECT * from outs"))
+#' queryName(q)
+#'
+#' stopQuery(q)
+#' }
+#' @note write.stream since 2.2.0
+#' @note experimental
+setMethod("write.stream",
+ signature(df = "SparkDataFrame"),
+ function(df, source = NULL, outputMode = NULL, ...) {
+ if (!is.null(source) && !is.character(source)) {
+ stop("source should be character, NULL or omitted. It is the data source specified ",
+ "in 'spark.sql.sources.default' configuration by default.")
+ }
+ if (!is.null(outputMode) && !is.character(outputMode)) {
+ stop("outputMode should be charactor or omitted.")
+ }
+ if (is.null(source)) {
+ source <- getDefaultSqlSource()
+ }
+ options <- varargsToStrEnv(...)
+ write <- handledCallJMethod(df@sdf, "writeStream")
+ write <- callJMethod(write, "format", source)
+ if (!is.null(outputMode)) {
+ write <- callJMethod(write, "outputMode", outputMode)
+ }
+ write <- callJMethod(write, "options", options)
+ ssq <- handledCallJMethod(write, "start")
+ streamingQuery(ssq)
+ })
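To make the three output modes documented above concrete, a short sketch (host, port, and paths are placeholders; behaviour follows the documentation added in this hunk):

```r
lines <- read.stream("socket", host = "localhost", port = 9999)

# append: only new rows are written; only valid when the query has no aggregation.
q1 <- write.stream(lines, "text", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                   outputMode = "append")

# complete: the full result table is rewritten on every trigger; requires an aggregation.
counts <- count(group_by(lines, "value"))
q2 <- write.stream(counts, "memory", queryName = "counts", outputMode = "complete")

# update: only rows changed since the last trigger; equivalent to append if there is
# no aggregation in the query.
q3 <- write.stream(counts, "console", outputMode = "update")
```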
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 8354f705f6..b75fb0159d 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName,
}
dataFrame(sdf)
}
+
+#' Load a streaming SparkDataFrame
+#'
+#' Returns the dataset in a data source as a SparkDataFrame
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param source The name of the external data source
+#' @param schema The data schema defined in structType. This is required for file-based streaming
+#' data sources
+#' @param ... additional external data source specific named options, for instance \code{path} for
+#' file-based streaming data source
+#' @return SparkDataFrame
+#' @rdname read.stream
+#' @name read.stream
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#'
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' }
+#' @note read.stream since 2.2.0
+#' @note experimental
+read.stream <- function(source = NULL, schema = NULL, ...) {
+ sparkSession <- getSparkSession()
+ if (!is.null(source) && !is.character(source)) {
+ stop("source should be character, NULL or omitted. It is the data source specified ",
+ "in 'spark.sql.sources.default' configuration by default.")
+ }
+ if (is.null(source)) {
+ source <- getDefaultSqlSource()
+ }
+ options <- varargsToStrEnv(...)
+ read <- callJMethod(sparkSession, "readStream")
+ read <- callJMethod(read, "format", source)
+ if (!is.null(schema)) {
+ stopifnot(class(schema) == "structType")
+ read <- callJMethod(read, "schema", schema$jobj)
+ }
+ read <- callJMethod(read, "options", options)
+ sdf <- handledCallJMethod(read, "load")
+ dataFrame(callJMethod(sdf, "toDF"))
+}
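As documented above, file-based streaming sources require an explicit schema. A small sketch (jsonDir is a placeholder for a directory of JSON files):

```r
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
people <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
isStreaming(people)   # TRUE
```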
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 45bc127465..029771289f 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -539,6 +539,9 @@ setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
#' @rdname explain
#' @export
+#' @param x a SparkDataFrame or a StreamingQuery.
+#' @param extended Logical. If extended is FALSE, prints only the physical plan.
+#' @param ... further arguments to be passed to or from other methods.
setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
@@ -577,6 +580,10 @@ setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
#' @export
setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
+#' @rdname isStreaming
+#' @export
+setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") })
+
#' @rdname limit
#' @export
setGeneric("limit", function(x, num) {standardGeneric("limit") })
@@ -682,6 +689,12 @@ setGeneric("write.parquet", function(x, path, ...) {
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+#' @rdname write.stream
+#' @export
+setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) {
+ standardGeneric("write.stream")
+})
+
#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
@@ -1428,10 +1441,36 @@ setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark
#' @export
setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.perplexity") })
-
#' @param object a fitted ML model object.
#' @param path the directory where the model is saved.
#' @param ... additional argument(s) passed to the method.
#' @rdname write.ml
#' @export
setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
+
+
+###################### Streaming Methods ##########################
+
+#' @rdname awaitTermination
+#' @export
+setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
+
+#' @rdname isActive
+#' @export
+setGeneric("isActive", function(x) { standardGeneric("isActive") })
+
+#' @rdname lastProgress
+#' @export
+setGeneric("lastProgress", function(x) { standardGeneric("lastProgress") })
+
+#' @rdname queryName
+#' @export
+setGeneric("queryName", function(x) { standardGeneric("queryName") })
+
+#' @rdname status
+#' @export
+setGeneric("status", function(x) { standardGeneric("status") })
+
+#' @rdname stopQuery
+#' @export
+setGeneric("stopQuery", function(x) { standardGeneric("stopQuery") })
diff --git a/R/pkg/R/streaming.R b/R/pkg/R/streaming.R
new file mode 100644
index 0000000000..e353d2dd07
--- /dev/null
+++ b/R/pkg/R/streaming.R
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes
+
+#' @include generics.R jobj.R
+NULL
+
+#' S4 class that represents a StreamingQuery
+#'
+#' A StreamingQuery is returned by write.stream() on a streaming SparkDataFrame (see read.stream())
+#'
+#' @rdname StreamingQuery
+#' @seealso \link{read.stream}
+#'
+#' @param ssq A Java object reference to the backing Scala StreamingQuery
+#' @export
+#' @note StreamingQuery since 2.2.0
+#' @note experimental
+setClass("StreamingQuery",
+ slots = list(ssq = "jobj"))
+
+setMethod("initialize", "StreamingQuery", function(.Object, ssq) {
+ .Object@ssq <- ssq
+ .Object
+})
+
+streamingQuery <- function(ssq) {
+ stopifnot(class(ssq) == "jobj")
+ new("StreamingQuery", ssq)
+}
+
+#' @rdname show
+#' @export
+#' @note show(StreamingQuery) since 2.2.0
+setMethod("show", "StreamingQuery",
+ function(object) {
+ name <- callJMethod(object@ssq, "name")
+ if (!is.null(name)) {
+ cat(paste0("StreamingQuery '", name, "'\n"))
+ } else {
+ cat("StreamingQuery", "\n")
+ }
+ })
+
+#' queryName
+#'
+#' Returns the user-specified name of the query. This is specified in
+#' \code{write.stream(df, queryName = "query")}. This name, if set, must be unique across all active
+#' queries.
+#'
+#' @param x a StreamingQuery.
+#' @return The name of the query, or NULL if not specified.
+#' @rdname queryName
+#' @name queryName
+#' @aliases queryName,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#' \dontrun{ queryName(sq) }
+#' @note queryName(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("queryName",
+ signature(x = "StreamingQuery"),
+ function(x) {
+ callJMethod(x@ssq, "name")
+ })
+
+#' @rdname explain
+#' @name explain
+#' @aliases explain,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ explain(sq) }
+#' @note explain(StreamingQuery) since 2.2.0
+setMethod("explain",
+ signature(x = "StreamingQuery"),
+ function(x, extended = FALSE) {
+ cat(callJMethod(x@ssq, "explainInternal", extended), "\n")
+ })
+
+#' lastProgress
+#'
+#' Prints the most recent progress update of this streaming query in JSON format.
+#'
+#' @param x a StreamingQuery.
+#' @rdname lastProgress
+#' @name lastProgress
+#' @aliases lastProgress,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ lastProgress(sq) }
+#' @note lastProgress(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("lastProgress",
+ signature(x = "StreamingQuery"),
+ function(x) {
+ p <- callJMethod(x@ssq, "lastProgress")
+ if (is.null(p)) {
+ cat("Streaming query has no progress")
+ } else {
+ cat(callJMethod(p, "toString"), "\n")
+ }
+ })
+
+#' status
+#'
+#' Prints the current status of the query in JSON format.
+#'
+#' @param x a StreamingQuery.
+#' @rdname status
+#' @name status
+#' @aliases status,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ status(sq) }
+#' @note status(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("status",
+ signature(x = "StreamingQuery"),
+ function(x) {
+ cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n")
+ })
+
+#' isActive
+#'
+#' Returns TRUE if this query is actively running.
+#'
+#' @param x a StreamingQuery.
+#' @return TRUE if query is actively running, FALSE if stopped.
+#' @rdname isActive
+#' @name isActive
+#' @aliases isActive,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ isActive(sq) }
+#' @note isActive(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("isActive",
+ signature(x = "StreamingQuery"),
+ function(x) {
+ callJMethod(x@ssq, "isActive")
+ })
+
+#' awaitTermination
+#'
+#' Waits for the termination of the query, either by \code{stopQuery} or by an error.
+#'
+#' If the query has terminated, then all subsequent calls to this method will return TRUE
+#' immediately.
+#'
+#' @param x a StreamingQuery.
+#' @param timeout time to wait in milliseconds
+#' @return TRUE if query has terminated within the timeout period.
+#' @rdname awaitTermination
+#' @name awaitTermination
+#' @aliases awaitTermination,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ awaitTermination(sq, 10000) }
+#' @note awaitTermination(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("awaitTermination",
+ signature(x = "StreamingQuery"),
+ function(x, timeout) {
+ handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
+ })
+
+#' stopQuery
+#'
+#' Stops the execution of this query if it is running. This method blocks until the execution is
+#' stopped.
+#'
+#' @param x a StreamingQuery.
+#' @rdname stopQuery
+#' @name stopQuery
+#' @aliases stopQuery,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ stopQuery(sq) }
+#' @note stopQuery(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("stopQuery",
+ signature(x = "StreamingQuery"),
+ function(x) {
+ invisible(callJMethod(x@ssq, "stop"))
+ })
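A note on the awaitTermination semantics implemented above, mirrored by the tests added below (q stands for an already-started StreamingQuery): it blocks for at most the given timeout and reports whether the query has terminated.

```r
awaitTermination(q, 5 * 1000)   # FALSE if the query is still running after 5 seconds
stopQuery(q)                    # blocks until execution stops
awaitTermination(q, 1)          # TRUE; once terminated it returns immediately
isActive(q)                     # FALSE
```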
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 1f7848f2b4..810de9917e 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -823,7 +823,16 @@ captureJVMException <- function(e, method) {
stacktrace <- rawmsg
}
- if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
+ # StreamingQueryException could wrap an IllegalArgumentException, so look for that first
+ if (any(grep("org.apache.spark.sql.streaming.StreamingQueryException: ", stacktrace))) {
+ msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ",
+ fixed = TRUE)[[1]]
+ # Extract "Error in ..." message.
+ rmsg <- msg[1]
+ # Extract the first message of JVM exception.
+ first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+ stop(paste0(rmsg, "streaming query error - ", first), call. = FALSE)
+ } else if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]]
# Extract "Error in ..." message.
rmsg <- msg[1]
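A rough illustration of what the new StreamingQueryException branch does with a JVM stack trace (the stacktrace string here is a hypothetical, trimmed example; the real one comes from the JVM backend):

```r
stacktrace <- paste0(
  "Error in awaitTermination : ",
  "org.apache.spark.sql.streaming.StreamingQueryException: ",
  "Invalid value '-1' for option 'maxFilesPerTrigger', must be a positive integer\n",
  "\tat org.apache.spark.sql.execution.streaming.StreamExecution.run(...)")

# Split off everything before the exception class name, then keep only the first
# line of the JVM message (up to the first "\n\tat" frame).
msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ",
                fixed = TRUE)[[1]]
rmsg <- msg[1]                                    # "Error in awaitTermination : "
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]    # first line of the JVM message
paste0(rmsg, "streaming query error - ", first)
# "Error in awaitTermination : streaming query error - Invalid value '-1' for option ..."
```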
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R
new file mode 100644
index 0000000000..03b1bd3dc1
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_streaming.R
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
+sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+ # file.path removes the trailing empty separator on Windows; add it back
+ jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+ "{\"name\":\"Andy\", \"age\":30}",
+ "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+ structField("age", "integer"),
+ structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+ df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+ expect_true(isStreaming(df))
+ counts <- count(group_by(df, "name"))
+ q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
+
+ expect_false(awaitTermination(q, 5 * 1000))
+ expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+ writeLines(mockLinesNa, jsonPathNa)
+ awaitTermination(q, 5 * 1000)
+ expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+ stopQuery(q)
+ expect_true(awaitTermination(q, 1))
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+ df <- read.stream("json", path = jsonDir, schema = schema)
+ expect_true(isStreaming(df))
+ counts <- count(group_by(df, "name"))
+ q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
+
+ awaitTermination(q, 5 * 1000)
+
+ expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+ expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
+ expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
+
+ expect_equal(queryName(q), "people2")
+ expect_true(isActive(q))
+
+ stopQuery(q)
+})
+
+test_that("Stream other format", {
+ parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+ df <- read.df(jsonPath, "json", schema)
+ write.df(df, parquetPath, "parquet", "overwrite")
+
+ df <- read.stream(path = parquetPath, schema = schema)
+ expect_true(isStreaming(df))
+ counts <- count(group_by(df, "name"))
+ q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
+
+ expect_false(awaitTermination(q, 5 * 1000))
+ expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+ expect_equal(queryName(q), "people3")
+ expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
+ capture.output(lastProgress(q)))))
+ expect_true(isActive(q))
+
+ stopQuery(q)
+ expect_true(awaitTermination(q, 1))
+ expect_false(isActive(q))
+
+ unlink(parquetPath)
+})
+
+test_that("Non-streaming DataFrame", {
+ c <- as.DataFrame(cars)
+ expect_false(isStreaming(c))
+
+ expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
+ paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
+ "streaming Dataset/DataFrame).*"))
+})
+
+test_that("Unsupported operation", {
+ # memory sink without aggregation
+ df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+ expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
+ paste0(".*(start : analysis error - Complete output mode not supported when there ",
+ "are no streaming aggregations on streaming DataFrames/Datasets).*"))
+})
+
+test_that("Terminated by error", {
+ df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1)
+ counts <- count(group_by(df, "name"))
+ # write.stream() should return a StreamingQuery without failing here, though the
+ # query errors asynchronously and may dump its error log at about the same time
+ expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"),
+ NA)
+
+ expect_error(awaitTermination(q, 1),
+ paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option",
+ " 'maxFilesPerTrigger', must be a positive integer).*"))
+
+ expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value",
+ capture.output(status(q)))))
+ expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q)))))
+ expect_equal(queryName(q), "people4")
+ expect_false(isActive(q))
+
+ stopQuery(q)
+})
+
+unlink(jsonPath)
+unlink(jsonPathNa)
+
+sparkR.session.stop()