aboutsummaryrefslogtreecommitdiff
path: root/external/flume/src/main
diff options
context:
space:
mode:
authorDavid Lemieux <david.lemieux@radialpoint.com>2014-05-28 15:50:35 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-28 15:51:32 -0700
commit4312cf0bade82d4b54eef80e637953e1633edc3e (patch)
treeda6de3497a06fb0c7f6683173d971aa52951c5c7 /external/flume/src/main
parent7801d44fd3bcf4d82e6db12574cc42bef15bf0e1 (diff)
downloadspark-4312cf0bade82d4b54eef80e637953e1633edc3e.tar.gz
spark-4312cf0bade82d4b54eef80e637953e1633edc3e.tar.bz2
spark-4312cf0bade82d4b54eef80e637953e1633edc3e.zip
Spark 1916
The changes could be ported back to 0.9 as well. Changing in.read to in.readFully to read the whole input stream rather than the first 1020 bytes. This should ok considering that Flume caps the body size to 32K by default. Author: David Lemieux <david.lemieux@radialpoint.com> Closes #865 from lemieud/SPARK-1916 and squashes the following commits: a265673 [David Lemieux] Updated SparkFlumeEvent to read the whole stream rather than the first X bytes. (cherry picked from commit 0b769b73fb7ae314325857138a2d3138ed157908) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'external/flume/src/main')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index df7605fe57..5be33f1d5c 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -63,7 +63,7 @@ class SparkFlumeEvent() extends Externalizable {
def readExternal(in: ObjectInput) {
val bodyLength = in.readInt()
val bodyBuff = new Array[Byte](bodyLength)
- in.read(bodyBuff)
+ in.readFully(bodyBuff)
val numHeaders = in.readInt()
val headers = new java.util.HashMap[CharSequence, CharSequence]