Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala  14  +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 035f3767ff..79a1a8a0da 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -272,7 +272,8 @@ private[spark] class ExternalSorter[K, V, C](
     // createTempShuffleBlock here; see SPARK-3426 for more context.
     val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
-    var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+    var writer = blockManager.getDiskWriter(
+      blockId, file, serInstance, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0 // Objects written since the last flush
 
     // List of batch sizes (bytes) in the order they are written to disk
@@ -308,7 +309,8 @@ private[spark] class ExternalSorter[K, V, C](
         if (objectsWritten == serializerBatchSize) {
           flush()
           curWriteMetrics = new ShuffleWriteMetrics()
-          writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+          writer = blockManager.getDiskWriter(
+            blockId, file, serInstance, fileBufferSize, curWriteMetrics)
         }
       }
       if (objectsWritten > 0) {
@@ -358,7 +360,9 @@ private[spark] class ExternalSorter[K, V, C](
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
         // createTempShuffleBlock here; see SPARK-3426 for more context.
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
-        blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
+        val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize,
+          curWriteMetrics)
+        writer.open()
       }
       // Creating the file to write to and creating a disk writer both involve interacting with
       // the disk, and can take a long time in aggregate when we open many files, so should be
@@ -749,8 +753,8 @@ private[spark] class ExternalSorter[K, V, C](
       // partition and just write everything directly.
       for ((id, elements) <- this.partitionedIterator) {
         if (elements.hasNext) {
-          val writer = blockManager.getDiskWriter(
-            blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
+          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
+            context.taskMetrics.shuffleWriteMetrics.get)
           for (elem <- elements) {
             writer.write(elem)
           }
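
The four hunks above all make the same substitution: each disk writer is now constructed with the pre-created serializer instance (serInstance) rather than the serializer itself (ser). Below is a minimal, self-contained Scala sketch of that pattern; FakeSerializer, FakeSerializerInstance, FakeDiskWriter and SerializerInstanceReuseSketch are hypothetical stand-ins, not Spark classes, and the sketch assumes only that the caller creates one serializer instance up front and reuses it for every writer, instead of an instance being produced per writer.

// Minimal sketch (hypothetical stand-ins, not Spark classes) of the pattern applied in this patch.
class FakeSerializerInstance {
  // Turns a record into bytes, standing in for serializer-instance-style serialization.
  def serialize(obj: Any): Array[Byte] = obj.toString.getBytes("UTF-8")
}

class FakeSerializer {
  // Mirrors the serializer -> serializer-instance factory shape.
  def newInstance(): FakeSerializerInstance = new FakeSerializerInstance
}

// Stand-in for a disk writer that serializes records with the instance it was given.
class FakeDiskWriter(serInstance: FakeSerializerInstance) {
  def write(obj: Any): Int = serInstance.serialize(obj).length
}

object SerializerInstanceReuseSketch {
  def main(args: Array[String]): Unit = {
    val ser = new FakeSerializer

    // Before: each writer is handed a freshly created instance.
    val writersBefore = (1 to 3).map(_ => new FakeDiskWriter(ser.newInstance()))

    // After (the shape of this patch): one instance, created once, passed to every writer.
    val serInstance = ser.newInstance()
    val writersAfter = (1 to 3).map(_ => new FakeDiskWriter(serInstance))

    val bytesWritten = (writersBefore ++ writersAfter).map(_.write("record")).sum
    println(s"wrote $bytesWritten bytes across ${writersBefore.size + writersAfter.size} writers")
  }
}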