diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-11-12 00:10:45 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-11-12 00:10:45 -0800 |
commit | 7ccbbdacb9406d67b5acf2a489d6551900babdc9 (patch) | |
tree | 8592be6ee2e99905ff9d444e81999c8c654507e3 /streaming/src/main | |
parent | 23b53efccdc6068d6781ae3ed7b656ae5008c8fb (diff) | |
download | spark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.tar.gz spark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.tar.bz2 spark-7ccbbdacb9406d67b5acf2a489d6551900babdc9.zip |
Made block generator thread safe to fix Kafka bug.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 8d3ac0fc65..a82862c802 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -232,11 +232,11 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log logInfo("Data handler stopped") } - def += (obj: T) { + def += (obj: T): Unit = synchronized { currentBuffer += obj } - private def updateCurrentBuffer(time: Long) { + private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[T] |