aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
diff options
context:
space:
mode:
authorDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 21:41:00 -0700
committerDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 21:41:00 -0700
commit28f1550f0134bad1391682135b9bfc43cb19fc01 (patch)
treee369bc11dd04d09d1afa5c78dfef4c2478c7c03a /core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
parent7c52ecc6a44d6898858c368a4857bdc89c2a5c2d (diff)
downloadspark-28f1550f0134bad1391682135b9bfc43cb19fc01.tar.gz
spark-28f1550f0134bad1391682135b9bfc43cb19fc01.tar.bz2
spark-28f1550f0134bad1391682135b9bfc43cb19fc01.zip
More elegant rewrite of the same.
Diffstat (limited to 'core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala')
-rw-r--r--core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala15
1 files changed, 2 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 672c623537..33079cd539 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
import spark._
import java.io._
import scala.Serializable
-import util.{NestedInputStream, NestedOutputStream}
private[spark] class ParallelCollectionPartition[T: ClassManifest](
var rddId: Long,
@@ -57,12 +56,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
out.writeInt(slice)
val ser = sfactory.newInstance()
- val ssout = ser.serializeStream(new NestedOutputStream(out))
- try {
- ssout.writeObject(values)
- } finally {
- ssout.close()
- }
+ Utils.serializeViaNestedStream(out, ser)(_.writeObject(values))
}
}
@@ -77,12 +71,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
slice = in.readInt()
val ser = sfactory.newInstance()
- val ssin = ser.deserializeStream(new NestedInputStream(in))
- try {
- values = ssin.readObject()
- } finally {
- ssin.close()
- }
+ Utils.deserializeViaNestedStream(in, ser)(ds => values = ds.readObject())
}
}
}