aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/DiskSpillingCache.scala45
1 files changed, 30 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala
index 06359254b9..9e52fee69e 100644
--- a/core/src/main/scala/spark/DiskSpillingCache.scala
+++ b/core/src/main/scala/spark/DiskSpillingCache.scala
@@ -2,12 +2,12 @@ package spark
import java.io.File
import java.io.{FileOutputStream,FileInputStream}
+import java.io.IOException
import java.util.LinkedHashMap
import java.util.UUID
-// TODO: error handling
// TODO: cache into a separate directory using Utils.createTempDir
-// TODO: after reading an entry from disk, put it into the cache
+// TODO: clean up disk cache afterwards
class DiskSpillingCache extends BoundedMemoryCache {
private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true)
@@ -21,14 +21,22 @@ class DiskSpillingCache extends BoundedMemoryCache {
case _ => diskMap.get(key) match {
case file: Any => // found on disk
- val startTime = System.currentTimeMillis
- val bytes = new Array[Byte](file.length.toInt)
- new FileInputStream(file).read(bytes)
- val timeTaken = System.currentTimeMillis - startTime
- logInfo("Reading key %s of size %d bytes from disk took %d ms".format(
- key, file.length, timeTaken))
- super.put(key, bytes)
- ser.deserialize(bytes.asInstanceOf[Array[Byte]])
+ try {
+ val startTime = System.currentTimeMillis
+ val bytes = new Array[Byte](file.length.toInt)
+ new FileInputStream(file).read(bytes)
+ val timeTaken = System.currentTimeMillis - startTime
+ logInfo("Reading key %s of size %d bytes from disk took %d ms".format(
+ key, file.length, timeTaken))
+ super.put(key, bytes)
+ ser.deserialize(bytes.asInstanceOf[Array[Byte]])
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to read key %s from disk at %s: %s".format(
+ key, file.getPath(), e.getMessage()))
+ diskMap.remove(key) // remove dead entry
+ null
+ }
case _ => // not found
null
@@ -50,12 +58,19 @@ class DiskSpillingCache extends BoundedMemoryCache {
logInfo("Spilling key %s of size %d to make space".format(
key, entry.size))
val cacheDir = System.getProperty(
- "spark.DiskSpillingCache.cacheDir",
+ "spark.diskSpillingCache.cacheDir",
System.getProperty("java.io.tmpdir"))
val file = new File(cacheDir, "spark-dsc-" + UUID.randomUUID.toString)
- val stream = new FileOutputStream(file)
- stream.write(entry.value.asInstanceOf[Array[Byte]])
- stream.close()
- diskMap.put(key, file)
+ try {
+ val stream = new FileOutputStream(file)
+ stream.write(entry.value.asInstanceOf[Array[Byte]])
+ stream.close()
+ diskMap.put(key, file)
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to spill key %s to disk at %s: %s".format(
+ key, file.getPath(), e.getMessage()))
+ // Do nothing and let the entry be discarded
+ }
}
}