Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala       |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                  |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala             |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala   |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala        | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala        |  6
8 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8300f9f219..8af46f3327 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{SignalLogger, Utils}
private[spark] class CoarseGrainedExecutorBackend(
@@ -47,6 +48,10 @@ private[spark] class CoarseGrainedExecutorBackend(
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
+ // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
+ // to be changed so that we don't share the serializer instance across threads
+ private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
+
override def onStart() {
import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
@@ -83,7 +88,6 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
- val ser = env.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
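
The executor backend change above hoists the closure serializer out of the per-message path: one SerializerInstance is created when the backend is constructed and reused for every LaunchTask, instead of calling newInstance() for each task. A minimal sketch of that pattern, assuming a single-threaded message loop; the class and method names below are illustrative, not Spark's:

import java.nio.ByteBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.serializer.SerializerInstance

// Sketch only: cache one SerializerInstance per single-threaded endpoint.
// SerializerInstance is not assumed to be thread-safe, so this relies on all
// messages being handled by one thread, as the comment in the diff notes.
class CachedSerializerBackend(env: SparkEnv) {
  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

  def onLaunchTask(data: ByteBuffer): Unit = {
    // Reuses the cached instance; before this change a new instance was
    // created here for every incoming task description.
    val taskDesc = ser.deserialize[AnyRef](data)
    // ... hand taskDesc off to executor.launchTask(...) ...
  }
}
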
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 5be3ed771e..538e150ead 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -113,11 +113,12 @@ class FileShuffleBlockManager(conf: SparkConf)
private var fileGroup: ShuffleFileGroup = null
val openStartTime = System.nanoTime
+ val serializerInstance = serializer.newInstance()
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
+ blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
writeMetrics)
}
} else {
@@ -133,7 +134,8 @@ class FileShuffleBlockManager(conf: SparkConf)
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
- blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
+ blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
+ writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1aa0ef18de..145a9c1ae3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -37,7 +37,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{SerializerInstance, Serializer}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util._
@@ -646,13 +646,13 @@ private[spark] class BlockManager(
def getDiskWriter(
blockId: BlockId,
file: File,
- serializer: Serializer,
+ serializerInstance: SerializerInstance,
bufferSize: Int,
writeMetrics: ShuffleWriteMetrics): BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
- new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites,
- writeMetrics)
+ new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+ syncWrites, writeMetrics)
}
/**
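
With the new getDiskWriter signature, callers create the SerializerInstance themselves, once, and pass it to every writer they open, rather than handing each writer a Serializer and letting it call newInstance() internally. A sketch of a hypothetical caller; the helper below is illustrative and not part of Spark:

import java.io.File

import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager, BlockObjectWriter}

// Illustrative helper: open several disk writers that share a single
// SerializerInstance, matching the getDiskWriter signature shown above.
def openWriters(
    blockManager: BlockManager,
    serializer: Serializer,
    blocks: Seq[(BlockId, File)],
    bufferSize: Int,
    writeMetrics: ShuffleWriteMetrics): Seq[BlockObjectWriter] = {
  // Created once, outside the loop; every writer reuses it.
  val serInstance = serializer.newInstance()
  blocks.map { case (blockId, file) =>
    blockManager.getDiskWriter(blockId, file, serInstance, bufferSize, writeMetrics)
  }
}
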
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 0dfc91dfaf..14833791f7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -21,7 +21,7 @@ import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel
import org.apache.spark.Logging
-import org.apache.spark.serializer.{SerializationStream, Serializer}
+import org.apache.spark.serializer.{SerializerInstance, SerializationStream}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.util.Utils
@@ -71,7 +71,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
- serializer: Serializer,
+ serializerInstance: SerializerInstance,
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
@@ -134,7 +134,7 @@ private[spark] class DiskBlockObjectWriter(
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
- objOut = serializer.newInstance().serializeStream(bs)
+ objOut = serializerInstance.serializeStream(bs)
initialized = true
this
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 8f28ef49a8..f3379521d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{SerializerInstance, Serializer}
import org.apache.spark.util.{CompletionIterator, Utils}
/**
@@ -106,6 +106,8 @@ final class ShuffleBlockFetcherIterator(
private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+ private[this] val serializerInstance: SerializerInstance = serializer.newInstance()
+
/**
* Whether the iterator is still active. If isZombie is true, the callback interface will no
* longer place fetched blocks into [[results]].
@@ -299,7 +301,7 @@ final class ShuffleBlockFetcherIterator(
// the scheduler gets a FetchFailedException.
Try(buf.createInputStream()).map { is0 =>
val is = blockManager.wrapForCompression(blockId, is0)
- val iter = serializer.newInstance().deserializeStream(is).asIterator
+ val iter = serializerInstance.deserializeStream(is).asIterator
CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
// so we don't release it again in cleanup.
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9ff4744593..30dd7f22e4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -151,8 +151,7 @@ class ExternalAppendOnlyMap[K, V, C](
override protected[this] def spill(collection: SizeTracker): Unit = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
- var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
- curWriteMetrics)
+ var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
var objectsWritten = 0
// List of batch sizes (bytes) in the order they are written to disk
@@ -179,8 +178,7 @@ class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
flush()
curWriteMetrics = new ShuffleWriteMetrics()
- writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
- curWriteMetrics)
+ writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
}
}
if (objectsWritten > 0) {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 035f3767ff..79a1a8a0da 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -272,7 +272,8 @@ private[spark] class ExternalSorter[K, V, C](
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
curWriteMetrics = new ShuffleWriteMetrics()
- var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+ var writer = blockManager.getDiskWriter(
+ blockId, file, serInstance, fileBufferSize, curWriteMetrics)
var objectsWritten = 0 // Objects written since the last flush
// List of batch sizes (bytes) in the order they are written to disk
@@ -308,7 +309,8 @@ private[spark] class ExternalSorter[K, V, C](
if (objectsWritten == serializerBatchSize) {
flush()
curWriteMetrics = new ShuffleWriteMetrics()
- writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+ writer = blockManager.getDiskWriter(
+ blockId, file, serInstance, fileBufferSize, curWriteMetrics)
}
}
if (objectsWritten > 0) {
@@ -358,7 +360,9 @@ private[spark] class ExternalSorter[K, V, C](
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
- blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
+ val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize,
+ curWriteMetrics)
+ writer.open()
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
@@ -749,8 +753,8 @@ private[spark] class ExternalSorter[K, V, C](
// partition and just write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
- val writer = blockManager.getDiskWriter(
- blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
+ val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
+ context.taskMetrics.shuffleWriteMetrics.get)
for (elem <- elements) {
writer.write(elem)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index 78bbc4ec2c..003a728cb8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -30,7 +30,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
// Record metrics update on every write
@@ -52,7 +52,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
// Record metrics update on every write
@@ -75,7 +75,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.open()
writer.close()
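
Since DiskBlockObjectWriter now takes a SerializerInstance directly, the suite builds one with new JavaSerializer(conf).newInstance(). A self-contained sketch of that construction, mirroring the updated tests (it assumes access to Spark's private[spark] storage classes, as the suite has):

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{DiskBlockObjectWriter, TestBlockId}
import org.apache.spark.util.Utils

// Sketch mirroring the updated suite: the writer is given a concrete
// SerializerInstance instead of a Serializer.
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
  new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.open()
writer.write(Long.box(20))
writer.close()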