aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/TachyonStore.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/TachyonStore.scala21
1 files changed, 7 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561604..6dbad5ff05 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.IOException
import java.nio.ByteBuffer
+import com.google.common.io.ByteStreams
import tachyon.client.{ReadType, WriteType}
import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
return None
}
val is = file.getInStream(ReadType.CACHE)
- var buffer: ByteBuffer = null
+ assert (is != null)
try {
- if (is != null) {
- val size = file.length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
- buffer = ByteBuffer.wrap(bs)
- if (fetchSize != size) {
- logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
- s"is not equal to fetched size $fetchSize")
- return None
- }
- }
+ val size = file.length
+ val bs = new Array[Byte](size.asInstanceOf[Int])
+ ByteStreams.readFully(is, bs)
+ Some(ByteBuffer.wrap(bs))
} catch {
case ioe: IOException =>
logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
- return None
+ None
}
- Some(buffer)
}
override def contains(blockId: BlockId): Boolean = {