diff options
author | Raymond Liu <raymond.liu@intel.com> | 2014-08-12 23:19:35 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-12 23:19:35 -0700 |
commit | 246cb3f158686348a698d1c0da3001c314727129 (patch) | |
tree | 7b131b8272c3749f3e8e2d785ef7016529b876b8 | |
parent | 676f98289dad61c091bb45bd35a2b9613b22d64a (diff) | |
download | spark-246cb3f158686348a698d1c0da3001c314727129.tar.gz spark-246cb3f158686348a698d1c0da3001c314727129.tar.bz2 spark-246cb3f158686348a698d1c0da3001c314727129.zip |
Use transferTo when copy merge files in ExternalSorter
Since this is a file-to-file copy, using transferTo should be faster.
Author: Raymond Liu <raymond.liu@intel.com>
Closes #1884 from colorant/externalSorter and squashes the following commits:
6e42f3c [Raymond Liu] More code into copyStream
bfb496b [Raymond Liu] Use transferTo when copy merge files in ExternalSorter
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 29 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 7 |
2 files changed, 25 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c60be4f8a1..8cac5da644 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -284,17 +284,32 @@ private[spark] object Utils extends Logging { /** Copy all data from an InputStream to an OutputStream */ def copyStream(in: InputStream, out: OutputStream, - closeStreams: Boolean = false) + closeStreams: Boolean = false): Long = { + var count = 0L try { - val buf = new Array[Byte](8192) - var n = 0 - while (n != -1) { - n = in.read(buf) - if (n != -1) { - out.write(buf, 0, n) + if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) { + // When both streams are file streams, use transferTo to improve copy performance. + val inChannel = in.asInstanceOf[FileInputStream].getChannel() + val outChannel = out.asInstanceOf[FileOutputStream].getChannel() + val size = inChannel.size() + + // In case the transferTo method transferred less data than we requested. 
+ while (count < size) { + count += inChannel.transferTo(count, size - count, outChannel) + } + } else { + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + out.write(buf, 0, n) + count += n + } } } + count } finally { if (closeStreams) { try { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index b73d5e0cf1..5d8a648d95 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C]( try { out = new FileOutputStream(outputFile) for (i <- 0 until numPartitions) { - val file = partitionWriters(i).fileSegment().file - in = new FileInputStream(file) - org.apache.spark.util.Utils.copyStream(in, out) + in = new FileInputStream(partitionWriters(i).fileSegment().file) + val size = org.apache.spark.util.Utils.copyStream(in, out, false) in.close() in = null - lengths(i) = file.length() + lengths(i) = size offsets(i + 1) = offsets(i) + lengths(i) } } finally { |