author    | Josh Rosen <joshrosen@databricks.com> | 2014-10-28 00:04:16 -0700
committer | Reynold Xin <rxin@databricks.com>     | 2014-10-28 00:04:16 -0700
commit    | 46c63417c1bb1aea07baf9036cc5b8f1c3781bbe (patch)
tree      | 8e30ee3760e4b9468b9a7d2b32e4d252aa953a67 /core
parent    | 4ceb048b38949dd0a909d2ee6777607341c9c93a (diff)
download  | spark-46c63417c1bb1aea07baf9036cc5b8f1c3781bbe.tar.gz
          | spark-46c63417c1bb1aea07baf9036cc5b8f1c3781bbe.tar.bz2
          | spark-46c63417c1bb1aea07baf9036cc5b8f1c3781bbe.zip
[SPARK-4107] Fix incorrect handling of read() and skip() return values
`read()` may return fewer bytes than requested. Where that happened, the old code silently returned less data than the caller asked for, which could later surface as stream corruption errors. `skip()` has the same contract and was mishandled in the same way.
This patch fixes several cases where we mishandled these methods' return values.
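For illustration only (not part of the patch): `InputStream.read(byte[])` returns the number of bytes actually read, which may be anything from 1 up to the buffer size (or -1 at EOF), and `InputStream.skip(n)` may skip fewer than `n` bytes. The sketch below shows the hazard and the Guava `ByteStreams` helpers the patch adopts; the file path and sizes are hypothetical, not taken from the patch.

import java.io.{FileInputStream, InputStream}
import com.google.common.io.ByteStreams

object ReadContractDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical input file, for illustration only.
    val in: InputStream = new FileInputStream("/tmp/example.bin")
    try {
      val buf = new Array[Byte](1024)

      // Buggy pattern: a single read() may fill only part of buf, and the
      // caller never notices unless it checks the returned count.
      // val n = in.read(buf)   // n may be < 1024

      // Safe pattern: readFully() loops until buf is completely filled,
      // throwing EOFException if the stream ends first.
      ByteStreams.readFully(in, buf)

      // skip() has the same partial-progress contract; skipFully() retries
      // until exactly 512 bytes have been skipped, or throws EOFException.
      ByteStreams.skipFully(in, 512L)
    } finally {
      in.close()
    }
  }
}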
Author: Josh Rosen <joshrosen@databricks.com>
Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following commits:
e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value.
cbc03ce [Josh Rosen] Update the other log message, too.
01e6015 [Josh Rosen] file.getName -> file.getAbsolutePath
d961d95 [Josh Rosen] Fix another issue in FileServerSuite.
b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils.
cd9d76f [Josh Rosen] Fix a similar error in Tachyon:
3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes().
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read():
Diffstat (limited to 'core')
7 files changed, 33 insertions, 35 deletions
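`FileChannel.read(ByteBuffer)` has the same partial-transfer contract: it returns the count actually transferred, which may be less than `buf.remaining()`, and -1 at EOF. Below is a standalone sketch of the loop the patch introduces in FileSegmentManagedBuffer and DiskStore; the object and method names (`ChannelReadUtil`, `readFullyAt`) are ours for illustration, not identifiers from the Spark codebase. The actual diff follows.

import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

object ChannelReadUtil {
  // Read exactly `length` bytes starting at `offset`, or fail loudly.
  def readFullyAt(channel: FileChannel, offset: Long, length: Int): ByteBuffer = {
    val buf = ByteBuffer.allocate(length)
    channel.position(offset)
    while (buf.remaining() != 0) {
      // read() may transfer fewer bytes than remain in buf; -1 signals EOF.
      if (channel.read(buf) == -1) {
        throw new IOException(
          s"Reached EOF with ${buf.remaining()} bytes still unread")
      }
    }
    buf.flip() // switch the buffer from writing mode to reading mode
    buf
  }
}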
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index e72826dc25..34078142f5 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
       in.close()
     }
     jarStream.close()
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 4c9ca97a2a..4211ba4e43 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
     if (length < MIN_MEMORY_MAP_BYTES) {
       val buf = ByteBuffer.allocate(length.toInt)
-      channel.read(buf, offset)
+      channel.position(offset)
+      while (buf.remaining() != 0) {
+        if (channel.read(buf) == -1) {
+          throw new IOException("Reached EOF before filling buffer\n" +
+            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+        }
+      }
       buf.flip()
       buf
     } else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     var is: FileInputStream = null
     try {
       is = new FileInputStream(file)
-      is.skip(offset)
+      ByteStreams.skipFully(is, offset)
       ByteStreams.limit(is, length)
     } catch {
       case e: IOException =>
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 4ab34336d3..b5cd34cacd 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.shuffle
 import java.io._
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.network.{ManagedBuffer, FileSegmentManagedBuffer}
 import org.apache.spark.storage._
@@ -101,7 +103,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
     val in = new DataInputStream(new FileInputStream(indexFile))
     try {
-      in.skip(blockId.reduceId * 8)
+      ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index bac459e835..8dadf67940 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@ package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.io.{IOException, File, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
@@ -110,7 +110,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     // For small files, directly read rather than memory map
     if (length < minMemoryMapBytes) {
       val buf = ByteBuffer.allocate(length.toInt)
-      channel.read(buf, offset)
+      channel.position(offset)
+      while (buf.remaining() != 0) {
+        if (channel.read(buf) == -1) {
+          throw new IOException("Reached EOF before filling buffer\n" +
+            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+        }
+      }
       buf.flip()
       Some(buf)
     } else {
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561604..6dbad5ff05 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        return None
+        None
     }
-    Some(buffer)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 93ac9f1c33..4660030155 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -1062,8 +1062,8 @@ private[spark] object Utils extends Logging {
     val stream = new FileInputStream(file)
 
     try {
-      stream.skip(effectiveStart)
-      stream.read(buff)
+      ByteStreams.skipFully(stream, effectiveStart)
+      ByteStreams.readFully(stream, buff)
     } finally {
       stream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a8867020e4..379c2a6ea4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
+import com.google.common.io.ByteStreams
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
@@ -58,12 +59,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     jar.putNextEntry(jarEntry)
 
     val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
+    ByteStreams.copy(in, jar)
     in.close()
 
     jar.close()