aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2014-12-15 14:33:43 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-15 14:33:43 -0800
commitf6b8591a08835c9af19210f9cdfbaab2537135c4 (patch)
tree9983a44dbb17a708f55361357165dbac45c7118d /streaming
parent8098fab06cb2be22cca4e531e8e65ab29dbb909a (diff)
downloadspark-f6b8591a08835c9af19210f9cdfbaab2537135c4.tar.gz
spark-f6b8591a08835c9af19210f9cdfbaab2537135c4.tar.bz2
spark-f6b8591a08835c9af19210f9cdfbaab2537135c4.zip
[SPARK-4826] Fix generation of temp file names in WAL tests
This PR should fix SPARK-4826, an issue where a bug in how we generate temp. file names was causing spurious test failures in the write ahead log suites. Closes #3695. Closes #3701. Author: Josh Rosen <joshrosen@databricks.com> Closes #3704 from JoshRosen/SPARK-4826 and squashes the following commits: f2307f5 [Josh Rosen] Use Spark Utils class for directory creation/deletion a693ddb [Josh Rosen] remove unused Random import b275e41 [Josh Rosen] Move creation of temp. dir to beforeEach/afterEach. 9362919 [Josh Rosen] [SPARK-4826] Fix bug in generation of temp file names. in WAL suites. 86c1944 [Josh Rosen] Revert "HOTFIX: Disabling failing block manager test"
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala28
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala9
2 files changed, 20 insertions, 17 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 728e7f0afa..7a6a2f3e57 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -20,15 +20,15 @@ import java.io.File
import scala.util.Random
-import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.util.Utils
-class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
+class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
@@ -38,36 +38,42 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
var blockManager: BlockManager = null
var dir: File = null
+ override def beforeEach(): Unit = {
+ dir = Utils.createTempDir()
+ }
+
+ override def afterEach(): Unit = {
+ Utils.deleteRecursively(dir)
+ }
+
override def beforeAll(): Unit = {
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
- dir = Files.createTempDir()
}
override def afterAll(): Unit = {
// Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
sparkContext.stop()
- dir.delete()
System.clearProperty("spark.driver.port")
}
- ignore("Read data available in block manager and write ahead log") {
+ test("Read data available in block manager and write ahead log") {
testRDD(5, 5)
}
- ignore("Read data available only in block manager, not in write ahead log") {
+ test("Read data available only in block manager, not in write ahead log") {
testRDD(5, 0)
}
- ignore("Read data available only in write ahead log, not in block manager") {
+ test("Read data available only in write ahead log, not in block manager") {
testRDD(0, 5)
}
- ignore("Read data available only in write ahead log, and test storing in block manager") {
+ test("Read data available only in write ahead log, and test storing in block manager") {
testRDD(0, 5, testStoreInBM = true)
}
- ignore("Read data with partially available in block manager, and rest in write ahead log") {
+ test("Read data with partially available in block manager, and rest in write ahead log") {
testRDD(3, 2)
}
@@ -137,7 +143,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
blockIds: Seq[BlockId]
): Seq[WriteAheadLogFileSegment] = {
require(blockData.size === blockIds.size)
- val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
+ val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
writer.write(blockManager.dataSerialize(id, data.iterator))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 1956a4f1db..8f69bcb642 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -22,11 +22,8 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
-import scala.util.Random
import WriteAheadLogSuite._
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils
@@ -42,9 +39,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
var manager: WriteAheadLogManager = null
before {
- tempDir = Files.createTempDir()
+ tempDir = Utils.createTempDir()
testDir = tempDir.toString
- testFile = new File(tempDir, Random.nextString(10)).toString
+ testFile = new File(tempDir, "testFile").toString
if (manager != null) {
manager.stop()
manager = null
@@ -52,7 +49,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
after {
- FileUtils.deleteQuietly(tempDir)
+ Utils.deleteRecursively(tempDir)
}
test("WriteAheadLogWriter - writing data") {