aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRaymond Liu <raymond.liu@intel.com>2014-08-12 23:19:35 -0700
committerReynold Xin <rxin@apache.org>2014-08-12 23:19:35 -0700
commit246cb3f158686348a698d1c0da3001c314727129 (patch)
tree7b131b8272c3749f3e8e2d785ef7016529b876b8 /core
parent676f98289dad61c091bb45bd35a2b9613b22d64a (diff)
downloadspark-246cb3f158686348a698d1c0da3001c314727129.tar.gz
spark-246cb3f158686348a698d1c0da3001c314727129.tar.bz2
spark-246cb3f158686348a698d1c0da3001c314727129.zip
Use transferTo when copying merge files in ExternalSorter
Since this is a file-to-file copy, using transferTo should be faster. Author: Raymond Liu <raymond.liu@intel.com> Closes #1884 from colorant/externalSorter and squashes the following commits: 6e42f3c [Raymond Liu] More code into copyStream bfb496b [Raymond Liu] Use transferTo when copying merge files in ExternalSorter
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala7
2 files changed, 25 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c60be4f8a1..8cac5da644 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -284,17 +284,32 @@ private[spark] object Utils extends Logging {
/** Copy all data from an InputStream to an OutputStream */
def copyStream(in: InputStream,
out: OutputStream,
- closeStreams: Boolean = false)
+ closeStreams: Boolean = false): Long =
{
+ var count = 0L
try {
- val buf = new Array[Byte](8192)
- var n = 0
- while (n != -1) {
- n = in.read(buf)
- if (n != -1) {
- out.write(buf, 0, n)
+ if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+ // When both streams are File stream, use transferTo to improve copy performance.
+ val inChannel = in.asInstanceOf[FileInputStream].getChannel()
+ val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+ val size = inChannel.size()
+
+ // In case transferTo method transferred less data than we have required.
+ while (count < size) {
+ count += inChannel.transferTo(count, size - count, outChannel)
+ }
+ } else {
+ val buf = new Array[Byte](8192)
+ var n = 0
+ while (n != -1) {
+ n = in.read(buf)
+ if (n != -1) {
+ out.write(buf, 0, n)
+ count += n
+ }
}
}
+ count
} finally {
if (closeStreams) {
try {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b73d5e0cf1..5d8a648d95 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C](
try {
out = new FileOutputStream(outputFile)
for (i <- 0 until numPartitions) {
- val file = partitionWriters(i).fileSegment().file
- in = new FileInputStream(file)
- org.apache.spark.util.Utils.copyStream(in, out)
+ in = new FileInputStream(partitionWriters(i).fileSegment().file)
+ val size = org.apache.spark.util.Utils.copyStream(in, out, false)
in.close()
in = null
- lengths(i) = file.length()
+ lengths(i) = size
offsets(i + 1) = offsets(i) + lengths(i)
}
} finally {