-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala       | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala    | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala | 47
-rw-r--r--  core/src/main/scala/org/apache/spark/util/NextIterator.scala   |  4
4 files changed, 66 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 77b57132b9..d841f05ec5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -251,8 +251,21 @@ class HadoopRDD[K, V](
}

override def close() {
- try {
- reader.close()
+ if (reader != null) {
+ // Close the reader and release it. Note: it's very important that we don't close the
+ // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+ // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+ // corruption issues when reading compressed input.
+ try {
+ reader.close()
+ } catch {
+ case e: Exception =>
+ if (!ShutdownHookManager.inShutdown()) {
+ logWarning("Exception in RecordReader.close()", e)
+ }
+ } finally {
+ reader = null
+ }
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -266,12 +279,6 @@ class HadoopRDD[K, V](
logWarning("Unable to get input size to set InputMetrics for task", e)
}
}
- } catch {
- case e: Exception => {
- if (!ShutdownHookManager.inShutdown()) {
- logWarning("Exception in RecordReader.close()", e)
- }
- }
}
}
}
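
The NewHadoopRDD and SqlNewHadoopRDD hunks below apply the same restructuring. As a standalone illustration of the pattern, here is a minimal sketch with invented names (SimpleReader and CloseOncePartition are stand-ins, not Spark or Hadoop classes): close the reader inside try/catch, null the reference in finally so the close can never run twice, and only then do the metrics bookkeeping.

// Minimal sketch of the close-once pattern; SimpleReader and CloseOncePartition
// are hypothetical stand-ins, not Spark or Hadoop classes.
trait SimpleReader {
  def close(): Unit
}

class CloseOncePartition(private var reader: SimpleReader) {
  def close(): Unit = {
    if (reader != null) {
      // Close the reader exactly once: closing some Hadoop RecordReaders twice
      // can corrupt compressed input streams (MAPREDUCE-5918).
      try {
        reader.close()
      } catch {
        case e: Exception =>
          // Spark only logs this (and stays quiet during shutdown); println
          // stands in for logWarning here.
          println(s"Exception in close(): $e")
      } finally {
        // Drop the reference so a second close() call is a no-op.
        reader = null
      }
      // Metrics updates would follow here, after the reader has been released.
    }
  }
}

A second call to close() then finds reader == null and returns immediately, which is what keeps the double-close out of reach.
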
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2872b93b87..9c4b70844b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -184,30 +184,32 @@ class NewHadoopRDD[K, V](
}

private def close() {
- try {
- if (reader != null) {
- // Close reader and release it
+ if (reader != null) {
+ // Close the reader and release it. Note: it's very important that we don't close the
+ // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+ // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+ // corruption issues when reading compressed input.
+ try {
reader.close()
- reader = null
-
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
+ } catch {
+ case e: Exception =>
+ if (!ShutdownHookManager.inShutdown()) {
+ logWarning("Exception in RecordReader.close()", e)
}
- }
+ } finally {
+ reader = null
}
- } catch {
- case e: Exception => {
- if (!ShutdownHookManager.inShutdown()) {
- logWarning("Exception in RecordReader.close()", e)
+ if (bytesReadCallback.isDefined) {
+ inputMetrics.updateBytesRead()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 0228c54e05..264dae7f39 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -189,32 +189,35 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}

private def close() {
- try {
- if (reader != null) {
+ if (reader != null) {
+ SqlNewHadoopRDD.unsetInputFileName()
+ // Close the reader and release it. Note: it's very important that we don't close the
+ // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+ // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+ // corruption issues when reading compressed input.
+ try {
reader.close()
- reader = null
-
- SqlNewHadoopRDD.unsetInputFileName()
-
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
+ } catch {
+ case e: Exception =>
+ if (!ShutdownHookManager.inShutdown()) {
+ logWarning("Exception in RecordReader.close()", e)
}
- }
+ } finally {
+ reader = null
}
- } catch {
- case e: Exception =>
- if (!ShutdownHookManager.inShutdown()) {
- logWarning("Exception in RecordReader.close()", e)
+ if (bytesReadCallback.isDefined) {
+ inputMetrics.updateBytesRead()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
}
+ }
}
}
}
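
One detail shared by all three RDD changes is worth demonstrating: because reader = null sits in a finally block, the reference is dropped even when reader.close() throws, so a later close() cannot reach the underlying reader again. A self-contained, hypothetical demo (FailingReader is invented for this example):

// Hypothetical demo: the underlying close() runs once even though it throws,
// because the reference is cleared in finally and guarded by a null check.
object CloseOnceDemo extends App {
  class FailingReader {
    var closeCalls = 0
    def close(): Unit = {
      closeCalls += 1
      throw new java.io.IOException("simulated failure while closing")
    }
  }

  val underlying = new FailingReader
  private var reader: FailingReader = underlying

  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception => println(s"ignored: $e")
      } finally {
        reader = null // released even though close() threw
      }
    }
  }

  close()
  close() // no-op: reader is already null
  println(s"underlying close() calls: ${underlying.closeCalls}") // prints 1
}
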
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index e5c732a5a5..0b505a5767 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
*/
def closeIfNeeded() {
if (!closed) {
- close()
+ // Note: it's important that we set closed = true before calling close(), since setting it
+ // afterwards would permit us to call close() multiple times if close() threw an exception.
closed = true
+ close()
}
}
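
The NextIterator change provides the same at-most-once guarantee one level up: closeIfNeeded() flips closed before delegating, so if close() throws, a later call cannot re-enter it. A minimal sketch of that ordering (not the actual Spark class, just the idea):

// Sketch of the closeIfNeeded() ordering: mark the resource closed before
// calling close(), so a close() that throws is never attempted a second time.
abstract class CloseOnceResource {
  private var closed = false

  /** Subclasses release their resources here; this may throw. */
  protected def close(): Unit

  def closeIfNeeded(): Unit = {
    if (!closed) {
      // Setting the flag first means that even if close() throws, a later
      // closeIfNeeded() sees closed == true and does nothing.
      closed = true
      close()
    }
  }
}

If the assignment came after the call, a close() that threw would leave closed == false, and a retry (for example from a task completion callback) would close the underlying reader a second time, which is exactly the double-close the RDD changes guard against.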