diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-02-03 06:44:49 +0000 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-02-03 06:44:49 +0000 |
commit | 8fbd5380b7f36842297f624bad3a2513f7eca47b (patch) | |
tree | acddfb8879427a107b33b7b72d8257a02e513ba5 /core | |
parent | 2415c18f48fc28d88f29b88c312f98054f530f20 (diff) | |
download | spark-8fbd5380b7f36842297f624bad3a2513f7eca47b.tar.gz spark-8fbd5380b7f36842297f624bad3a2513f7eca47b.tar.bz2 spark-8fbd5380b7f36842297f624bad3a2513f7eca47b.zip |
Fetch fewer objects in PySpark's take() method.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 39758e94f4..ab8351e55e 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -238,6 +238,11 @@ private[spark] object PythonRDD { } def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) { + import scala.collection.JavaConverters._ + writeIteratorToPickleFile(items.asScala, filename) + } + + def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) { val file = new DataOutputStream(new FileOutputStream(filename)) for (item <- items) { writeAsPickle(item, file) @@ -245,8 +250,10 @@ private[spark] object PythonRDD { file.close() } - def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] = - rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head + def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { + implicit val cm : ClassManifest[T] = rdd.elementClassManifest + rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator + } } private object Pickle { |