author     Sandy Ryza <sandy@cloudera.com>          2015-05-05 18:32:16 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-05-05 18:32:16 -0700
commit     0092abb47a0f9fdc716d5dfc1c591ddb45de8c98 (patch)
tree       dcef832b0bd31bb709a1f9d848a0848b5b513d07 /core/src
parent     c688e3c5e46b26cb9fdba7987139c9ea63e2458b (diff)
Some minor cleanup after SPARK-4550.
JoshRosen this PR addresses the comments you left on #4450 after it got merged.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits:

dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala                       | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala | 4
2 files changed, 4 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 499dd97c06..8bc4e205bc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou
def write(key: Any, value: Any)
/**
- * Notify the writer that a record worth of bytes has been written with writeBytes.
+ * Notify the writer that a record worth of bytes has been written with OutputStream#write.
*/
def recordWritten()
@@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.writeKey(key)
objOut.writeValue(value)
- numRecordsWritten += 1
- writeMetrics.incShuffleRecordsWritten(1)
-
- if (numRecordsWritten % 32 == 0) {
- updateBytesWritten()
- }
+ recordWritten()
}
override def write(b: Int): Unit = throw new UnsupportedOperationException()
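For context, the deleted lines above suggest what the consolidated recordWritten() in DiskBlockObjectWriter looks like. The following is a sketch reconstructed from that removed code, not text taken from this patch:

    // Sketch (assumption): recordWritten() presumably carries the bookkeeping
    // that write(key, value) used to do inline before this cleanup.
    override def recordWritten(): Unit = {
      numRecordsWritten += 1
      writeMetrics.incShuffleRecordsWritten(1)

      // Refresh the bytes-written metric only every 32 records to keep
      // per-record overhead low.
      if (numRecordsWritten % 32 == 0) {
        updateBytesWritten()
      }
    }

With write(key, value) delegating to this method, the record count and periodic byte-count update live in a single place instead of being duplicated.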
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
index b5ca0c62a0..ac9ea63936 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
@@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
if (keyStart < 0) {
throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
}
- kvSerializationStream.writeObject[Any](key)
+ kvSerializationStream.writeKey[Any](key)
kvSerializationStream.flush()
val valueStart = kvBuffer.size
- kvSerializationStream.writeObject[Any](value)
+ kvSerializationStream.writeValue[Any](value)
kvSerializationStream.flush()
val valueEnd = kvBuffer.size
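The switch from writeObject to writeKey and writeValue uses the more specific hooks on SerializationStream, so serializers that treat keys and values differently can override them. A rough sketch of how such hooks are typically declared (an assumption about the surrounding API, not part of this patch):

    // Sketch (assumption): specialized hooks on SerializationStream that
    // default to plain object serialization but can be overridden by
    // serializers that handle keys and values differently.
    def writeKey[T: ClassTag](key: T): SerializationStream = writeObject(key)
    def writeValue[T: ClassTag](value: T): SerializationStream = writeObject(value)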