aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-03-23 11:45:45 -0500
committerStephen Haberman <stephen@exigencecorp.com>2013-03-23 11:45:45 -0500
commit4ca273edc4b19894675e96fb62b3afb04ce77a37 (patch)
tree4e0c0e38def464f18106cd24fc2eed90ef123bc5 /core
parent00170eb0b9c30c1ab4f470c694b434385b8cbb7a (diff)
parentb8949cab889da4ba0f613a17b2eef52a32476410 (diff)
downloadspark-4ca273edc4b19894675e96fb62b3afb04ce77a37.tar.gz
spark-4ca273edc4b19894675e96fb62b3afb04ce77a37.tar.bz2
spark-4ca273edc4b19894675e96fb62b3afb04ce77a37.zip
Merge branch 'master' into shufflecoalesce
Conflicts: core/src/test/scala/spark/RDDSuite.scala
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/RDD.scala66
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala87
-rw-r--r--core/src/main/scala/spark/executor/MesosExecutorBackend.scala16
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala13
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala43
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala92
6 files changed, 251 insertions, 66 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b88d711dcf..33dc7627a3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -373,6 +373,62 @@ abstract class RDD[T: ClassManifest](
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
/**
+ * Maps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => U): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.map(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => Seq[U]): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.flatMap(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+ * This additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def foreachWith[A: ClassManifest](constructA: Int => A)
+ (f:(T, A) => Unit) {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ iter.map(t => {f(t, a); t})
+ }
+ (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+ }
+
+ /**
+ * Filters this RDD with p, where p takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def filterWith[A: ClassManifest](constructA: Int => A)
+ (p:(T, A) => Boolean): RDD[T] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ iter.filter(t => p(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+ }
+
+ /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
@@ -391,6 +447,14 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Applies a function f to each partition of this RDD.
+ */
+ def foreachPartition(f: Iterator[T] => Unit) {
+ val cleanF = sc.clean(f)
+ sc.runJob(this, (iter: Iterator[T]) => f(iter))
+ }
+
+ /**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
@@ -412,7 +476,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 4474ef4593..3e7407b58d 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
*/
-private[spark] class Executor extends Logging {
- var urlClassLoader : ExecutorURLClassLoader = null
- var threadPool: ExecutorService = null
- var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
- val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
- val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
- val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+ private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
initLogging()
- def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
- // Make sure the local hostname we report matches the cluster scheduler's name for this host
- Utils.setCustomHostname(slaveHostname)
+ // Make sure the local hostname we report matches the cluster scheduler's name for this host
+ Utils.setCustomHostname(slaveHostname)
- // Set spark.* system properties from executor arg
- for ((key, value) <- properties) {
- System.setProperty(key, value)
- }
+ // Set spark.* system properties from executor arg
+ for ((key, value) <- properties) {
+ System.setProperty(key, value)
+ }
+
+ // Create our ClassLoader and set it on this thread
+ private val urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
- // Make any thread terminations due to uncaught exceptions kill the entire
- // executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler {
- override def uncaughtException(thread: Thread, exception: Throwable) {
- try {
- logError("Uncaught exception in thread " + thread, exception)
-
- // We may have been called from a shutdown hook. If so, we must not call System.exit().
- // (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
- }
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+
+ // We may have been called from a shutdown hook. If so, we must not call System.exit().
+ // (If we do, we will deadlock.)
+ if (!Utils.inShutdown()) {
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
- } catch {
- case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
- case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
+ } catch {
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
- )
+ }
+ )
- // Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ // Initialize Spark environment (using system properties read above)
+ val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+ SparkEnv.set(env)
- // Start worker thread pool
- threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
- }
+ // Start worker thread pool
+ val threadPool = new ThreadPoolExecutor(
+ 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 818d6d1dda..10f3531df0 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
extends MesosExecutor
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ExecutorDriver = null
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(
+ executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
- properties
- )
+ properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
val taskId = taskInfo.getTaskId.getValue.toLong
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ } else {
+ executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ }
}
override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
- val runner = new MesosExecutorBackend(new Executor)
+ val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
}
}
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 9a82c3054c..1047f71c6a 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
- executor: Executor,
driverUrl: String,
executorId: String,
hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ActorRef = null
override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
override def receive = {
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
- executor.initialize(executorId, hostname, sparkProperties)
+ executor = new Executor(executorId, hostname, sparkProperties)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ System.exit(1)
+ } else {
+ executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ }
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 65b4621b87..9213513e80 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -2,10 +2,11 @@ package spark.rdd
import java.io.{ObjectOutputStream, IOException}
import java.util.{HashMap => JHashMap}
+
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Partition, TaskContext}
+import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -28,7 +29,8 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Partition with Serializable {
+class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+ extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
@@ -40,7 +42,19 @@ private[spark] class CoGroupAggregator
{ (b1, b2) => b1 ++ b2 })
with Serializable
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+
+/**
+ * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
+ * tuple with the list of values for that key.
+ *
+ * @param rdds parent RDDs.
+ * @param part partitioner used to partition the shuffle output.
+ * @param mapSideCombine flag indicating whether to merge values before shuffle step.
+ */
+class CoGroupedRDD[K](
+ @transient var rdds: Seq[RDD[(K, _)]],
+ part: Partitioner,
+ val mapSideCombine: Boolean = true)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private val aggr = new CoGroupAggregator
@@ -52,8 +66,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ if (mapSideCombine) {
+ val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+ new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ } else {
+ new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part)
+ }
}
}
}
@@ -82,6 +100,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
val numRdds = split.deps.size
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
+
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
val seq = map.get(k)
if (seq != null) {
@@ -92,6 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
seq
}
}
+
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
@@ -102,9 +122,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- val fetchItr = fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics)
- for ((k, vs) <- fetchItr) {
- getSeq(k)(depNum) ++= vs
+ if (mapSideCombine) {
+ // With map side combine on, for each key, the shuffle fetcher returns a list of values.
+ fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics).foreach {
+ case (key, values) => getSeq(key)(depNum) ++= values
+ }
+ } else {
+ // With map side combine off, for each key the shuffle fetcher returns a single value.
+ fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics).foreach {
+ case (key, value) => getSeq(key)(depNum) += value
+ }
}
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index bcbb472f6c..a6178867bc 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -123,6 +123,36 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("cogrouped RDDs") {
+ sc = new SparkContext("local", "test")
+ val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
+ val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
+
+ // Use cogroup function
+ val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
+ assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped(2) === (Seq("two"), Seq("two1")))
+ assert(cogrouped(3) === (Seq("three"), Seq()))
+
+ // Construct CoGroupedRDD directly, with map side combine enabled
+ val cogrouped1 = new CoGroupedRDD[Int](
+ Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+ new HashPartitioner(3),
+ true).collectAsMap()
+ assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
+ assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
+
+ // Construct CoGroupedRDD directly, with map side combine disabled
+ val cogrouped2 = new CoGroupedRDD[Int](
+ Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+ new HashPartitioner(3),
+ false).collectAsMap()
+ assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
+ assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
+ }
+
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
@@ -182,4 +212,64 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(prunedData.size === 1)
assert(prunedData(0) === 10)
}
+
+ test("mapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.mapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(2) === prn42_3)
+ assert(randoms(5) === prn43_3)
+ }
+
+ test("flatMapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.flatMapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) =>
+ val random = prng.nextDouble()
+ Seq(random * t, random * t * 10)}.
+ collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(5) === prn42_3 * 10)
+ assert(randoms(11) === prn43_3 * 10)
+ }
+
+ test("filterWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+ val sample = ints.filterWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
+ collect()
+ val checkSample = {
+ val prng42 = new Random(42)
+ val prng43 = new Random(43)
+ Array(1, 2, 3, 4, 5, 6).filter{i =>
+ if (i < 4) 0 == prng42.nextInt(3)
+ else 0 == prng43.nextInt(3)}
+ }
+ assert(sample.size === checkSample.size)
+ for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
+ }
}