author     Mingfei <mingfei.shi@intel.com>           2015-05-20 22:33:03 -0700
committer  Patrick Wendell <patrick@databricks.com>  2015-05-20 22:33:03 -0700
commit     04940c49755fd2e7f1ed7b875da287c946bfebeb (patch)
tree       5eabc4b6d9509670bb3a06c57f19fe3934e4b6d2
parent     d0eb9ffe978c663b7aa06e908cadee81767d23d1 (diff)
[SPARK-7389] [CORE] Tachyon integration improvement
Two main changes:

1. Add two functions, putValues and getValues, to ExternalBlockManager, because an implementation may not need to rely on putBytes and getBytes.

2. Improve the Tachyon integration. Currently, when putting data into Tachyon, Spark first serializes all data in a partition into one ByteBuffer and then writes it to Tachyon, which uses a lot of memory and increases GC overhead. When getting data from Tachyon, getValues depends on getBytes, which also reads all data into an on-heap byte array, again resulting in high memory usage. This PR changes the approach of both functions, making them read and write data by stream to reduce memory usage. In our testing, when the data size is huge, this patch reduces GC time by about 30% and full GC time by about 70%, and reduces total execution time by about 10%.

Author: Mingfei <mingfei.shi@intel.com>

Closes #5908 from shimingfei/Tachyon-integration-rebase and squashes the following commits:

033bc57 [Mingfei] modify according to comments
747c69a [Mingfei] modify according to comments - format changes
ce52c67 [Mingfei] put close() in a finally block
d2c60bb [Mingfei] modify according to comments, some code style change
4c11591 [Mingfei] modify according to comments: split putIntoExternalBlockStore into two functions; add default implementations for getValues and putValues
cc0a32e [Mingfei] make getValues read data from Tachyon by stream; make putValues write data to Tachyon by stream
017593d [Mingfei] add getValues and putValues to ExternalBlockManager's interface
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala          36
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala  22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala    88
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala   51
4 files changed, 149 insertions, 48 deletions
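
The core idea of the patch is easiest to see in isolation. Below is a minimal, self-contained Scala sketch of the write path; the object name StreamingPutSketch and the use of plain Java serialization are illustrative assumptions, since Spark actually routes this through its Serializer and dataSerializeStream. It contrasts the old putBytes-style path, which materializes the whole serialized partition on the heap, with the new putValues-style path, which streams records to the destination as they are produced.

import java.io._

object StreamingPutSketch {
  // Old approach (putBytes path): serialize the whole partition into one
  // on-heap buffer, then copy it out. Peak memory is the full serialized
  // partition, plus a second copy made by toByteArray.
  def putViaByteBuffer(values: Iterator[AnyRef], out: OutputStream): Unit = {
    val buffer = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buffer)
    try values.foreach(oos.writeObject) finally oos.close()
    out.write(buffer.toByteArray)
  }

  // New approach (putValues path): serialize records straight onto the
  // destination stream. Peak memory is roughly one record plus the stream
  // buffers. Closing the ObjectOutputStream also closes the destination.
  def putViaStream(values: Iterator[AnyRef], out: OutputStream): Unit = {
    val oos = new ObjectOutputStream(new BufferedOutputStream(out))
    try values.foreach(oos.writeObject) finally oos.close()
  }

  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    putViaStream(Iterator("a", "b", "c"), sink)
    println(s"wrote ${sink.size} bytes by stream")
  }
}
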
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16d67cbfca..5048c7dab2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
+import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
- externalBlockStore.getBytes(blockId) match {
- case Some(bytes) =>
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- return Some(new BlockResult(
- dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
- }
+ val result = if (asBlockResult) {
+ externalBlockStore.getValues(blockId)
+ .map(new BlockResult(_, DataReadMethod.Memory, info.size))
+ } else {
+ externalBlockStore.getBytes(blockId)
+ }
+ result match {
+ case Some(values) =>
+ return result
case None =>
- logDebug(s"Block $blockId not found in externalBlockStore")
+ logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
- val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
- serializer.newInstance().deserializeStream(stream).asIterator
+ dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+ }
+
+ /**
+ * Deserializes an InputStream into an iterator of values and disposes of it when the end of
+ * the iterator is reached.
+ */
+ def dataDeserializeStream(
+ blockId: BlockId,
+ inputStream: InputStream,
+ serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ val stream = new BufferedInputStream(inputStream)
+ serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
}
def stop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
index 8964762df6..f39325a12d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
*/
private[spark] abstract class ExternalBlockManager {
+ protected var blockManager: BlockManager = _
+
override def toString: String = {"External Block Store"}
/**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
*
* @throws java.io.IOException if there is any file system failure during the initialization.
*/
- def init(blockManager: BlockManager, executorId: String): Unit
+ def init(blockManager: BlockManager, executorId: String): Unit = {
+ this.blockManager = blockManager
+ }
/**
* Drop the block from the underlying external block store, if it exists.
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
*/
def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
+ def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+ val bytes = blockManager.dataSerialize(blockId, values)
+ putBytes(blockId, bytes)
+ }
+
/**
* Retrieve the block bytes.
* @return Some(ByteBuffer) if the block bytes are successfully retrieved
@@ -83,6 +92,17 @@ private[spark] abstract class ExternalBlockManager {
def getBytes(blockId: BlockId): Option[ByteBuffer]
/**
+ * Retrieve the block data.
+ * @return Some(Iterator[Any]) if the block data is successfully retrieved
+ * None if the block does not exist in the external block store.
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block.
+ */
+ def getValues(blockId: BlockId): Option[Iterator[_]] = {
+ getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ }
+
+ /**
* Get the size of the block saved in the underlying external block store,
* which is saved before by putBytes.
* @return size of the block
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 0bf770306a..291394ed34 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -18,9 +18,11 @@
package org.apache.spark.storage
import java.nio.ByteBuffer
+
+import scala.util.control.NonFatal
+
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
/**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
} catch {
case NonFatal(t) =>
- logError(s"error in getSize from $blockId", t)
+ logError(s"Error in getSize($blockId)", t)
0L
}
}
@@ -54,7 +56,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putIterator(blockId, values.toIterator, level, returnValues)
+ putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
}
override def putIterator(
@@ -62,42 +64,70 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- logDebug(s"Attempting to write values for block $blockId")
- val bytes = blockManager.dataSerialize(blockId, values)
- putIntoExternalBlockStore(blockId, bytes, returnValues)
+ putIntoExternalBlockStore(blockId, values, returnValues)
}
private def putIntoExternalBlockStore(
blockId: BlockId,
- bytes: ByteBuffer,
+ values: Iterator[_],
returnValues: Boolean): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- logDebug(s"Attempting to put block $blockId into ExtBlk store")
+ logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
// we should never hit here if externalBlockManager is None. Handle it anyway for safety.
try {
val startTime = System.currentTimeMillis
if (externalBlockManager.isDefined) {
- externalBlockManager.get.putBytes(blockId, bytes)
+ externalBlockManager.get.putValues(blockId, values)
+ val size = getSize(blockId)
+ val data = if (returnValues) {
+ Left(getValues(blockId).get)
+ } else {
+ null
+ }
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+ blockId, Utils.bytesToString(size), finishTime - startTime))
+ PutResult(size, data)
+ } else {
+ logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+ }
+ } catch {
+ case NonFatal(t) =>
+ logError(s"Error in putValues($blockId)", t)
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+ }
+ }
- if (returnValues) {
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ private def putIntoExternalBlockStore(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ returnValues: Boolean): PutResult = {
+ logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
+ // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
+ try {
+ val startTime = System.currentTimeMillis
+ if (externalBlockManager.isDefined) {
+ val byteBuffer = bytes.duplicate()
+ byteBuffer.rewind()
+ externalBlockManager.get.putBytes(blockId, byteBuffer)
+ val size = bytes.limit()
+ val data = if (returnValues) {
+ Right(bytes)
} else {
- PutResult(bytes.limit(), null)
+ null
}
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+ blockId, Utils.bytesToString(size), finishTime - startTime))
+ PutResult(size, data)
} else {
- logError(s"error in putBytes $blockId")
- PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
} catch {
case NonFatal(t) =>
- logError(s"error in putBytes $blockId", t)
- PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ logError(s"Error in putBytes($blockId)", t)
+ PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
}
@@ -107,13 +137,19 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
} catch {
case NonFatal(t) =>
- logError(s"error in removing $blockId", t)
+ logError(s"Error in removeBlock($blockId)", t)
true
}
}
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ try {
+ externalBlockManager.flatMap(_.getValues(blockId))
+ } catch {
+ case NonFatal(t) =>
+ logError(s"Error in getValues($blockId)", t)
+ None
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -121,7 +157,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.flatMap(_.getBytes(blockId))
} catch {
case NonFatal(t) =>
- logError(s"error in getBytes from $blockId", t)
+ logError(s"Error in getBytes($blockId)", t)
None
}
}
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
try {
val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
if (!ret) {
- logInfo(s"remove block $blockId")
+ logInfo(s"Remove block $blockId")
blockManager.removeBlock(blockId, true)
}
ret
} catch {
case NonFatal(t) =>
- logError(s"error in getBytes from $blockId", t)
+ logError(s"Error in getBytes($blockId)", t)
false
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index bdc6276e41..fb4ba0eac9 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, Random}
+import scala.util.control.NonFatal
+
import com.google.common.io.ByteStreams
+
import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.TachyonURI
@@ -38,7 +41,6 @@ import org.apache.spark.util.Utils
*/
private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
- var blockManager: BlockManager = _
var rootDirs: String = _
var master: String = _
var client: tachyon.client.TachyonFS = _
@@ -52,7 +54,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def init(blockManager: BlockManager, executorId: String): Unit = {
- this.blockManager = blockManager
+ super.init(blockManager, executorId)
val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
@@ -95,8 +97,29 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
- os.write(bytes.array())
- os.close()
+ try {
+ os.write(bytes.array())
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
+ os.cancel()
+ } finally {
+ os.close()
+ }
+ }
+
+ override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+ val file = getFile(blockId)
+ val os = file.getOutStream(WriteType.TRY_CACHE)
+ try {
+ blockManager.dataSerializeStream(blockId, os, values)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to put values of block $blockId into Tachyon", e)
+ os.cancel()
+ } finally {
+ os.close()
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -105,21 +128,31 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
return None
}
val is = file.getInStream(ReadType.CACHE)
- assert (is != null)
try {
val size = file.length
val bs = new Array[Byte](size.asInstanceOf[Int])
ByteStreams.readFully(is, bs)
Some(ByteBuffer.wrap(bs))
} catch {
- case ioe: IOException =>
- logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+ case NonFatal(e) =>
+ logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
None
} finally {
is.close()
}
}
+ override def getValues(blockId: BlockId): Option[Iterator[_]] = {
+ val file = getFile(blockId)
+ if (file == null || file.getLocationHosts().size() == 0) {
+ return None
+ }
+ val is = file.getInStream(ReadType.CACHE)
+ Option(is).map { is =>
+ blockManager.dataDeserializeStream(blockId, is)
+ }
+ }
+
override def getSize(blockId: BlockId): Long = {
getFile(blockId.name).length
}
@@ -184,7 +217,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
tachyonDir = client.getFile(path)
}
} catch {
- case e: Exception =>
+ case NonFatal(e) =>
logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
}
}
@@ -206,7 +239,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
- case e: Exception =>
+ case NonFatal(e) =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
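
For symmetry, here is a read-side sketch under the same assumptions (a hypothetical StreamingGetSketch helper using plain Java serialization rather than Spark's Serializer). Instead of copying the whole block into an on-heap byte array the way the getBytes path does, it returns a lazy iterator that decodes one record at a time from the input stream and closes it at end of stream, mirroring what the new TachyonBlockManager.getValues does via dataDeserializeStream.

import java.io._

object StreamingGetSketch {
  // Returns a lazy iterator over the records in the stream. Each record is
  // deserialized only when the consumer asks for it; the stream is closed
  // once EOF is reached, so the full block never sits on the heap at once.
  def getViaStream(in: InputStream): Iterator[AnyRef] = {
    val ois = new ObjectInputStream(new BufferedInputStream(in))
    Iterator
      .continually {
        try Some(ois.readObject())
        catch { case _: EOFException => ois.close(); None }
      }
      .takeWhile(_.isDefined)
      .map(_.get)
  }

  def main(args: Array[String]): Unit = {
    // Round-trip a few records to show the lazy read path.
    val sink = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(sink)
    Seq("a", "b", "c").foreach(oos.writeObject)
    oos.close()
    val values = getViaStream(new ByteArrayInputStream(sink.toByteArray))
    values.foreach(println) // records decoded one at a time, not from one big array
  }
}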