diff options
author | Josh Rosen <joshrosen@databricks.com> | 2014-12-15 14:33:43 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-15 14:33:43 -0800 |
commit | f6b8591a08835c9af19210f9cdfbaab2537135c4 (patch) | |
tree | 9983a44dbb17a708f55361357165dbac45c7118d | |
parent | 8098fab06cb2be22cca4e531e8e65ab29dbb909a (diff) | |
download | spark-f6b8591a08835c9af19210f9cdfbaab2537135c4.tar.gz spark-f6b8591a08835c9af19210f9cdfbaab2537135c4.tar.bz2 spark-f6b8591a08835c9af19210f9cdfbaab2537135c4.zip |
[SPARK-4826] Fix generation of temp file names in WAL tests
This PR should fix SPARK-4826, an issue where a bug in how we generate temp. file names was causing spurious test failures in the write ahead log suites.
Closes #3695.
Closes #3701.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #3704 from JoshRosen/SPARK-4826 and squashes the following commits:
f2307f5 [Josh Rosen] Use Spark Utils class for directory creation/deletion
a693ddb [Josh Rosen] remove unused Random import
b275e41 [Josh Rosen] Move creation of temp. dir to beforeEach/afterEach.
9362919 [Josh Rosen] [SPARK-4826] Fix bug in generation of temp file names. in WAL suites.
86c1944 [Josh Rosen] Revert "HOTFIX: Disabling failing block manager test"
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 28 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 9 |
2 files changed, 20 insertions, 17 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 728e7f0afa..7a6a2f3e57 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -20,15 +20,15 @@ import java.io.File import scala.util.Random -import com.google.common.io.Files import org.apache.hadoop.conf.Configuration -import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} +import org.apache.spark.util.Utils -class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { +class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { val conf = new SparkConf() .setMaster("local[2]") .setAppName(this.getClass.getSimpleName) @@ -38,36 +38,42 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { var blockManager: BlockManager = null var dir: File = null + override def beforeEach(): Unit = { + dir = Utils.createTempDir() + } + + override def afterEach(): Unit = { + Utils.deleteRecursively(dir) + } + override def beforeAll(): Unit = { sparkContext = new SparkContext(conf) blockManager = sparkContext.env.blockManager - dir = Files.createTempDir() } override def afterAll(): Unit = { // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests. sparkContext.stop() - dir.delete() System.clearProperty("spark.driver.port") } - ignore("Read data available in block manager and write ahead log") { + test("Read data available in block manager and write ahead log") { testRDD(5, 5) } - ignore("Read data available only in block manager, not in write ahead log") { + test("Read data available only in block manager, not in write ahead log") { testRDD(5, 0) } - ignore("Read data available only in write ahead log, not in block manager") { + test("Read data available only in write ahead log, not in block manager") { testRDD(0, 5) } - ignore("Read data available only in write ahead log, and test storing in block manager") { + test("Read data available only in write ahead log, and test storing in block manager") { testRDD(0, 5, testStoreInBM = true) } - ignore("Read data with partially available in block manager, and rest in write ahead log") { + test("Read data with partially available in block manager, and rest in write ahead log") { testRDD(3, 2) } @@ -137,7 +143,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { blockIds: Seq[BlockId] ): Seq[WriteAheadLogFileSegment] = { require(blockData.size === blockIds.size) - val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf) + val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => writer.write(blockManager.dataSerialize(id, data.iterator)) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 1956a4f1db..8f69bcb642 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -22,11 +22,8 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.{implicitConversions, postfixOps} -import scala.util.Random import WriteAheadLogSuite._ -import com.google.common.io.Files -import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.util.Utils @@ -42,9 +39,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { var manager: WriteAheadLogManager = null before { - tempDir = Files.createTempDir() + tempDir = Utils.createTempDir() testDir = tempDir.toString - testFile = new File(tempDir, Random.nextString(10)).toString + testFile = new File(tempDir, "testFile").toString if (manager != null) { manager.stop() manager = null @@ -52,7 +49,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { } after { - FileUtils.deleteQuietly(tempDir) + Utils.deleteRecursively(tempDir) } test("WriteAheadLogWriter - writing data") { |