Diffstat (limited to 'core/src/test/scala/org/apache/spark/FileSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/FileSuite.scala | 178
1 file changed, 38 insertions, 140 deletions
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 6538507d40..a2d3177c5c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark
 
 import java.io._
+import java.nio.ByteBuffer
 import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
@@ -30,7 +31,6 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
@@ -237,24 +237,26 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("binary file input as byte array") {
-    sc = new SparkContext("local", "test")
+  private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
     val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
+    val file = new FileOutputStream(outFile)
     val channel = file.getChannel
-    channel.write(bbuf)
+    for (i <- 0 until testOutputCopies) {
+      // Shift values by i so that they're different in the output
+      val alteredOutput = testOutput.map(b => (b + i).toByte)
+      channel.write(ByteBuffer.wrap(alteredOutput))
+    }
     channel.close()
     file.close()
+    outFile
+  }
 
-    val inRdd = sc.binaryFiles(outFileName)
-    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
-
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
+    val (infile, indata) = inRdd.collect().head
     // Make sure the name and array match
     assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
     assert(indata.toArray === testOutput)
@@ -262,159 +264,55 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   test("portabledatastream caching tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).cache()
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-        curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
+    inRdd.foreach(_._2.toArray()) // force the file to read
     // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream persist disk storage") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-        curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
-    // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
+    inRdd.foreach(_._2.toArray()) // force the file to read
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream flatmap tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
     val numOfCopies = 3
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName)
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val copyRdd = mappedRdd.flatMap {
-      curData: (String, PortableDataStream) =>
-        for (i <- 1 to numOfCopies) yield (i, curData._2)
-    }
-
-    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
-
-    // Try reading the output back as an object file
+    val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
+    val copyArr = copyRdd.collect()
     assert(copyArr.length == numOfCopies)
-    copyArr.foreach{
-      cEntry: (Int, PortableDataStream) =>
-        assert(cEntry._2.toArray === testOutput)
+    for (i <- copyArr.indices) {
+      assert(copyArr(i).toArray === testOutput)
     }
-
   }
 
   test("fixed record length binary file as byte array") {
-    // a fixed length of 6 bytes
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
     val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
-    // make sure there are enough elements
+    val outFile = writeBinaryData(testOutput, testOutputCopies)
+    val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
     assert(inRdd.count == testOutputCopies)
-
-    // now just compare the first one
-    val indata: Array[Byte] = inRdd.collect.head
-    assert(indata === testOutput)
+    val inArr = inRdd.collect()
+    for (i <- inArr.indices) {
+      assert(inArr(i) === testOutput.map(b => (b + i).toByte))
+    }
   }
 
   test ("negative binary record length should raise an exception") {
-    // a fixed length of 6 bytes
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, -1)
-
+    val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
     intercept[SparkException] {
-      inRdd.count
+      sc.binaryRecords(outFile.getAbsolutePath, -1).count()
     }
   }
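
Note: the writeBinaryData helper introduced above shifts each copy of the record by its index, so the fixed-record-length test can verify every record read back by sc.binaryRecords instead of just the first one; identical copies would mask record-boundary bugs. A minimal standalone sketch of that round trip follows. It is illustrative only and not part of this commit: the BinaryRecordsExample object name and the temp-file path are assumptions.

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone example; not part of the commit above.
object BinaryRecordsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("BinaryRecordsExample"))

    val record = Array[Byte](1, 2, 3, 4, 5, 6)
    val copies = 10

    // Write `copies` fixed-length records, shifting each copy by its index,
    // mirroring the writeBinaryData helper in the diff.
    val outFile = File.createTempFile("records", ".bin")
    val channel = new FileOutputStream(outFile).getChannel
    for (i <- 0 until copies) {
      channel.write(ByteBuffer.wrap(record.map(b => (b + i).toByte)))
    }
    channel.close()

    // binaryRecords splits the file into fixed-length byte arrays, so
    // record i should read back as the original bytes shifted by i.
    val records = sc.binaryRecords(outFile.getAbsolutePath, record.length).collect()
    assert(records.length == copies)
    for (i <- records.indices) {
      assert(records(i).sameElements(record.map(b => (b + i).toByte)))
    }

    sc.stop()
  }
}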