From 46c63417c1bb1aea07baf9036cc5b8f1c3781bbe Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 28 Oct 2014 00:04:16 -0700
Subject: [SPARK-4107] Fix incorrect handling of read() and skip() return values

`read()` may return fewer bytes than requested; when this occurred, the old
code silently returned less data than the caller asked for, which could cause
stream corruption errors. `skip()` has the same contract and was misused in
similar ways. This patch fixes several cases where these methods' return
values were mishandled.

Author: Josh Rosen

Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following commits:

e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value.
cbc03ce [Josh Rosen] Update the other log message, too.
01e6015 [Josh Rosen] file.getName -> file.getAbsolutePath
d961d95 [Josh Rosen] Fix another issue in FileServerSuite.
b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils.
cd9d76f [Josh Rosen] Fix a similar error in Tachyon:
3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes().
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read():
---
 .../src/main/scala/org/apache/spark/TestUtils.scala |  9 ++-------
 .../org/apache/spark/network/ManagedBuffer.scala    | 10 ++++++++--
 .../spark/shuffle/IndexShuffleBlockManager.scala    |  4 +++-
 .../scala/org/apache/spark/storage/DiskStore.scala  | 10 ++++++++--
 .../org/apache/spark/storage/TachyonStore.scala     | 21 +++++++--------------
 .../main/scala/org/apache/spark/util/Utils.scala    |  6 +++---
 .../scala/org/apache/spark/FileServerSuite.scala    |  8 ++------
 7 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index e72826dc25..34078142f5 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
       in.close()
     }
     jarStream.close()
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 4c9ca97a2a..4211ba4e43 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
       if (length < MIN_MEMORY_MAP_BYTES) {
         val buf = ByteBuffer.allocate(length.toInt)
-        channel.read(buf, offset)
+        channel.position(offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         buf
       } else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     var is: FileInputStream = null
     try {
       is = new FileInputStream(file)
-      is.skip(offset)
+      ByteStreams.skipFully(is, offset)
       ByteStreams.limit(is, length)
     } catch {
       case e: IOException =>
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 4ab34336d3..b5cd34cacd 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.shuffle
 import java.io._
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.network.{ManagedBuffer, FileSegmentManagedBuffer}
 import org.apache.spark.storage._
@@ -101,7 +103,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
 
     val in = new DataInputStream(new FileInputStream(indexFile))
     try {
-      in.skip(blockId.reduceId * 8)
+      ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index bac459e835..8dadf67940 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.io.{IOException, File, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -110,7 +110,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
-        channel.read(buf, offset)
+        channel.position(offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         Some(buf)
       } else {
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561604..6dbad5ff05 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        return None
+        None
     }
-    Some(buffer)
   }
 
   override def contains(blockId: BlockId): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 93ac9f1c33..4660030155 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -1062,8 +1062,8 @@ private[spark] object Utils extends Logging {
 
     val stream = new FileInputStream(file)
     try {
-      stream.skip(effectiveStart)
-      stream.read(buff)
+      ByteStreams.skipFully(stream, effectiveStart)
+      ByteStreams.readFully(stream, buff)
     } finally {
       stream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a8867020e4..379c2a6ea4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
+import com.google.common.io.ByteStreams
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
@@ -58,12 +59,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     jar.putNextEntry(jarEntry)
 
     val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
+    ByteStreams.copy(in, jar)
     in.close()
 
     jar.close()
-- 
cgit v1.2.3
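
Note: the common thread in all of these fixes is that a single read() or skip() call is never
assumed to transfer everything that was asked for. The standalone sketch below (not part of the
patch; the object and method names are illustrative only) shows the two patterns the changes
rely on: looping on FileChannel.read() until the buffer is full, and delegating InputStream
cases to Guava's ByteStreams helpers, which loop internally and throw on premature EOF.

    import java.io.{FileInputStream, IOException}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel

    import com.google.common.io.ByteStreams

    // Illustrative sketch of the safe-read patterns applied by this patch;
    // these names do not exist in Spark itself.
    object SafeReadSketch {

      // FileChannel.read() may fill only part of the buffer, so loop until no
      // space remains and fail loudly if EOF arrives first.
      def readChannelFully(channel: FileChannel, offset: Long, length: Int): ByteBuffer = {
        val buf = ByteBuffer.allocate(length)
        channel.position(offset)
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(s"Reached EOF with ${buf.remaining()} bytes left to read")
          }
        }
        buf.flip()
        buf
      }

      // InputStream.skip() and read() have the same partial-progress contract;
      // ByteStreams.skipFully/readFully loop internally and throw EOFException
      // if the stream ends before the requested amount is consumed.
      def readStreamSegment(in: FileInputStream, offset: Long, length: Int): Array[Byte] = {
        ByteStreams.skipFully(in, offset)
        val bytes = new Array[Byte](length)
        ByteStreams.readFully(in, bytes)
        bytes
      }
    }

Utils.offsetBytes in the patch, for example, replaces the bare stream.skip(...)/stream.read(...)
pair with exactly this ByteStreams.skipFully/readFully combination.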