aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala12
1 file changed, 8 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ebdf418f4a..f4c8046e8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,13 +16,13 @@
*/
package org.apache.spark.streaming.rdd
+import java.io.File
import java.nio.ByteBuffer
+import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import org.apache.commons.io.FileUtils
-
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -108,9 +108,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
- val dummyDirectory = FileUtils.getTempDirectoryPath()
+ // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+ // this dummy directory should not already exist otherwise the WAL will try to recover
+ // past events from the directory and throw errors.
+ val nonExistentDirectory = new File(
+ System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
- SparkEnv.get.conf, dummyDirectory, hadoopConf)
+ SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>