author    Reynold Xin <rxin@apache.org>          2014-09-29 12:38:24 -0700
committer Aaron Davidson <aaron@databricks.com>  2014-09-29 12:38:24 -0700
commit    e43c72fe04d4fbf2a108b456d533e641b71b0a2a (patch)
tree      87f640883744f80bfd72b623ab4fbb66829341ff
parent    dab1b0ae29a6d3017bdca23464f22a51d51eaae1 (diff)
Add more debug message for ManagedBuffer
This is to help debug the error reported at
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-queries-fail-in-1-2-0-SNAPSHOT-td15327.html

Author: Reynold Xin <rxin@apache.org>

Closes #2580 from rxin/buffer-debug and squashes the following commits:

5814292 [Reynold Xin] Logging close() in case close() fails.
323dfec [Reynold Xin] Add more debug message.
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala  | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala             | 14
2 files changed, 51 insertions(+), 6 deletions(-)
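In isolation, the pattern the patch applies is: when a read fails, probe the
underlying file's length (the probe is itself wrapped in Try, since it can also
fail) and fold that length into the rethrown IOException, so a truncated file
is distinguishable from a missing one. A minimal standalone sketch of the same
pattern; the readFully helper is illustrative and not part of this commit:

    import java.io.{File, FileInputStream, IOException}
    import scala.util.Try

    def readFully(file: File): Array[Byte] = {
      val is = new FileInputStream(file)
      try {
        // Read the whole file; an IOException here is re-wrapped with context.
        Iterator.continually(is.read()).takeWhile(_ != -1).map(_.toByte).toArray
      } catch {
        case e: IOException =>
          // The length probe can itself fail, hence Try(...).toOption.
          Try(file.length).toOption match {
            case Some(len) =>
              throw new IOException(s"Error in reading $file (actual file length $len)", e)
            case None =>
              throw new IOException(s"Error in opening $file", e)
          }
      } finally {
        is.close()
      }
    }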
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index e990c1da67..a4409181ec 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.network
 
-import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
+import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
+import scala.util.Try
+
 import com.google.common.io.ByteStreams
 import io.netty.buffer.{ByteBufInputStream, ByteBuf}
 
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 
 
 /**
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     try {
       channel = new RandomAccessFile(file, "r").getChannel
       channel.map(MapMode.READ_ONLY, offset, length)
+    } catch {
+      case e: IOException =>
+        Try(channel.size).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
     } finally {
       if (channel != null) {
-        channel.close()
+        Utils.tryLog(channel.close())
       }
     }
   }
 
   override def inputStream(): InputStream = {
-    val is = new FileInputStream(file)
-    is.skip(offset)
-    ByteStreams.limit(is, length)
+    var is: FileInputStream = null
+    try {
+      is = new FileInputStream(file)
+      is.skip(offset)
+      ByteStreams.limit(is, length)
+    } catch {
+      case e: IOException =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        Try(file.length).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
+      case e: Throwable =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        throw e
+    }
   }
+
+  override def toString: String = s"${getClass.getName}($file, $offset, $length)"
 }
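With the toString override above, the rethrown IOException now names the exact
segment that failed and the file's actual length. A hypothetical REPL-style
check of the new error path (not part of the commit; the temp path and the
values in the trailing comment are illustrative):

    import java.io.File
    import org.apache.spark.network.FileSegmentManagedBuffer

    // createTempFile leaves a zero-byte file on disk.
    val f = File.createTempFile("segment", ".data")

    // Request a range that lies entirely past the end of the file; the
    // read-only mapping fails and the catch block enriches the exception.
    val buf = new FileSegmentManagedBuffer(f, 100L, 1024L)
    buf.nioByteBuffer()
    // Throws java.io.IOException: Error in reading
    //   org.apache.spark.network.FileSegmentManagedBuffer(/tmp/segment....data, 100, 1024)
    //   (actual file length 0)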
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2755887fee..10d440828e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Executes the given block in a Try, logging any uncaught exceptions. */
+  def tryLog[T](f: => T): Try[T] = {
+    try {
+      val res = f
+      scala.util.Success(res)
+    } catch {
+      case ct: ControlThrowable =>
+        throw ct
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        scala.util.Failure(t)
+    }
+  }
+
   /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
     e match {
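Because tryLog returns a scala.util.Try rather than Unit, callers can
fire-and-forget, as the close() call sites above do, or inspect the outcome
when it matters; it also deliberately rethrows ControlThrowable so Scala's
non-local control flow is not swallowed and logged as an error. A hypothetical
caller, assuming code inside the spark package (Utils is private[spark]);
tempFile, cleanup() and retryLater() are made-up names:

    import scala.util.{Failure, Success}
    import org.apache.spark.util.Utils

    // Fire-and-forget: a failure is logged, not propagated.
    Utils.tryLog(tempFile.delete())

    // Or react to the outcome when the caller cares.
    Utils.tryLog(cleanup()) match {
      case Success(_) => // nothing to do
      case Failure(t) => retryLater(t)
    }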