aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-11-12 00:10:45 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-11-12 00:10:45 -0800
commit7ccbbdacb9406d67b5acf2a489d6551900babdc9 (patch)
tree8592be6ee2e99905ff9d444e81999c8c654507e3 /streaming/src/main
parent23b53efccdc6068d6781ae3ed7b656ae5008c8fb (diff)
downloadspark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.tar.gz
spark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.tar.bz2
spark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.zip
Made block generator thread safe to fix Kafka bug.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 8d3ac0fc65..a82862c802 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -232,11 +232,11 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
logInfo("Data handler stopped")
}
- def += (obj: T) {
+ def += (obj: T): Unit = synchronized {
currentBuffer += obj
}
- private def updateCurrentBuffer(time: Long) {
+ private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]