diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 29 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 5 |
2 files changed, 28 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 53a7512edd..0aeff6455b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -269,23 +269,44 @@ private[spark] object Utils extends Logging { dir } - /** Copy all data from an InputStream to an OutputStream */ + /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream + * copying is disabled by default unless explicitly set transferToEnabled as true, + * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. + */ def copyStream(in: InputStream, out: OutputStream, - closeStreams: Boolean = false): Long = + closeStreams: Boolean = false, + transferToEnabled: Boolean = false): Long = { var count = 0L try { - if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) { + if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream] + && transferToEnabled) { // When both streams are File stream, use transferTo to improve copy performance. val inChannel = in.asInstanceOf[FileInputStream].getChannel() val outChannel = out.asInstanceOf[FileOutputStream].getChannel() + val initialPos = outChannel.position() val size = inChannel.size() // In case transferTo method transferred less data than we have required. while (count < size) { count += inChannel.transferTo(count, size - count, outChannel) } + + // Check the position after transferTo loop to see if it is in the right position and + // give user information if not. + // Position will not be increased to the expected length after calling transferTo in + // kernel version 2.6.32, this issue can be seen in + // https://bugs.openjdk.java.net/browse/JDK-7052359 + // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). + val finalPos = outChannel.position() + assert(finalPos == initialPos + size, + s""" + |Current position $finalPos do not equal to expected position ${initialPos + size} + |after transferTo, please check your kernel version to see if it is 2.6.32, + |this is a kernel bug which will lead to unexpected behavior when using transferTo. + |You can set spark.file.transferTo = false to disable this NIO feature. + """.stripMargin) } else { val buf = new Array[Byte](8192) var n = 0 @@ -727,7 +748,7 @@ private[spark] object Utils extends Logging { /** * Determines if a directory contains any files newer than cutoff seconds. - * + * * @param dir must be the path to a directory, or IllegalArgumentException is thrown * @param cutoff measured in seconds. Returns true if there are any files or directories in the * given directory whose last modified time is later than this many seconds ago diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 644fa36818..d1b06d14ac 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C]( private val conf = SparkEnv.get.conf private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 + private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true) // Size of object batches when reading/writing from serializers. // @@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C]( var out: FileOutputStream = null var in: FileInputStream = null try { - out = new FileOutputStream(outputFile) + out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) - val size = org.apache.spark.util.Utils.copyStream(in, out, false) + val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) in.close() in = null lengths(i) = size |