about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 2
-rw-r--r-- project/MimaExcludes.scala | 2
-rw-r--r-- python/pyspark/sql/streaming.py | 6
-rw-r--r-- python/pyspark/sql/tests.py | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 18
11 files changed, 25 insertions, 25 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 0e40abac65..544fbc5ec3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnQuery { query =>
- val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+ val recordsRead = query.recentProgress.map(_.numInputRows).sum
recordsRead == 3
}
)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 82d50f9891..b215d8867d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -91,7 +91,7 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ee7a26d00d..9cfb3fe25c 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -114,12 +114,12 @@ class StreamingQuery(object):
@property
@since(2.1)
- def recentProgresses(self):
+ def recentProgress(self):
"""Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
The number of progress updates retained for each stream is configured by Spark session
- configuration `spark.sql.streaming.numRecentProgresses`.
+ configuration `spark.sql.streaming.numRecentProgressUpdates`.
"""
- return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+ return [json.loads(p.json()) for p in self._jsq.recentProgress()]
@property
@since(2.1)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 66a3490a64..50df68b144 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1116,11 +1116,11 @@ class SQLTests(ReusedPySparkTestCase):
try:
q.processAllAvailable()
lastProgress = q.lastProgress
- recentProgresses = q.recentProgresses
+ recentProgress = q.recentProgress
status = q.status
self.assertEqual(lastProgress['name'], q.name)
self.assertEqual(lastProgress['id'], q.id)
- self.assertTrue(any(p == lastProgress for p in recentProgresses))
+ self.assertTrue(any(p == lastProgress for p in recentProgress))
self.assertTrue(
"message" in status and
"isDataAvailable" in status and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 12d0c1e9b4..40e3151337 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -94,7 +94,7 @@ trait ProgressReporter extends Logging {
def status: StreamingQueryStatus = currentStatus
/** Returns an array containing the most recent query progress updates. */
- def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+ def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
progressBuffer.toArray
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5b45df69e6..91f3fe0fe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -617,7 +617,7 @@ object SQLConf {
.createWithDefault(false)
val STREAMING_PROGRESS_RETENTION =
- SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+ SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
.doc("The number of progress updates to retain for a streaming query")
.intConf
.createWithDefault(100)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 1794e75462..596bd90140 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -87,11 +87,11 @@ trait StreamingQuery {
/**
* Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
* The number of progress updates retained for each stream is configured by Spark session
- * configuration `spark.sql.streaming.numRecentProgresses`.
+ * configuration `spark.sql.streaming.numRecentProgressUpdates`.
*
* @since 2.1.0
*/
- def recentProgresses: Array[StreamingQueryProgress]
+ def recentProgress: Array[StreamingQueryProgress]
/**
* Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 4a3eeb70b1..9137d650e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
try {
inputData.addData(10, 11, 12)
query.processAllAvailable()
- val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption
+ val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption
assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3,
- s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics")
+ s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics")
} finally {
query.stop()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index ff1f3e26f1..7b6fe83b9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
AddTextFileData("100", src, tmp),
CheckAnswer("100"),
AssertOnQuery { query =>
- val actualProgress = query.recentProgresses
+ val actualProgress = query.recentProgress
.find(_.numInputRows > 0)
.getOrElse(sys.error("Could not find records with data."))
assert(actualProgress.numInputRows === 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 1cd503c6de..b78d1353e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
true
}
- // `recentProgresses` should not receive too many no data events
+ // `recentProgress` should not receive too many no data events
actions += AssertOnQuery { q =>
- q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+ q.recentProgress.size > 1 && q.recentProgress.size <= 11
}
testStream(input.toDS)(actions: _*)
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 55dd1a5d51..7be2f21691 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
- testQuietly("status, lastProgress, and recentProgresses") {
+ testQuietly("status, lastProgress, and recentProgress") {
import StreamingQuerySuite._
clock = new StreamManualClock
@@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while offset is being fetched
AddData(inputData, 1, 2),
@@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch is being fetched
AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
@@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch is being processed
AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch processing has completed
AdvanceManualClock(500), // time = 1100 to unblock job
@@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery { query =>
assert(query.lastProgress != null)
- assert(query.recentProgresses.exists(_.numInputRows > 0))
- assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.recentProgress.exists(_.numInputRows > 0))
+ assert(query.recentProgress.last.eq(query.lastProgress))
val progress = query.lastProgress
assert(progress.id === query.id)
@@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery { query =>
- assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.recentProgress.last.eq(query.lastProgress))
assert(query.lastProgress.batchId === 1)
assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
true
@@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
try {
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- q.recentProgresses.head
+ q.recentProgress.head
} finally {
spark.streams.active.map(_.stop())
}