diff options
author | jerryshao <saisai.shao@intel.com> | 2014-10-20 10:20:21 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-20 10:20:21 -0700 |
commit | c7aeecd08fd329085760fa89025ec0d9c04f5e3f (patch) | |
tree | 82d9bc132017bf3577e477488e5e39ff7b74e048 /core | |
parent | d1966f3a8bafdcef87d10ef9db5976cf89faee4b (diff) | |
download | spark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.tar.gz spark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.tar.bz2 spark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.zip |
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle
Kernel 2.6.32 bug will lead to unexpected behavior of transferTo in copyStream, and this will corrupt the shuffle output file in sort-based shuffle, which will somehow introduce PARSING_ERROR(2), deserialization error or offset out of range. Here fix this by adding append flag, also add some position checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).
Author: jerryshao <saisai.shao@intel.com>
Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits:
be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 29 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 5 |
2 files changed, 28 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 53a7512edd..0aeff6455b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -269,23 +269,44 @@ private[spark] object Utils extends Logging { dir } - /** Copy all data from an InputStream to an OutputStream */ + /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream + * copying is disabled by default unless explicitly set transferToEnabled as true, + * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. + */ def copyStream(in: InputStream, out: OutputStream, - closeStreams: Boolean = false): Long = + closeStreams: Boolean = false, + transferToEnabled: Boolean = false): Long = { var count = 0L try { - if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) { + if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream] + && transferToEnabled) { // When both streams are File stream, use transferTo to improve copy performance. val inChannel = in.asInstanceOf[FileInputStream].getChannel() val outChannel = out.asInstanceOf[FileOutputStream].getChannel() + val initialPos = outChannel.position() val size = inChannel.size() // In case transferTo method transferred less data than we have required. while (count < size) { count += inChannel.transferTo(count, size - count, outChannel) } + + // Check the position after transferTo loop to see if it is in the right position and + // give user information if not. + // Position will not be increased to the expected length after calling transferTo in + // kernel version 2.6.32, this issue can be seen in + // https://bugs.openjdk.java.net/browse/JDK-7052359 + // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). + val finalPos = outChannel.position() + assert(finalPos == initialPos + size, + s""" + |Current position $finalPos do not equal to expected position ${initialPos + size} + |after transferTo, please check your kernel version to see if it is 2.6.32, + |this is a kernel bug which will lead to unexpected behavior when using transferTo. + |You can set spark.file.transferTo = false to disable this NIO feature. + """.stripMargin) } else { val buf = new Array[Byte](8192) var n = 0 @@ -727,7 +748,7 @@ private[spark] object Utils extends Logging { /** * Determines if a directory contains any files newer than cutoff seconds. - * + * * @param dir must be the path to a directory, or IllegalArgumentException is thrown * @param cutoff measured in seconds. Returns true if there are any files or directories in the * given directory whose last modified time is later than this many seconds ago diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 644fa36818..d1b06d14ac 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C]( private val conf = SparkEnv.get.conf private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 + private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true) // Size of object batches when reading/writing from serializers. // @@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C]( var out: FileOutputStream = null var in: FileInputStream = null try { - out = new FileOutputStream(outputFile) + out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) - val size = org.apache.spark.util.Utils.copyStream(in, out, false) + val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) in.close() in = null lengths(i) = size |