author    Sean Owen <sowen@cloudera.com>    2017-02-20 09:02:09 -0800
committer Sean Owen <sowen@cloudera.com>    2017-02-20 09:02:09 -0800
commit    d0ecca6075d86bedebf8bc2278085a2cd6cb0a43 (patch)
tree      4582f88e40df02916659800e8fa4068d585da63d
parent    776b8f17cfc687a57c005a421a81e591c8d44a3f (diff)
[SPARK-19646][CORE][STREAMING] binaryRecords replicates records in scala API
## What changes were proposed in this pull request?

Use `BytesWritable.copyBytes`, not `getBytes`, because `getBytes` returns the underlying array, which may be reused when repeated reads don't need a different size, as is the case with the binaryRecords APIs.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #16974 from srowen/SPARK-19646.
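To make the buffer-reuse problem concrete, here is a minimal standalone sketch (not part of the patch; the object name and byte values are illustrative) showing how `getBytes` exposes Hadoop's reused backing array while `copyBytes` returns a stable, correctly sized copy:

```scala
import org.apache.hadoop.io.BytesWritable

object GetBytesVsCopyBytes {
  def main(args: Array[String]): Unit = {
    val w = new BytesWritable()
    w.set(Array[Byte](1, 2, 3, 4, 5, 6), 0, 6)
    val view = w.getBytes    // backing array: may be longer than getLength and is reused
    val copy = w.copyBytes() // fresh array of exactly getLength bytes

    // Simulate the record reader loading the next fixed-length record into the same writable
    w.set(Array[Byte](7, 8, 9, 10, 11, 12), 0, 6)

    assert(copy.sameElements(Array[Byte](1, 2, 3, 4, 5, 6))) // the copy is unaffected
    println(view.take(6).mkString(","))                      // prints 7..12: the "old" record was overwritten
  }
}
```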
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        5
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                         178
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala     5
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala   21
4 files changed, 53 insertions(+), 156 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e4d83893e7..17194b9f06 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,12 +961,11 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[LongWritable],
classOf[BytesWritable],
conf = conf)
- val data = br.map { case (k, v) =>
- val bytes = v.getBytes
+ br.map { case (k, v) =>
+ val bytes = v.copyBytes()
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
}
- data
}
/**
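For reference, a small hypothetical driver (path and record length are illustrative, not from the patch) exercising the fixed `SparkContext.binaryRecords`, which now yields an independent array per record:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binaryRecords-example").setMaster("local"))
    // Each element is a copy of one 6-byte record, not a view of a reused BytesWritable buffer
    val records = sc.binaryRecords("/tmp/record-bytestream-00000.bin", 6)
    records.collect().foreach(r => println(r.mkString(",")))
    sc.stop()
  }
}
```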
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 6538507d40..a2d3177c5c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io._
+import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream
import scala.io.Source
@@ -30,7 +31,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
-import org.apache.spark.input.PortableDataStream
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
@@ -237,24 +237,26 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
- test("binary file input as byte array") {
- sc = new SparkContext("local", "test")
+ private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
- val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
+ val file = new FileOutputStream(outFile)
val channel = file.getChannel
- channel.write(bbuf)
+ for (i <- 0 until testOutputCopies) {
+ // Shift values by i so that they're different in the output
+ val alteredOutput = testOutput.map(b => (b + i).toByte)
+ channel.write(ByteBuffer.wrap(alteredOutput))
+ }
channel.close()
file.close()
+ outFile
+ }
- val inRdd = sc.binaryFiles(outFileName)
- val (infile: String, indata: PortableDataStream) = inRdd.collect.head
-
+ test("binary file input as byte array") {
+ sc = new SparkContext("local", "test")
+ val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+ val outFile = writeBinaryData(testOutput, 1)
+ val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
+ val (infile, indata) = inRdd.collect().head
// Make sure the name and array match
assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
assert(indata.toArray === testOutput)
@@ -262,159 +264,55 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test("portabledatastream caching tests") {
sc = new SparkContext("local", "test")
- val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
- val channel = file.getChannel
- channel.write(bbuf)
- channel.close()
- file.close()
-
- val inRdd = sc.binaryFiles(outFileName).cache()
- inRdd.foreach{
- curData: (String, PortableDataStream) =>
- curData._2.toArray() // force the file to read
- }
- val mappedRdd = inRdd.map {
- curData: (String, PortableDataStream) =>
- (curData._2.getPath(), curData._2)
- }
- val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
+ val outFile = writeBinaryData(testOutput, 1)
+ val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
+ inRdd.foreach(_._2.toArray()) // force the file to read
// Try reading the output back as an object file
-
- assert(indata.toArray === testOutput)
+ assert(inRdd.values.collect().head.toArray === testOutput)
}
test("portabledatastream persist disk storage") {
sc = new SparkContext("local", "test")
- val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
- val channel = file.getChannel
- channel.write(bbuf)
- channel.close()
- file.close()
-
- val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
- inRdd.foreach{
- curData: (String, PortableDataStream) =>
- curData._2.toArray() // force the file to read
- }
- val mappedRdd = inRdd.map {
- curData: (String, PortableDataStream) =>
- (curData._2.getPath(), curData._2)
- }
- val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
- // Try reading the output back as an object file
-
- assert(indata.toArray === testOutput)
+ val outFile = writeBinaryData(testOutput, 1)
+ val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
+ inRdd.foreach(_._2.toArray()) // force the file to read
+ assert(inRdd.values.collect().head.toArray === testOutput)
}
test("portabledatastream flatmap tests") {
sc = new SparkContext("local", "test")
- val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+ val outFile = writeBinaryData(testOutput, 1)
+ val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
val numOfCopies = 3
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
- val channel = file.getChannel
- channel.write(bbuf)
- channel.close()
- file.close()
-
- val inRdd = sc.binaryFiles(outFileName)
- val mappedRdd = inRdd.map {
- curData: (String, PortableDataStream) =>
- (curData._2.getPath(), curData._2)
- }
- val copyRdd = mappedRdd.flatMap {
- curData: (String, PortableDataStream) =>
- for (i <- 1 to numOfCopies) yield (i, curData._2)
- }
-
- val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
-
- // Try reading the output back as an object file
+ val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
+ val copyArr = copyRdd.collect()
assert(copyArr.length == numOfCopies)
- copyArr.foreach{
- cEntry: (Int, PortableDataStream) =>
- assert(cEntry._2.toArray === testOutput)
+ for (i <- copyArr.indices) {
+ assert(copyArr(i).toArray === testOutput)
}
-
}
test("fixed record length binary file as byte array") {
- // a fixed length of 6 bytes
-
sc = new SparkContext("local", "test")
-
- val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val testOutputCopies = 10
-
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
- val channel = file.getChannel
- for(i <- 1 to testOutputCopies) {
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- channel.write(bbuf)
- }
- channel.close()
- file.close()
-
- val inRdd = sc.binaryRecords(outFileName, testOutput.length)
- // make sure there are enough elements
+ val outFile = writeBinaryData(testOutput, testOutputCopies)
+ val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
assert(inRdd.count == testOutputCopies)
-
- // now just compare the first one
- val indata: Array[Byte] = inRdd.collect.head
- assert(indata === testOutput)
+ val inArr = inRdd.collect()
+ for (i <- inArr.indices) {
+ assert(inArr(i) === testOutput.map(b => (b + i).toByte))
+ }
}
test ("negative binary record length should raise an exception") {
- // a fixed length of 6 bytes
sc = new SparkContext("local", "test")
-
- val outFile = new File(tempDir, "record-bytestream-00000.bin")
- val outFileName = outFile.getAbsolutePath()
-
- // create file
- val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
- val testOutputCopies = 10
-
- // write data to file
- val file = new java.io.FileOutputStream(outFile)
- val channel = file.getChannel
- for(i <- 1 to testOutputCopies) {
- val bbuf = java.nio.ByteBuffer.wrap(testOutput)
- channel.write(bbuf)
- }
- channel.close()
- file.close()
-
- val inRdd = sc.binaryRecords(outFileName, -1)
-
+ val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
intercept[SparkException] {
- inRdd.count
+ sc.binaryRecords(outFile.getAbsolutePath, -1).count()
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 0a4c141e5b..a34f6c73fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -435,13 +435,12 @@ class StreamingContext private[streaming] (
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
- val data = br.map { case (k, v) =>
- val bytes = v.getBytes
+ br.map { case (k, v) =>
+ val bytes = v.copyBytes()
require(bytes.length == recordLength, "Byte array does not have correct length. " +
s"${bytes.length} did not equal recordLength: $recordLength")
bytes
}
- data
}
/**
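Similarly, a hypothetical streaming driver (directory, record length, and batch interval are illustrative) for the fixed `StreamingContext.binaryRecordsStream`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryRecordsStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("binaryRecordsStream-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Each DStream element is an independent copy of one fixed-length record
    val records = ssc.binaryRecordsStream("/tmp/binary-records-dir", 6)
    records.map(_.mkString(",")).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```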
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6fb50a4052..b5d36a3651 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -84,7 +84,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+ val output = outputQueue.asScala.flatten.toArray
assert(output.length === expectedOutput.size)
for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
@@ -155,14 +155,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)
- val input = Seq(1, 2, 3, 4, 5)
- input.foreach { i =>
+ val numCopies = 3
+ val input = Array[Byte](1, 2, 3, 4, 5)
+ for (i <- 0 until numCopies) {
Thread.sleep(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
- Files.write(Array[Byte](i.toByte), file)
+ Files.write(input.map(b => (b + i).toByte), file)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
- logInfo("Created file " + file)
+ logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@@ -170,10 +171,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(batchCounter.getNumCompletedBatches === i)
}
}
-
- val expectedOutput = input.map(i => i.toByte)
- val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
- assert(obtainedOutput.toSeq === expectedOutput)
+ val obtainedOutput = outputQueue.asScala.map(_.flatten).toSeq
+ for (i <- obtainedOutput.indices) {
+ assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
+ }
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
@@ -258,7 +259,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
MultiThreadTestReceiver.haveAllThreadsFinished = false
val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
- def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
+ def output: Iterable[Long] = outputQueue.asScala.flatten
// set up the network stream using the test receiver
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>