author     Andrew Or <andrew@databricks.com>    2015-12-16 20:01:47 -0800
committer  Andrew Or <andrew@databricks.com>    2015-12-16 20:01:47 -0800
commit     97678edeaaafc19ea18d044233a952d2e2e89fbc (patch)
tree       7f5c3a646fbda5ddb4057074c93900b9b24b332f /core
parent     d1508dd9b765489913bc948575a69ebab82f217b (diff)
[SPARK-12390] Clean up unused serializer parameter in BlockManager
No change in functionality is intended. This only changes the internal API.

Author: Andrew Or <andrew@databricks.com>

Closes #10343 from andrewor14/clean-bm-serializer.
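To illustrate what the simplified API looks like from a call site, here is a minimal, self-contained sketch in the same spirit. It is emphatically not Spark's BlockManager: the names are hypothetical, and plain Java serialization stands in for Spark's default serializer. The point it demonstrates is the shape after this patch, where no caller ever threads a Serializer argument through.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for the simplified shape, NOT Spark's BlockManager:
// every (de)serialization path goes through one fixed "default" serializer,
// so no call site needs a serializer parameter.
object SerializerSketch {
  // Analogous to the new dataSerialize(blockId, values): ByteBuffer.
  def dataSerialize(values: Seq[AnyRef]): ByteBuffer = {
    val bytes = new ByteArrayOutputStream(4096)
    val out   = new ObjectOutputStream(bytes)   // the fixed serializer
    out.writeInt(values.length)                 // record count up front
    values.foreach(out.writeObject)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Analogous to the new dataDeserialize(blockId, bytes): Iterator[Any].
  def dataDeserialize(buf: ByteBuffer): Iterator[AnyRef] = {
    val arr = new Array[Byte](buf.remaining())
    buf.get(arr)
    val in = new ObjectInputStream(new ByteArrayInputStream(arr))
    val n  = in.readInt()
    (0 until n).iterator.map(_ => in.readObject())
  }

  def main(args: Array[String]): Unit = {
    val buf = dataSerialize(Seq("rdd_0_0", "rdd_0_1"))
    println(dataDeserialize(buf).toList)        // List(rdd_0_0, rdd_0_1)
  }
}
```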
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala    | 10
2 files changed, 11 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec003..6074fc58d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
def dataSerializeStream(
blockId: BlockId,
outputStream: OutputStream,
- values: Iterator[Any],
- serializer: Serializer = defaultSerializer): Unit = {
+ values: Iterator[Any]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val ser = serializer.newInstance()
+ val ser = defaultSerializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
/** Serializes into a byte buffer. */
- def dataSerialize(
- blockId: BlockId,
- values: Iterator[Any],
- serializer: Serializer = defaultSerializer): ByteBuffer = {
+ def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
val byteStream = new ByteBufferOutputStream(4096)
- dataSerializeStream(blockId, byteStream, values, serializer)
+ dataSerializeStream(blockId, byteStream, values)
byteStream.toByteBuffer
}
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(
- blockId: BlockId,
- bytes: ByteBuffer,
- serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
bytes.rewind()
- dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+ dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
}
/**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserializeStream(
- blockId: BlockId,
- inputStream: InputStream,
- serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
val stream = new BufferedInputStream(inputStream)
- serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+ defaultSerializer
+ .newInstance()
+ .deserializeStream(wrapForCompression(blockId, stream))
+ .asIterator
}
def stop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9dc16..6c4477184d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
- /**
- * A version of getValues that allows a custom serializer. This is used as part of the
- * shuffle short-circuit code.
- */
- def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
- // TODO: Should bypass getBytes and use a stream based implementation, so that
- // we won't use a lot of memory during e.g. external sort merge.
- getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
- }
-
override def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
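With the custom-serializer variant of getValues removed above, the surviving read path reduces to "fetch bytes, hand them to the block manager's default deserialization". A hypothetical stand-in for that shape (again not Spark's actual DiskStore; the constructor parameters here replace the real blockManager collaborator):

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in, not Spark's DiskStore: the one remaining read path
// defers to the block manager's default deserialization, so no Serializer
// parameter survives anywhere on the class.
class MiniDiskStore(getBytes: String => Option[ByteBuffer],
                    dataDeserialize: ByteBuffer => Iterator[Any]) {
  def getValues(blockId: String): Option[Iterator[Any]] =
    getBytes(blockId).map(buffer => dataDeserialize(buffer))
}
```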