author     Josh Rosen <joshrosen@databricks.com>    2014-10-28 00:04:16 -0700
committer  Reynold Xin <rxin@databricks.com>        2014-10-28 00:04:16 -0700
commit     46c63417c1bb1aea07baf9036cc5b8f1c3781bbe (patch)
tree       8e30ee3760e4b9468b9a7d2b32e4d252aa953a67
parent     4ceb048b38949dd0a909d2ee6777607341c9c93a (diff)
[SPARK-4107] Fix incorrect handling of read() and skip() return values
`read()` may return fewer bytes than requested; when this occurred, the old code would silently return less data than requested, which might cause stream corruption errors. `skip()` faces similar issues, too. This patch fixes several cases where we mis-handle these methods' return values.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following commits:

e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value.
cbc03ce [Josh Rosen] Update the other log message, too.
01e6015 [Josh Rosen] file.getName -> file.getAbsolutePath
d961d95 [Josh Rosen] Fix another issue in FileServerSuite.
b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils.
cd9d76f [Josh Rosen] Fix a similar error in Tachyon:
3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes().
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read():
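For context, a minimal self-contained sketch (not part of the commit) of the two patterns this patch standardizes on: looping on FileChannel.read() until the buffer is full, and using Guava's ByteStreams.skipFully/readFully in place of raw InputStream.skip()/read(). The file path, offset, and length below are illustrative only.

import java.io.{File, FileInputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

object ReadFullySketch {
  def main(args: Array[String]): Unit = {
    val file = new File("/tmp/example.bin")  // illustrative path, not from the patch
    val offset = 16L
    val length = 1024

    // FileChannel.read() may return after filling only part of the buffer,
    // so keep reading until the buffer is full, and fail loudly on EOF.
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      val buf = ByteBuffer.allocate(length)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException(s"Reached EOF with ${buf.remaining()} bytes still to read")
        }
      }
      buf.flip()
    } finally {
      channel.close()
    }

    // InputStream.skip() and read() share the same "may do less than asked" contract;
    // Guava's helpers loop internally and throw EOFException if the stream is too short.
    val in = new FileInputStream(file)
    try {
      ByteStreams.skipFully(in, offset)
      val bytes = new Array[Byte](length)
      ByteStreams.readFully(in, bytes)
    } finally {
      in.close()
    }
  }
}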
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala  9
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala  21
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala  8
7 files changed, 33 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index e72826dc25..34078142f5 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
import org.apache.spark.util.Utils
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
jarStream.putNextEntry(jarEntry)
val in = new FileInputStream(file)
- val buffer = new Array[Byte](10240)
- var nRead = 0
- while (nRead <= 0) {
- nRead = in.read(buffer, 0, buffer.length)
- jarStream.write(buffer, 0, nRead)
- }
+ ByteStreams.copy(in, jarStream)
in.close()
}
jarStream.close()
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 4c9ca97a2a..4211ba4e43 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
val buf = ByteBuffer.allocate(length.toInt)
- channel.read(buf, offset)
+ channel.position(offset)
+ while (buf.remaining() != 0) {
+ if (channel.read(buf) == -1) {
+ throw new IOException("Reached EOF before filling buffer\n" +
+ s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+ }
+ }
buf.flip()
buf
} else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
var is: FileInputStream = null
try {
is = new FileInputStream(file)
- is.skip(offset)
+ ByteStreams.skipFully(is, offset)
ByteStreams.limit(is, length)
} catch {
case e: IOException =>
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 4ab34336d3..b5cd34cacd 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.shuffle
import java.io._
import java.nio.ByteBuffer
+import com.google.common.io.ByteStreams
+
import org.apache.spark.SparkEnv
import org.apache.spark.network.{ManagedBuffer, FileSegmentManagedBuffer}
import org.apache.spark.storage._
@@ -101,7 +103,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
val in = new DataInputStream(new FileInputStream(indexFile))
try {
- in.skip(blockId.reduceId * 8)
+ ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index bac459e835..8dadf67940 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.io.{IOException, File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
@@ -110,7 +110,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
// For small files, directly read rather than memory map
if (length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(length.toInt)
- channel.read(buf, offset)
+ channel.position(offset)
+ while (buf.remaining() != 0) {
+ if (channel.read(buf) == -1) {
+ throw new IOException("Reached EOF before filling buffer\n" +
+ s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+ }
+ }
buf.flip()
Some(buf)
} else {
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561604..6dbad5ff05 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.IOException
import java.nio.ByteBuffer
+import com.google.common.io.ByteStreams
import tachyon.client.{ReadType, WriteType}
import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
return None
}
val is = file.getInStream(ReadType.CACHE)
- var buffer: ByteBuffer = null
+ assert (is != null)
try {
- if (is != null) {
- val size = file.length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
- buffer = ByteBuffer.wrap(bs)
- if (fetchSize != size) {
- logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
- s"is not equal to fetched size $fetchSize")
- return None
- }
- }
+ val size = file.length
+ val bs = new Array[Byte](size.asInstanceOf[Int])
+ ByteStreams.readFully(is, bs)
+ Some(ByteBuffer.wrap(bs))
} catch {
case ioe: IOException =>
logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
- return None
+ None
}
- Some(buffer)
}
override def contains(blockId: BlockId): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 93ac9f1c33..4660030155 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
@@ -1062,8 +1062,8 @@ private[spark] object Utils extends Logging {
val stream = new FileInputStream(file)
try {
- stream.skip(effectiveStart)
- stream.read(buff)
+ ByteStreams.skipFully(stream, effectiveStart)
+ ByteStreams.readFully(stream, buff)
} finally {
stream.close()
}
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a8867020e4..379c2a6ea4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io._
import java.util.jar.{JarEntry, JarOutputStream}
+import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
@@ -58,12 +59,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
jar.putNextEntry(jarEntry)
val in = new FileInputStream(textFile)
- val buffer = new Array[Byte](10240)
- var nRead = 0
- while (nRead <= 0) {
- nRead = in.read(buffer, 0, buffer.length)
- jar.write(buffer, 0, nRead)
- }
+ ByteStreams.copy(in, jar)
in.close()
jar.close()