author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-04 16:49:30 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-06 17:14:41 -0700
commit    65113b7e1b32b4bc2cd879ac8be86562a7996120 (patch)
tree      afdd0129393591dc0279fc4d3055907b3d066cbb /core
parent    716e10ca32ecb470da086290ac7414360f6e7d0a (diff)
Only group elements ten at a time into SequenceFile records in
saveAsObjectFile
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala  4
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f0d2b2d783..f32ff475da 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -415,7 +415,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   }
 
   def saveAsObjectFile(path: String) {
-    this.glom
+    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
       .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
       .saveAsSequenceFile(path)
   }
@@ -424,4 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   }
-}
\ No newline at end of file
+}
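
Note on the change: glom turns each partition into a single Array, so every SequenceFile record previously held an entire partition's worth of serialized data, while mapPartitions(iter => iter.grouped(10).map(_.toArray)) emits one record per group of ten elements. A minimal, self-contained sketch of the same grouping step on a plain Scala iterator (the object name and sample values below are illustrative only, not part of the commit):

object GroupedRecordsSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the elements of one RDD partition (illustrative data).
    val partition: Iterator[Int] = (1 to 23).iterator

    // The old glom-style approach would be partition.toArray: one record
    // holding the whole partition, however large it is.

    // grouped(10) buffers at most ten elements before emitting a chunk,
    // so each downstream record stays small.
    val records: Iterator[Array[Int]] = partition.grouped(10).map(_.toArray)

    // 23 elements become three records of sizes 10, 10 and 3.
    records.foreach(r => println("record of " + r.length + " elements"))
  }
}

With this grouping, the serialized payload of each (NullWritable, BytesWritable) record covers at most ten elements rather than the whole partition, at the cost of writing a few more records per partition.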