author     haitao.yao <yao.erix@gmail.com>    2013-02-05 14:09:45 +0800
committer  haitao.yao <yao.erix@gmail.com>    2013-02-05 14:09:45 +0800
commit     f609182e5bfc73110181f8c432cea460a74e61d6 (patch)
tree       393223423b101e42f8d16285c2a4f5c6afa7155a
parent     faa4d9e31f4bef3947399d79e05b74dde24c7ebf (diff)
parent     f6ec547ea7b56ee607a4c2a69206f8952318eaf1 (diff)
Merge branch 'mesos'
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala    | 11
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala   |  3
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                |  2
-rw-r--r--  python/pyspark/accumulators.py                          |  9
-rw-r--r--  python/pyspark/broadcast.py                             |  9
-rw-r--r--  python/pyspark/context.py                               |  4
-rw-r--r--  python/pyspark/rdd.py                                   |  8
7 files changed, 21 insertions, 25 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 39758e94f4..ab8351e55e 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -238,6 +238,11 @@ private[spark] object PythonRDD {
}
def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
+ import scala.collection.JavaConverters._
+ writeIteratorToPickleFile(items.asScala, filename)
+ }
+
+ def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
for (item <- items) {
writeAsPickle(item, file)
@@ -245,8 +250,10 @@ private[spark] object PythonRDD {
file.close()
}
- def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
+ def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
+ implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+ rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
+ }
}
private object Pickle {
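
The Scala changes above adjust the JVM/Python hand-off in two ways: takePartition now returns a Scala Iterator (materialized per partition via toArray, with the element ClassManifest taken from the RDD), so the Python driver can truncate it before serialization, and writeIteratorToPickleFile gains a Scala-Iterator overload, with the original java.util.Iterator entry point delegating to it through JavaConverters. The resulting pickle file is read back on the Python side; the sketch below is a minimal, hypothetical reader for a stream of length-prefixed pickled records (a 4-byte big-endian length followed by the pickled bytes). The framing and the function name are assumptions made for illustration, not a copy of pyspark's serializers module.

    import pickle
    import struct

    def read_pickle_records(path):
        """Yield objects from a file of length-prefixed pickled records.

        Assumed framing: each record is a 4-byte big-endian length
        followed by that many bytes of pickled data.
        """
        with open(path, "rb") as stream:
            while True:
                header = stream.read(4)
                if len(header) < 4:
                    return  # clean end of file
                (length,) = struct.unpack("!i", header)
                yield pickle.loads(stream.read(length))

Spooling a partition through a file like this avoids pushing every element across the Py4J connection one call at a time, which is the usual reason for the temp-file round trip.
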
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index f4e7ec39fe..dd19442dcb 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -79,8 +79,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
try {
System.clearProperty("spark.driver.host") // In case some previous test had set it
- val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("test", "localhost", 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)
System.setProperty("spark.driver.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 89a3687386..fe7deb10d6 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -14,7 +14,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
- assert(dups.distinct().collect === dups.distinct().collect)
+ assert(dups.distinct.collect === dups.distinct().collect)
assert(dups.distinct(2).collect === dups.distinct().collect)
assert(nums.reduce(_ + _) === 10)
assert(nums.fold(0)(_ + _) === 10)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 61fcbbd376..3e9d7d36da 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -196,12 +196,3 @@ def _start_update_server():
thread.daemon = True
thread.start()
return server
-
-
-def _test():
- import doctest
- doctest.testmod()
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 93876fa738..def810dd46 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -37,12 +37,3 @@ class Broadcast(object):
def __reduce__(self):
self._pickle_registry.add(self)
return (_from_id, (self.bid, ))
-
-
-def _test():
- import doctest
- doctest.testmod()
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6831f9b7f8..657fe6f989 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -256,8 +256,10 @@ def _test():
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
- doctest.testmod(globs=globs)
+ (failure_count, test_count) = doctest.testmod(globs=globs)
globs['sc'].stop()
+ if failure_count:
+ exit(-1)
if __name__ == "__main__":
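
For context on the change above: doctest.testmod returns a (failure_count, test_count) pair, and ignoring it means the module exits 0 even when doctests fail. A minimal standalone sketch of the pattern, using plain doctest with no Spark involved:

    import doctest
    import sys

    def add(a, b):
        """
        >>> add(2, 2)
        4
        """
        return a + b

    if __name__ == "__main__":
        # testmod returns (failure_count, test_count); propagate failures
        # as a nonzero exit status so callers and CI can detect them.
        failure_count, test_count = doctest.testmod()
        if failure_count:
            sys.exit(-1)
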
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 41ea6e6e14..4cda6cf661 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -372,6 +372,10 @@ class RDD(object):
items = []
for partition in range(self._jrdd.splits().size()):
iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
+ # Each item in the iterator is a string, a Python object, or a batch of
+ # Python objects. Regardless, it is sufficient to take `num`
+ # of these objects in order to collect `num` Python objects:
+ iterator = iterator.take(num)
items.extend(self._collect_iterator_through_file(iterator))
if len(items) >= num:
break
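
The hunk above caps how much data take(num) pulls back from each partition: the Java-side iterator is truncated to num elements before it is written out, instead of shipping the entire partition just to keep a handful of items. A minimal standalone sketch of the same per-partition early-exit idea, in plain Python with no Py4J involved:

    from itertools import islice

    def take(partitions, num):
        """Collect up to `num` items, visiting partitions in order and
        pulling at most the number still needed from each one, so no
        partition is fully materialized for a small take()."""
        items = []
        for partition in partitions:
            items.extend(islice(iter(partition), num - len(items)))
            if len(items) >= num:
                break
        return items

    # Example: take([[1, 2, 3], [4, 5, 6], [7, 8, 9]], 4) -> [1, 2, 3, 4]
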
@@ -748,8 +752,10 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- doctest.testmod(globs=globs)
+ (failure_count, test_count) = doctest.testmod(globs=globs)
globs['sc'].stop()
+ if failure_count:
+ exit(-1)
if __name__ == "__main__":