aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-14 15:54:14 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-14 15:54:14 -0700
commit18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67 (patch)
tree5d1ed89c47ba788b659a931cdab54788fbafe8a9 /streaming
parentf3bfb711c1742d0915e43bda8230b4d1d22b4190 (diff)
downloadspark-18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67.tar.gz
spark-18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67.tar.bz2
spark-18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67.zip
[SPARK-9968] [STREAMING] Reduced time spent within synchronized block to prevent lock starvation
When the rate limiter is actually limiting the rate at which data is inserted into the buffer, the synchronized block of BlockGenerator.addData stays blocked for long time. This causes the thread switching the buffer and generating blocks (synchronized with addData) to starve and not generate blocks for seconds. The correct solution is to not block on the rate limiter within the synchronized block for adding data to the buffer. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8204 from tdas/SPARK-9968 and squashes the following commits: 8cbcc1b [Tathagata Das] Removed unused val a73b645 [Tathagata Das] Reduced time spent within synchronized block
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala40
1 file changed, 32 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 794dece370..300e820d01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -155,10 +155,17 @@ private[streaming] class BlockGenerator(
/**
* Push a single data item into the buffer.
*/
- def addData(data: Any): Unit = synchronized {
+ def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
- currentBuffer += data
+ synchronized {
+ if (state == Active) {
+ currentBuffer += data
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
+ }
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -169,11 +176,18 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. After buffering the data, the
* `BlockGeneratorListener.onAddData` callback will be called.
*/
- def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+ def addDataWithCallback(data: Any, metadata: Any): Unit = {
if (state == Active) {
waitToPush()
- currentBuffer += data
- listener.onAddData(data, metadata)
+ synchronized {
+ if (state == Active) {
+ currentBuffer += data
+ listener.onAddData(data, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
+ }
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -185,13 +199,23 @@ private[streaming] class BlockGenerator(
* `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
* are atomically added to the buffer, and are hence guaranteed to be present in a single block.
*/
- def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
+ def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = {
if (state == Active) {
+ // Unroll iterator into a temp buffer, and wait for pushing in the process
+ val tempBuffer = new ArrayBuffer[Any]
dataIterator.foreach { data =>
waitToPush()
- currentBuffer += data
+ tempBuffer += data
+ }
+ synchronized {
+ if (state == Active) {
+ currentBuffer ++= tempBuffer
+ listener.onAddData(tempBuffer, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
}
- listener.onAddData(dataIterator, metadata)
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")