-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala         | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala   | 88
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala  | 51
4 files changed, 149 insertions(+), 48 deletions(-)
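In short, this change teaches the external (off-heap) block store to move blocks as iterators of values rather than only as serialized ByteBuffers: BlockManager gains a stream-based dataDeserializeStream, ExternalBlockManager keeps the BlockManager reference and supplies default putValues/getValues built on putBytes/getBytes, ExternalBlockStore routes both put paths through dedicated helpers with uniform NonFatal error handling, and TachyonBlockManager overrides the value-based operations to stream (de)serialization directly against Tachyon file streams.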
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16d67cbfca..5048c7dab2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
+import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
- externalBlockStore.getBytes(blockId) match {
- case Some(bytes) =>
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- return Some(new BlockResult(
- dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
- }
+ val result = if (asBlockResult) {
+ externalBlockStore.getValues(blockId)
+ .map(new BlockResult(_, DataReadMethod.Memory, info.size))
+ } else {
+ externalBlockStore.getBytes(blockId)
+ }
+ result match {
+ case Some(values) =>
+ return result
case None =>
- logDebug(s"Block $blockId not found in externalBlockStore")
+ logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
- val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
- serializer.newInstance().deserializeStream(stream).asIterator
+ dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+ }
+
+ /**
+ * Deserializes an InputStream into an iterator of values and disposes of it when the end of
+ * the iterator is reached.
+ */
+ def dataDeserializeStream(
+ blockId: BlockId,
+ inputStream: InputStream,
+ serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ val stream = new BufferedInputStream(inputStream)
+ serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
}
def stop(): Unit = {
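
The new dataDeserializeStream decodes a block straight off an InputStream (buffered, then wrapped for compression) instead of requiring the whole block in a ByteBuffer first; dataDeserialize now simply delegates to it. A minimal usage sketch, assuming a blockManager and blockId are already in scope; the file path is hypothetical:

    import java.io.{FileInputStream, InputStream}

    // Hypothetical source of serialized block data.
    val in: InputStream = new FileInputStream("/tmp/some-block.bin")

    // Values are decoded lazily; per the doc comment above, the underlying
    // stream is disposed of once the iterator is exhausted.
    val values: Iterator[Any] = blockManager.dataDeserializeStream(blockId, in)
    values.foreach(println)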
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
index 8964762df6..f39325a12d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
*/
private[spark] abstract class ExternalBlockManager {
+ protected var blockManager: BlockManager = _
+
override def toString: String = {"External Block Store"}
/**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
*
* @throws java.io.IOException if there is any file system failure during the initialization.
*/
- def init(blockManager: BlockManager, executorId: String): Unit
+ def init(blockManager: BlockManager, executorId: String): Unit = {
+ this.blockManager = blockManager
+ }
/**
* Drop the block from the underlying external block store, if it exists.
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
*/
def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
+ def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+ val bytes = blockManager.dataSerialize(blockId, values)
+ putBytes(blockId, bytes)
+ }
+
/**
* Retrieve the block bytes.
* @return Some(ByteBuffer) if the block bytes are successfully retrieved
@@ -83,6 +92,17 @@ private[spark] abstract class ExternalBlockManager {
def getBytes(blockId: BlockId): Option[ByteBuffer]
/**
+ * Retrieve the block data.
+ * @return Some(Iterator[Any]) if the block data is successfully retrieved
+ * None if the block does not exist in the external block store.
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block.
+ */
+ def getValues(blockId: BlockId): Option[Iterator[_]] = {
+ getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ }
+
+ /**
* Get the size of the block saved in the underlying external block store,
* which was previously saved by putBytes.
* @return size of the block
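
Because the abstract base now holds the blockManager reference, ExternalBlockManager can ship default putValues/getValues that round-trip through putBytes/getBytes using the block manager's serializer, and a concrete backend only has to override them when it can do better. A sketch of a streaming override, where openOutputStream is a hypothetical primitive of the concrete store, not a real API:

    import java.io.OutputStream

    // Sketch only: a backend that can write streams directly overrides the
    // default putValues; everything else is inherited unchanged.
    abstract class StreamingBackend extends ExternalBlockManager {
      protected def openOutputStream(blockId: BlockId): OutputStream

      override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
        val os = openOutputStream(blockId)
        try {
          blockManager.dataSerializeStream(blockId, os, values)
        } finally {
          os.close()
        }
      }
    }

This is exactly the shape TachyonBlockManager adopts below.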
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 0bf770306a..291394ed34 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -18,9 +18,11 @@
package org.apache.spark.storage
import java.nio.ByteBuffer
+
+import scala.util.control.NonFatal
+
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
/**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
} catch {
case NonFatal(t) =>
- logError(s"error in getSize from $blockId", t)
+ logError(s"Error in getSize($blockId)", t)
0L
}
}
@@ -54,7 +56,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putIterator(blockId, values.toIterator, level, returnValues)
+ putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
}
override def putIterator(
@@ -62,42 +64,70 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- logDebug(s"Attempting to write values for block $blockId")
- val bytes = blockManager.dataSerialize(blockId, values)
- putIntoExternalBlockStore(blockId, bytes, returnValues)
+ putIntoExternalBlockStore(blockId, values, returnValues)
}
private def putIntoExternalBlockStore(
blockId: BlockId,
- bytes: ByteBuffer,
+ values: Iterator[_],
returnValues: Boolean): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- logDebug(s"Attempting to put block $blockId into ExtBlk store")
+ logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
// We should never reach this point if externalBlockManager is None; handle it anyway for safety.
try {
val startTime = System.currentTimeMillis
if (externalBlockManager.isDefined) {
- externalBlockManager.get.putBytes(blockId, bytes)
+ externalBlockManager.get.putValues(blockId, values)
+ val size = getSize(blockId)
+ val data = if (returnValues) {
+ Left(getValues(blockId).get)
+ } else {
+ null
+ }
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+ blockId, Utils.bytesToString(size), finishTime - startTime))
+ PutResult(size, data)
+ } else {
+ logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+ }
+ } catch {
+ case NonFatal(t) =>
+ logError(s"Error in putValues($blockId)", t)
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+ }
+ }
- if (returnValues) {
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ private def putIntoExternalBlockStore(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ returnValues: Boolean): PutResult = {
+ logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
+ // We should never reach this point if externalBlockManager is None; handle it anyway for safety.
+ try {
+ val startTime = System.currentTimeMillis
+ if (externalBlockManager.isDefined) {
+ val byteBuffer = bytes.duplicate()
+ byteBuffer.rewind()
+ externalBlockManager.get.putBytes(blockId, byteBuffer)
+ val size = bytes.limit()
+ val data = if (returnValues) {
+ Right(bytes)
} else {
- PutResult(bytes.limit(), null)
+ null
}
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+ blockId, Utils.bytesToString(size), finishTime - startTime))
+ PutResult(size, data)
} else {
- logError(s"error in putBytes $blockId")
- PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
} catch {
case NonFatal(t) =>
- logError(s"error in putBytes $blockId", t)
- PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ logError(s"Error in putBytes($blockId)", t)
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
}
@@ -107,13 +137,19 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
} catch {
case NonFatal(t) =>
- logError(s"error in removing $blockId", t)
+ logError(s"Error in removeBlock($blockId)", t)
true
}
}
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ try {
+ externalBlockManager.flatMap(_.getValues(blockId))
+ } catch {
+ case NonFatal(t) =>
+ logError(s"Error in getValues($blockId)", t)
+ None
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -121,7 +157,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.flatMap(_.getBytes(blockId))
} catch {
case NonFatal(t) =>
- logError(s"error in getBytes from $blockId", t)
+ logError(s"Error in getBytes($blockId)", t)
None
}
}
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
try {
val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
if (!ret) {
- logInfo(s"remove block $blockId")
+ logInfo(s"Remove block $blockId")
blockManager.removeBlock(blockId, true)
}
ret
} catch {
case NonFatal(t) =>
- logError(s"error in getBytes from $blockId", t)
+ logError(s"Error in blockExists($blockId)", t)
false
}
}
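
Every operation in this store now follows the same shape: run against the optional backend, catch NonFatal, log under the operation's own name, and degrade gracefully, with failed puts reported as PutResult(-1, null, Seq((blockId, BlockStatus.empty))). Note that the iterator-based put path satisfies returnValues by reading the block back through getValues, since the input iterator was consumed by the write. A caller-side sketch, assuming an initialized store and a BlockId named id are in scope:

    // Store an iterator off-heap and inspect the result.
    val result: PutResult =
      store.putIterator(id, Iterator(1, 2, 3), StorageLevel.OFF_HEAP, returnValues = true)

    result.data match {
      case Left(values) => values.foreach(println) // read back via getValues
      case _ => // with returnValues = true, this means the put failed (size == -1)
    }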
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index bdc6276e41..fb4ba0eac9 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, Random}
+import scala.util.control.NonFatal
+
import com.google.common.io.ByteStreams
+
import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.TachyonURI
@@ -38,7 +41,6 @@ import org.apache.spark.util.Utils
*/
private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
- var blockManager: BlockManager =_
var rootDirs: String = _
var master: String = _
var client: tachyon.client.TachyonFS = _
@@ -52,7 +54,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def init(blockManager: BlockManager, executorId: String): Unit = {
- this.blockManager = blockManager
+ super.init(blockManager, executorId)
val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
@@ -95,8 +97,29 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
- os.write(bytes.array())
- os.close()
+ try {
+ os.write(bytes.array())
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
+ os.cancel()
+ } finally {
+ os.close()
+ }
+ }
+
+ override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+ val file = getFile(blockId)
+ val os = file.getOutStream(WriteType.TRY_CACHE)
+ try {
+ blockManager.dataSerializeStream(blockId, os, values)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to put values of block $blockId into Tachyon", e)
+ os.cancel()
+ } finally {
+ os.close()
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -105,21 +128,31 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
return None
}
val is = file.getInStream(ReadType.CACHE)
- assert (is != null)
try {
val size = file.length
val bs = new Array[Byte](size.asInstanceOf[Int])
ByteStreams.readFully(is, bs)
Some(ByteBuffer.wrap(bs))
} catch {
- case ioe: IOException =>
- logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+ case NonFatal(e) =>
+ logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
None
} finally {
is.close()
}
}
+ override def getValues(blockId: BlockId): Option[Iterator[_]] = {
+ val file = getFile(blockId)
+ if (file == null || file.getLocationHosts().size() == 0) {
+ return None
+ }
+ val is = file.getInStream(ReadType.CACHE)
+ Option(is).map { is =>
+ blockManager.dataDeserializeStream(blockId, is)
+ }
+ }
+
override def getSize(blockId: BlockId): Long = {
getFile(blockId.name).length
}
@@ -184,7 +217,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
tachyonDir = client.getFile(path)
}
} catch {
- case e: Exception =>
+ case NonFatal(e) =>
logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
}
}
@@ -206,7 +239,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
- case e: Exception =>
+ case NonFatal(e) =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
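
Both Tachyon put paths share a write-or-cancel shape: write into a WriteType.TRY_CACHE stream, cancel() on any non-fatal error so no partially written block is left behind, and always close(). Extracted as a sketch, where the withOutStream helper is hypothetical and not part of this patch:

    import scala.util.control.NonFatal
    import tachyon.client.{OutStream, TachyonFile, WriteType}

    // Hypothetical helper capturing the pattern used by putBytes and putValues:
    // cancel() discards the partial write, close() always runs.
    def withOutStream(file: TachyonFile)(write: OutStream => Unit): Unit = {
      val os = file.getOutStream(WriteType.TRY_CACHE)
      try {
        write(os)
      } catch {
        case NonFatal(_) => os.cancel() // caller may also log the failure
      } finally {
        os.close()
      }
    }

getValues is the symmetric read path: it wraps the Tachyon InStream in dataDeserializeStream, so deserialization happens lazily as the returned iterator is consumed.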