about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 45
1 files changed, 16 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 33e4ee0215..280e7a5fe8 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import scala.collection.JavaConverters._
-import com.google.common.io.ByteStreams
+import com.google.common.io.{Closeables, ByteStreams}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
@@ -82,7 +82,6 @@ private[spark] abstract class StreamBasedRecordReader[T](
if (!processed) {
val fileIn = new PortableDataStream(split, context, index)
value = parseStream(fileIn)
- fileIn.close() // if it has not been open yet, close does nothing
key = fileIn.getPath
processed = true
true
@@ -134,12 +133,6 @@ class PortableDataStream(
index: Integer)
extends Serializable {
- // transient forces file to be reopened after being serialization
- // it is also used for non-serializable classes
-
- @transient private var fileIn: DataInputStream = null
- @transient private var isOpen = false
-
private val confBytes = {
val baos = new ByteArrayOutputStream()
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
@@ -175,40 +168,34 @@ class PortableDataStream(
}
/**
- * Create a new DataInputStream from the split and context
+ * Create a new DataInputStream from the split and context. The user of this method is responsible
+ * for closing the stream after usage.
*/
def open(): DataInputStream = {
- if (!isOpen) {
- val pathp = split.getPath(index)
- val fs = pathp.getFileSystem(conf)
- fileIn = fs.open(pathp)
- isOpen = true
- }
- fileIn
+ val pathp = split.getPath(index)
+ val fs = pathp.getFileSystem(conf)
+ fs.open(pathp)
}
/**
* Read the file as a byte array
*/
def toArray(): Array[Byte] = {
- open()
- val innerBuffer = ByteStreams.toByteArray(fileIn)
- close()
- innerBuffer
+ val stream = open()
+ try {
+ ByteStreams.toByteArray(stream)
+ } finally {
+ Closeables.close(stream, true)
+ }
}
/**
- * Close the file (if it is currently open)
+ * Closing the PortableDataStream is not needed anymore. The user either can use the
+ * PortableDataStream to get a DataInputStream (which the user needs to close after usage),
+ * or a byte array.
*/
+ @deprecated("Closing the PortableDataStream is not needed anymore.", "1.6.0")
def close(): Unit = {
- if (isOpen) {
- try {
- fileIn.close()
- isOpen = false
- } catch {
- case ioe: java.io.IOException => // do nothing
- }
- }
}
def getPath(): String = path