aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala5
1 files changed, 2 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 0a4c141e5b..a34f6c73fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -435,13 +435,12 @@ class StreamingContext private[streaming] (
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
- val data = br.map { case (k, v) =>
- val bytes = v.getBytes
+ br.map { case (k, v) =>
+ val bytes = v.copyBytes()
require(bytes.length == recordLength, "Byte array does not have correct length. " +
s"${bytes.length} did not equal recordLength: $recordLength")
bytes
}
- data
}
/**