author     Tathagata Das <tathagata.das1565@gmail.com>  2015-05-02 01:53:14 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-02 01:53:14 -0700
commit     ecc6eb50a59172dc132bd8f97957734f6f009024 (patch)
tree       308bc1c27860debf9ce550007de0cf23addb5f92 /streaming
parent     7394e7adeb03df159978f1d10061d9ec6a913968 (diff)
[SPARK-7315] [STREAMING] [TEST] Fix flaky WALBackedBlockRDDSuite
The path returned by `FileUtils.getTempDirectoryPath()` may or may not exist. We want to make sure that it does not exist.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5853 from tdas/SPARK-7315 and squashes the following commits:

141afd5 [Tathagata Das] Removed use of FileUtils
b08d4f1 [Tathagata Das] Fix flaky WALBackedBlockRDDSuite
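For context, here is a minimal standalone Scala sketch (not part of the patch) of why the old choice was flaky: `FileUtils.getTempDirectoryPath()` returns the shared system temp directory, which always exists and may contain leftover files from earlier runs, whereas a freshly generated UUID child of that directory is effectively guaranteed not to exist. The object name `DummyDirSketch` is illustrative only, and commons-io must be on the classpath for the `FileUtils` call.

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils

object DummyDirSketch {
  def main(args: Array[String]): Unit = {
    // The system temp directory always exists and is shared across runs and processes.
    val sharedTempDir = new File(FileUtils.getTempDirectoryPath())
    println(s"${sharedTempDir.getAbsolutePath} exists: ${sharedTempDir.exists()}")  // true

    // A random, never-created child path is a safe "dummy" directory to hand to the WAL.
    val nonExistentDirectory =
      new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString)
    println(s"${nonExistentDirectory.getAbsolutePath} exists: ${nonExistentDirectory.exists()}")  // false
  }
}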
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 12
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ebdf418f4a..f4c8046e8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,13 +16,13 @@
  */
 package org.apache.spark.streaming.rdd
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.UUID
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -108,9 +108,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // writing log data. However, the directory is not needed if data needs to be read, hence
         // a dummy path is provided to satisfy the method parameter requirements.
         // FileBasedWriteAheadLog will not create any file or directory at that path.
-        val dummyDirectory = FileUtils.getTempDirectoryPath()
+        // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+        // this dummy directory should not already exist otherwise the WAL will try to recover
+        // past events from the directory and throw errors.
+        val nonExistentDirectory = new File(
+          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-          SparkEnv.get.conf, dummyDirectory, hadoopConf)
+          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)
       } catch {
         case NonFatal(e) =>
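For reference, a hedged sketch of how the patched read path wires these pieces together, using only names that appear in the diff above. Note that `WriteAheadLogUtils` is internal to Spark streaming (`private[streaming]`), so code like this only compiles from inside the `org.apache.spark.streaming` package tree; the `WalReadSketch` object and the `recordHandle` parameter are hypothetical and stand in for the handle carried by the RDD partition.

package org.apache.spark.streaming.rdd

import java.io.File
import java.nio.ByteBuffer
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}

object WalReadSketch {
  def readBlock(
      conf: SparkConf,
      hadoopConf: Configuration,
      recordHandle: WriteAheadLogRecordHandle): ByteBuffer = {
    // Same construction as in the patch: a path under java.io.tmpdir that does not exist,
    // so the file-based WAL created here for reading has no old log files to recover.
    val nonExistentDirectory = new File(
      System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath

    val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
      conf, nonExistentDirectory, hadoopConf)
    writeAheadLog.read(recordHandle)  // the ByteBuffer that was stored at that handle
  }
}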