diff options
-rw-r--r-- | core/src/main/scala/spark/DiskSpillingCache.scala | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala index 06359254b9..9e52fee69e 100644 --- a/core/src/main/scala/spark/DiskSpillingCache.scala +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -2,12 +2,12 @@ package spark import java.io.File import java.io.{FileOutputStream,FileInputStream} +import java.io.IOException import java.util.LinkedHashMap import java.util.UUID -// TODO: error handling // TODO: cache into a separate directory using Utils.createTempDir -// TODO: after reading an entry from disk, put it into the cache +// TODO: clean up disk cache afterwards class DiskSpillingCache extends BoundedMemoryCache { private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true) @@ -21,14 +21,22 @@ class DiskSpillingCache extends BoundedMemoryCache { case _ => diskMap.get(key) match { case file: Any => // found on disk - val startTime = System.currentTimeMillis - val bytes = new Array[Byte](file.length.toInt) - new FileInputStream(file).read(bytes) - val timeTaken = System.currentTimeMillis - startTime - logInfo("Reading key %s of size %d bytes from disk took %d ms".format( - key, file.length, timeTaken)) - super.put(key, bytes) - ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + try { + val startTime = System.currentTimeMillis + val bytes = new Array[Byte](file.length.toInt) + new FileInputStream(file).read(bytes) + val timeTaken = System.currentTimeMillis - startTime + logInfo("Reading key %s of size %d bytes from disk took %d ms".format( + key, file.length, timeTaken)) + super.put(key, bytes) + ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + } catch { + case e: IOException => + logWarning("Failed to read key %s from disk at %s: %s".format( + key, file.getPath(), e.getMessage())) + diskMap.remove(key) // remove dead entry + null + } case _ => // not found null @@ -50,12 +58,19 @@ class DiskSpillingCache extends BoundedMemoryCache { logInfo("Spilling key %s of size %d to make space".format( key, entry.size)) val cacheDir = System.getProperty( - "spark.DiskSpillingCache.cacheDir", + "spark.diskSpillingCache.cacheDir", System.getProperty("java.io.tmpdir")) val file = new File(cacheDir, "spark-dsc-" + UUID.randomUUID.toString) - val stream = new FileOutputStream(file) - stream.write(entry.value.asInstanceOf[Array[Byte]]) - stream.close() - diskMap.put(key, file) + try { + val stream = new FileOutputStream(file) + stream.write(entry.value.asInstanceOf[Array[Byte]]) + stream.close() + diskMap.put(key, file) + } catch { + case e: IOException => + logWarning("Failed to spill key %s to disk at %s: %s".format( + key, file.getPath(), e.getMessage())) + // Do nothing and let the entry be discarded + } } } |