diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-20 09:02:09 -0800 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-20 09:02:09 -0800 |
commit | d0ecca6075d86bedebf8bc2278085a2cd6cb0a43 (patch) | |
tree | 4582f88e40df02916659800e8fa4068d585da63d | |
parent | 776b8f17cfc687a57c005a421a81e591c8d44a3f (diff) | |
download | spark-d0ecca6075d86bedebf8bc2278085a2cd6cb0a43.tar.gz spark-d0ecca6075d86bedebf8bc2278085a2cd6cb0a43.tar.bz2 spark-d0ecca6075d86bedebf8bc2278085a2cd6cb0a43.zip |
[SPARK-19646][CORE][STREAMING] binaryRecords replicates records in scala API
## What changes were proposed in this pull request?
Use `BytesWritable.copyBytes`, not `getBytes`, because `getBytes` returns the underlying array, which may be reused when repeated reads don't need a different size, as is the case with binaryRecords APIs
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16974 from srowen/SPARK-19646.
4 files changed, 53 insertions, 156 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e4d83893e7..17194b9f06 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -961,12 +961,11 @@ class SparkContext(config: SparkConf) extends Logging { classOf[LongWritable], classOf[BytesWritable], conf = conf) - val data = br.map { case (k, v) => - val bytes = v.getBytes + br.map { case (k, v) => + val bytes = v.copyBytes() assert(bytes.length == recordLength, "Byte array does not have correct length") bytes } - data } /** diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 6538507d40..a2d3177c5c 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.io._ +import java.nio.ByteBuffer import java.util.zip.GZIPOutputStream import scala.io.Source @@ -30,7 +31,6 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} -import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.storage.StorageLevel @@ -237,24 +237,26 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)")) } - test("binary file input as byte array") { - sc = new SparkContext("local", "test") + private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = { val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file - val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - // write data to file - val file = new java.io.FileOutputStream(outFile) + val file = new FileOutputStream(outFile) val channel = file.getChannel - channel.write(bbuf) + for (i <- 0 until testOutputCopies) { + // Shift values by i so that they're different in the output + val alteredOutput = testOutput.map(b => (b + i).toByte) + channel.write(ByteBuffer.wrap(alteredOutput)) + } channel.close() file.close() + outFile + } - val inRdd = sc.binaryFiles(outFileName) - val (infile: String, indata: PortableDataStream) = inRdd.collect.head - + test("binary file input as byte array") { + sc = new SparkContext("local", "test") + val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) + val outFile = writeBinaryData(testOutput, 1) + val inRdd = sc.binaryFiles(outFile.getAbsolutePath) + val (infile, indata) = inRdd.collect().head // Make sure the name and array match assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added assert(indata.toArray === testOutput) @@ -262,159 +264,55 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { test("portabledatastream caching tests") { sc = new SparkContext("local", "test") - val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - // write data to file - val file = new java.io.FileOutputStream(outFile) - val channel = file.getChannel - channel.write(bbuf) - channel.close() - file.close() - - val inRdd = sc.binaryFiles(outFileName).cache() - inRdd.foreach{ - curData: (String, PortableDataStream) => - curData._2.toArray() // force the file to read - } - val mappedRdd = inRdd.map { - curData: (String, PortableDataStream) => - (curData._2.getPath(), curData._2) - } - val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head - + val outFile = writeBinaryData(testOutput, 1) + val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache() + inRdd.foreach(_._2.toArray()) // force the file to read // Try reading the output back as an object file - - assert(indata.toArray === testOutput) + assert(inRdd.values.collect().head.toArray === testOutput) } test("portabledatastream persist disk storage") { sc = new SparkContext("local", "test") - val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - // write data to file - val file = new java.io.FileOutputStream(outFile) - val channel = file.getChannel - channel.write(bbuf) - channel.close() - file.close() - - val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY) - inRdd.foreach{ - curData: (String, PortableDataStream) => - curData._2.toArray() // force the file to read - } - val mappedRdd = inRdd.map { - curData: (String, PortableDataStream) => - (curData._2.getPath(), curData._2) - } - val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head - - // Try reading the output back as an object file - - assert(indata.toArray === testOutput) + val outFile = writeBinaryData(testOutput, 1) + val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY) + inRdd.foreach(_._2.toArray()) // force the file to read + assert(inRdd.values.collect().head.toArray === testOutput) } test("portabledatastream flatmap tests") { sc = new SparkContext("local", "test") - val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) + val outFile = writeBinaryData(testOutput, 1) + val inRdd = sc.binaryFiles(outFile.getAbsolutePath) val numOfCopies = 3 - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - // write data to file - val file = new java.io.FileOutputStream(outFile) - val channel = file.getChannel - channel.write(bbuf) - channel.close() - file.close() - - val inRdd = sc.binaryFiles(outFileName) - val mappedRdd = inRdd.map { - curData: (String, PortableDataStream) => - (curData._2.getPath(), curData._2) - } - val copyRdd = mappedRdd.flatMap { - curData: (String, PortableDataStream) => - for (i <- 1 to numOfCopies) yield (i, curData._2) - } - - val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect() - - // Try reading the output back as an object file + val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2)) + val copyArr = copyRdd.collect() assert(copyArr.length == numOfCopies) - copyArr.foreach{ - cEntry: (Int, PortableDataStream) => - assert(cEntry._2.toArray === testOutput) + for (i <- copyArr.indices) { + assert(copyArr(i).toArray === testOutput) } - } test("fixed record length binary file as byte array") { - // a fixed length of 6 bytes - sc = new SparkContext("local", "test") - - val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) val testOutputCopies = 10 - - // write data to file - val file = new java.io.FileOutputStream(outFile) - val channel = file.getChannel - for(i <- 1 to testOutputCopies) { - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - channel.write(bbuf) - } - channel.close() - file.close() - - val inRdd = sc.binaryRecords(outFileName, testOutput.length) - // make sure there are enough elements + val outFile = writeBinaryData(testOutput, testOutputCopies) + val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length) assert(inRdd.count == testOutputCopies) - - // now just compare the first one - val indata: Array[Byte] = inRdd.collect.head - assert(indata === testOutput) + val inArr = inRdd.collect() + for (i <- inArr.indices) { + assert(inArr(i) === testOutput.map(b => (b + i).toByte)) + } } test ("negative binary record length should raise an exception") { - // a fixed length of 6 bytes sc = new SparkContext("local", "test") - - val outFile = new File(tempDir, "record-bytestream-00000.bin") - val outFileName = outFile.getAbsolutePath() - - // create file - val testOutput = Array[Byte](1, 2, 3, 4, 5, 6) - val testOutputCopies = 10 - - // write data to file - val file = new java.io.FileOutputStream(outFile) - val channel = file.getChannel - for(i <- 1 to testOutputCopies) { - val bbuf = java.nio.ByteBuffer.wrap(testOutput) - channel.write(bbuf) - } - channel.close() - file.close() - - val inRdd = sc.binaryRecords(outFileName, -1) - + val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1) intercept[SparkException] { - inRdd.count + sc.binaryRecords(outFile.getAbsolutePath, -1).count() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0a4c141e5b..a34f6c73fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -435,13 +435,12 @@ class StreamingContext private[streaming] ( conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf) - val data = br.map { case (k, v) => - val bytes = v.getBytes + br.map { case (k, v) => + val bytes = v.copyBytes() require(bytes.length == recordLength, "Byte array does not have correct length. " + s"${bytes.length} did not equal recordLength: $recordLength") bytes } - data } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6fb50a4052..b5d36a3651 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -84,7 +84,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected // (whether the elements were received one in each interval is not verified) - val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray + val output = outputQueue.asScala.flatten.toArray assert(output.length === expectedOutput.size) for (i <- output.indices) { assert(output(i) === expectedOutput(i)) @@ -155,14 +155,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // not enough to trigger a batch clock.advance(batchDuration.milliseconds / 2) - val input = Seq(1, 2, 3, 4, 5) - input.foreach { i => + val numCopies = 3 + val input = Array[Byte](1, 2, 3, 4, 5) + for (i <- 0 until numCopies) { Thread.sleep(batchDuration.milliseconds) val file = new File(testDir, i.toString) - Files.write(Array[Byte](i.toByte), file) + Files.write(input.map(b => (b + i).toByte), file) assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) - logInfo("Created file " + file) + logInfo(s"Created file $file") // Advance the clock after creating the file to avoid a race when // setting its modification time clock.advance(batchDuration.milliseconds) @@ -170,10 +171,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(batchCounter.getNumCompletedBatches === i) } } - - val expectedOutput = input.map(i => i.toByte) - val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte) - assert(obtainedOutput.toSeq === expectedOutput) + val obtainedOutput = outputQueue.asScala.map(_.flatten).toSeq + for (i <- obtainedOutput.indices) { + assert(obtainedOutput(i) === input.map(b => (b + i).toByte)) + } } } finally { if (testDir != null) Utils.deleteRecursively(testDir) @@ -258,7 +259,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread) MultiThreadTestReceiver.haveAllThreadsFinished = false val outputQueue = new ConcurrentLinkedQueue[Seq[Long]] - def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x) + def output: Iterable[Long] = outputQueue.asScala.flatten // set up the network stream using the test receiver withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => |