From d0ecca6075d86bedebf8bc2278085a2cd6cb0a43 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 20 Feb 2017 09:02:09 -0800 Subject: [SPARK-19646][CORE][STREAMING] binaryRecords replicates records in scala API ## What changes were proposed in this pull request? Use `BytesWritable.copyBytes`, not `getBytes`, because `getBytes` returns the underlying array, which may be reused when repeated reads don't need a different size, as is the case with binaryRecords APIs ## How was this patch tested? Existing tests Author: Sean Owen Closes #16974 from srowen/SPARK-19646. --- .../src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'streaming/src/main/scala') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0a4c141e5b..a34f6c73fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -435,13 +435,12 @@ class StreamingContext private[streaming] ( conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf) - val data = br.map { case (k, v) => - val bytes = v.getBytes + br.map { case (k, v) => + val bytes = v.copyBytes() require(bytes.length == recordLength, "Byte array does not have correct length. " + s"${bytes.length} did not equal recordLength: $recordLength") bytes } - data } /** -- cgit v1.2.3