aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-10-20 10:20:21 -0700
committerJosh Rosen <joshrosen@databricks.com>2014-10-20 10:20:21 -0700
commitc7aeecd08fd329085760fa89025ec0d9c04f5e3f (patch)
tree82d9bc132017bf3577e477488e5e39ff7b74e048 /core/src/main/scala
parentd1966f3a8bafdcef87d10ef9db5976cf89faee4b (diff)
downloadspark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.tar.gz
spark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.tar.bz2
spark-c7aeecd08fd329085760fa89025ec0d9c04f5e3f.zip
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle
Kernel 2.6.32 bug will lead to unexpected behavior of transferTo in copyStream, and this will corrupt the shuffle output file in sort-based shuffle, which will somehow introduce PARSING_ERROR(2), deserialization error or offset out of range. Here fix this by adding append flag, also add some position checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948). Author: jerryshao <saisai.shao@intel.com> Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits: be0533a [jerryshao] Address the comments a82b184 [jerryshao] add configuration to control the NIO way of copying stream e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala5
2 files changed, 28 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 53a7512edd..0aeff6455b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -269,23 +269,44 @@ private[spark] object Utils extends Logging {
dir
}
- /** Copy all data from an InputStream to an OutputStream */
+ /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream
+ * copying is disabled by default unless explicitly set transferToEnabled as true,
+ * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
+ */
def copyStream(in: InputStream,
out: OutputStream,
- closeStreams: Boolean = false): Long =
+ closeStreams: Boolean = false,
+ transferToEnabled: Boolean = false): Long =
{
var count = 0L
try {
- if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+ if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
+ && transferToEnabled) {
// When both streams are File stream, use transferTo to improve copy performance.
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+ val initialPos = outChannel.position()
val size = inChannel.size()
// In case transferTo method transferred less data than we have required.
while (count < size) {
count += inChannel.transferTo(count, size - count, outChannel)
}
+
+ // Check the position after transferTo loop to see if it is in the right position and
+ // give user information if not.
+ // Position will not be increased to the expected length after calling transferTo in
+ // kernel version 2.6.32, this issue can be seen in
+ // https://bugs.openjdk.java.net/browse/JDK-7052359
+ // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
+ val finalPos = outChannel.position()
+ assert(finalPos == initialPos + size,
+ s"""
+ |Current position $finalPos do not equal to expected position ${initialPos + size}
+ |after transferTo, please check your kernel version to see if it is 2.6.32,
+ |this is a kernel bug which will lead to unexpected behavior when using transferTo.
+ |You can set spark.file.transferTo = false to disable this NIO feature.
+ """.stripMargin)
} else {
val buf = new Array[Byte](8192)
var n = 0
@@ -727,7 +748,7 @@ private[spark] object Utils extends Logging {
/**
* Determines if a directory contains any files newer than cutoff seconds.
- *
+ *
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files or directories in the
* given directory whose last modified time is later than this many seconds ago
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 644fa36818..d1b06d14ac 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+ private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
// Size of object batches when reading/writing from serializers.
//
@@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C](
var out: FileOutputStream = null
var in: FileInputStream = null
try {
- out = new FileOutputStream(outputFile)
+ out = new FileOutputStream(outputFile, true)
for (i <- 0 until numPartitions) {
in = new FileInputStream(partitionWriters(i).fileSegment().file)
- val size = org.apache.spark.util.Utils.copyStream(in, out, false)
+ val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
in.close()
in = null
lengths(i) = size