/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;

/**
 * An external sorter that is specialized for sort-based shuffle.
 * <p>
 * Incoming records are appended to data pages. When all records have been inserted (or when the
 * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
 * their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
 * written to a single output file (or multiple files, if we've spilled). The format of the output
 * files is the same as the format of the final output file written by
 * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
 * written as a single serialized, compressed stream that can be read with a new decompression and
 * deserialization stream.
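 * <p>
 * Within a data page each record is laid out as a 4-byte length followed by the record's bytes;
 * the in-memory sorter keeps only an 8-byte entry per record, which packs the record's partition
 * id together with its encoded page address (see {@link PackedRecordPointer}).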
 * <p>
 * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
 * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
 * specialized merge procedure that avoids extra serialization/deserialization.
 */
final class ShuffleExternalSorter extends MemoryConsumer {

  private final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);

  @VisibleForTesting
  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;

  private final int numPartitions;
  private final TaskMemoryManager taskMemoryManager;
  private final BlockManager blockManager;
  private final TaskContext taskContext;
  private final ShuffleWriteMetrics writeMetrics;

  /** Force this sorter to spill when there are this many elements in memory. For testing only */
  private final long numElementsForSpillThreshold;

  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
  private final int fileBufferSizeBytes;

  /**
   * Memory pages that hold the records being sorted. The pages in this list are freed when
   * spilling, although in principle we could recycle these pages across spills (on the other hand,
   * this might not be necessary if we maintained a pool of re-usable pages in the
   * TaskMemoryManager itself).
   */
  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();

  private final LinkedList<SpillInfo> spills = new LinkedList<>();

  /** Peak memory used by this sorter so far, in bytes. **/
  private long peakMemoryUsedBytes;

  // These variables are reset after spilling:
  @Nullable private ShuffleInMemorySorter inMemSorter;
  @Nullable private MemoryBlock currentPage = null;
  private long pageCursor = -1;

  ShuffleExternalSorter(
      TaskMemoryManager memoryManager,
      BlockManager blockManager,
      TaskContext taskContext,
      int initialSize,
      int numPartitions,
      SparkConf conf,
      ShuffleWriteMetrics writeMetrics) {
    super(memoryManager,
      (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()));
    this.taskMemoryManager = memoryManager;
    this.blockManager = blockManager;
    this.taskContext = taskContext;
    this.numPartitions = numPartitions;
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    this.numElementsForSpillThreshold =
      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
    this.writeMetrics = writeMetrics;
    this.inMemSorter = new ShuffleInMemorySorter(
      this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
    this.peakMemoryUsedBytes = getMemoryUsage();
  }

  /**
   * Sorts the in-memory records and writes the sorted records to an on-disk file.
   * This method does not free the sort data structures.
   *
   * @param isLastFile if true, this indicates that we're writing the final output file and that
   *                   the bytes written should be counted towards shuffle write metrics rather
   *                   than shuffle spill metrics.
   */
  private void writeSortedFile(boolean isLastFile) throws IOException {
    final ShuffleWriteMetrics writeMetricsToUse;

    if (isLastFile) {
      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
      writeMetricsToUse = writeMetrics;
    } else {
      // We're spilling, so bytes written should be counted towards spill rather than write.
      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
      // them towards shuffle bytes written.
      writeMetricsToUse = new ShuffleWriteMetrics();
    }

    // This call performs the actual sort.
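    // The returned iterator yields records in partition-id order; each entry is exposed through a
    // PackedRecordPointer, which packs the partition id and the record's encoded page address
    // into a single 8-byte value.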
    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();

    // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
    // after SPARK-5581 is fixed.
    DiskBlockObjectWriter writer;

    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem
    // to be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record; records larger than the buffer are copied to disk in multiple chunks.
    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];

    // Because this output will be read during shuffle, its compression codec must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();
    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    // around this, we pass a dummy no-op serializer.
    final SerializerInstance ser = DummySerializerInstance.INSTANCE;

    writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

    int currentPartition = -1;
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        if (currentPartition != -1) {
          writer.commitAndClose();
          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
        }
        currentPartition = partition;
        writer =
          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
      }

      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    if (writer != null) {
      writer.commitAndClose();
      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
      // then the file might be empty. Note that it might be better to avoid calling
      // writeSortedFile() in that case.
      if (currentPartition != -1) {
        spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
        spills.add(spillInfo);
      }
    }

    if (!isLastFile) {  // i.e. this is a spill file
      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
      // are written to disk, not when they enter the shuffle sorting code.
      // DiskBlockObjectWriter relies on its `recordWritten()` method being called in order to
      // trigger periodic updates to `shuffleBytesWritten`. If we were to remove the
      // `recordWritten()` call and increment that counter at a higher level, then the in-progress
      // metrics for records written and bytes written would get out of sync.
      //
      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
      // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
      // metrics to the true write metrics here. The reason for performing this copying is so that
      // we can avoid reporting spilled bytes as shuffle write bytes.
      //
      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
    }
  }

  /**
   * Sort and spill the current records in response to memory pressure.
   */
  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
      return 0L;
    }

    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
      Thread.currentThread().getId(),
      Utils.bytesToString(getMemoryUsage()),
      spills.size(),
      spills.size() > 1 ? " times" : " time");

    writeSortedFile(false);
    final long spillSize = freeMemory();
    inMemSorter.reset();
    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding
    // the records. Otherwise, if the task is over its allocated memory, then without freeing the
    // memory pages we might not be able to get memory for the pointer array.
    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
    return spillSize;
  }

  private long getMemoryUsage() {
    long totalPageSize = 0;
    for (MemoryBlock page : allocatedPages) {
      totalPageSize += page.size();
    }
    return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
  }

  private void updatePeakMemoryUsed() {
    long mem = getMemoryUsage();
    if (mem > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = mem;
    }
  }

  /**
   * Return the peak memory used so far, in bytes.
   */
  long getPeakMemoryUsedBytes() {
    updatePeakMemoryUsed();
    return peakMemoryUsedBytes;
  }

  private long freeMemory() {
    updatePeakMemoryUsed();
    long memoryFreed = 0;
    for (MemoryBlock block : allocatedPages) {
      memoryFreed += block.size();
      freePage(block);
    }
    allocatedPages.clear();
    currentPage = null;
    pageCursor = 0;
    return memoryFreed;
  }

  /**
   * Force all memory and spill files to be deleted; called by shuffle error-handling code.
   */
  public void cleanupResources() {
    freeMemory();
    if (inMemSorter != null) {
      inMemSorter.free();
      inMemSorter = null;
    }
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Unable to delete spill file {}", spill.file.getPath());
      }
    }
  }

  /**
   * Checks whether there is enough space to insert an additional record into the sort pointer
   * array and grows the array if additional space is required. If the required space cannot be
   * obtained, then the in-memory data will be spilled to disk.
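   * When growth is needed, a new pointer array of roughly twice the current capacity is requested;
   * that allocation may itself trigger a spill, in which case the existing array will already have
   * room and the newly allocated array is freed again.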
   */
  private void growPointerArrayIfNecessary() throws IOException {
    assert(inMemSorter != null);
    if (!inMemSorter.hasSpaceForAnotherRecord()) {
      long used = inMemSorter.getMemoryUsage();
      LongArray array;
      try {
        // could trigger spilling
        array = allocateArray(used / 8 * 2);
      } catch (OutOfMemoryError e) {
        // should have triggered spilling
        if (!inMemSorter.hasSpaceForAnotherRecord()) {
          logger.error("Unable to grow the pointer array");
          throw e;
        }
        return;
      }
      // check if spilling is triggered or not
      if (inMemSorter.hasSpaceForAnotherRecord()) {
        freeArray(array);
      } else {
        inMemSorter.expandPointerArray(array);
      }
    }
  }

  /**
   * Allocates more memory in order to insert an additional record. This will request additional
   * memory from the memory manager and spill if the requested memory cannot be obtained.
   *
   * @param required the required space in the data page, in bytes, including space for storing
   *                 the record size. This must be less than or equal to the page size (records
   *                 that exceed the page size are handled via a different code path which uses
   *                 special overflow pages).
   */
  private void acquireNewPageIfNecessary(int required) {
    if (currentPage == null ||
      pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
      // TODO: try to find space in previous pages
      currentPage = allocatePage(required);
      pageCursor = currentPage.getBaseOffset();
      allocatedPages.add(currentPage);
    }
  }

  /**
   * Write a record to the shuffle sorter.
   */
  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
      throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
      spill();
    }

    growPointerArrayIfNecessary();
    // Need 4 bytes to store the record length.
    final int required = length + 4;
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    Platform.putInt(base, pageCursor, length);
    pageCursor += 4;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    inMemSorter.insertRecord(recordAddress, partitionId);
  }

  /**
   * Close the sorter, causing any buffered data to be sorted and written out to disk.
   *
   * @return metadata for the spill files written by this sorter. If no records were ever inserted
   *         into this sorter, then this will return an empty array.
   * @throws IOException
   */
  public SpillInfo[] closeAndGetSpills() throws IOException {
    try {
      if (inMemSorter != null) {
        // Do not count the final file towards the spill count.
        writeSortedFile(true);
        freeMemory();
        inMemSorter.free();
        inMemSorter = null;
      }
      return spills.toArray(new SpillInfo[spills.size()]);
    } catch (IOException e) {
      cleanupResources();
      throw e;
    }
  }

}