From e7ca142b45255f6b41582c25fe590a664d5fc8b9 Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Wed, 20 Oct 2010 20:20:00 +0000 Subject: Some exception handling fixes in parallel colle... Some exception handling fixes in parallel collections. Fixed some regressions. Fixed some tests. No review. --- .../collection/parallel/ParIterableLike.scala | 54 ++++++------ .../scala/collection/parallel/ParMapLike.scala | 2 +- .../scala/collection/parallel/ParSeqLike.scala | 4 +- src/library/scala/collection/parallel/Tasks.scala | 22 +++-- .../collection/parallel/immutable/ParHashMap.scala | 2 + .../collection/parallel/mutable/ParHashMap.scala | 32 ++++--- .../collection/parallel/mutable/ParMapLike.scala | 4 +- .../scala/collection/parallel/package.scala | 92 +++++++++++--------- src/partest/scala/tools/partest/nest/Worker.scala | 4 +- .../parallel-collections/PairOperators.scala | 97 ++++++++++++++++++++++ .../parallel-collections/PairValues.scala | 28 +++++++ .../parallel-collections/ParallelArrayCheck.scala | 2 +- .../ParallelHashMapCheck.scala | 66 +++++++++++++++ .../ParallelIterableCheck.scala | 17 +++- .../files/scalacheck/parallel-collections/pc.scala | 29 ++++++- 15 files changed, 359 insertions(+), 96 deletions(-) create mode 100644 test/files/scalacheck/parallel-collections/PairOperators.scala create mode 100644 test/files/scalacheck/parallel-collections/PairValues.scala create mode 100644 test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 881ab80038..3d839119b0 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -227,15 +227,11 @@ self => def map(r: R): R1 = mapping(r) } - def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { - val ft = tsk - val st = t2 + def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) { def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } - def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { - val ft = tsk - val st = t2 + def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) { def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } } @@ -460,7 +456,9 @@ self => othtask.compute othtask.result } - val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result } + val task = (copythis parallel copythat) { _ combine _ } mapResult { + _.result + } executeAndWaitResult(task) } else if (bf.isParallel) { // println("case parallel builder, `that` not parallel") @@ -687,37 +685,42 @@ self => protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] - protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + protected[this] abstract class Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (val ft: First, val st: Second) extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { - val ft: First - val st: Second def combineResults(fr: FR, sr: SR): R var result: R = null.asInstanceOf[R] private[parallel] override def signalAbort { ft.signalAbort st.signalAbort } + protected def mergeSubtasks { + ft mergeThrowables st + if (throwable eq null) result = combineResults(ft.result, st.result) + } override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters } /** Sequentially performs one task after another. */ - protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - extends Composite[FR, SR, R, First, Second] { + protected[this] abstract class SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (f: First, s: Second) + extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - ft.compute - st.compute - result = combineResults(ft.result, st.result) + executeAndWaitResult(ft) + executeAndWaitResult(st) + mergeSubtasks } } /** Performs two tasks in parallel, and waits for both to finish. */ - protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - extends Composite[FR, SR, R, First, Second] { + protected[this] abstract class ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (f: First, s: Second) + extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - st.start - ft.compute - st.sync - result = combineResults(ft.result, st.result) + val ftfuture = execute(ft) + executeAndWaitResult(st) + ftfuture() + mergeSubtasks } } @@ -727,7 +730,8 @@ self => def map(r: R): R1 def leaf(prevr: Option[R1]) = { inner.compute - result = map(inner.result) + throwable = inner.throwable + if (throwable eq null) result = map(inner.result) } private[parallel] override def signalAbort { inner.signalAbort @@ -756,7 +760,7 @@ self => protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) override def merge(that: Reduce[U]) = if (this.result == None) result = that.result - else if (that.result != None) op(result.get, that.result.get) + else if (that.result != None) result = Some(op(result.get, that.result.get)) override def requiresStrictSplitters = true } @@ -890,7 +894,9 @@ self => protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Take[U, This]] { var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf())) + def leaf(prev: Option[Combiner[U, This]]) = { + result = pit.take2combiner(n, reuse(prev, cbf())) + } protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException override def split = { val pits = pit.split diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala index e6944953b5..6eb621c76a 100644 --- a/src/library/scala/collection/parallel/ParMapLike.scala +++ b/src/library/scala/collection/parallel/ParMapLike.scala @@ -24,7 +24,7 @@ extends MapLike[K, V, Repr] protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner - protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.") + protected[this] override def newCombiner: Combiner[(K, V), Repr] = unsupportedop("Must implement `newCombiner` in concrete collections.") override def empty: Repr diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index d2e0f965d6..b3d0593ecd 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -249,7 +249,9 @@ self => tsk.result } val copyend = new Copy[U, That](() => pbf(repr), pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) + executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { + _.result + }) } else patch_sequential(from, patch, replaced) private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 915e02f787..c58bc8b734 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -59,8 +59,8 @@ trait Tasks { protected[this] def merge(that: Tp) {} // exception handling mechanism - var exception: Exception = null - def forwardException = if (exception != null) throw exception + var throwable: Throwable = null + def forwardThrowable = if (throwable != null) throw throwable // tries to do the leaf computation, storing the possible exception protected def tryLeaf(result: Option[R]) { try { @@ -70,15 +70,21 @@ trait Tasks { signalAbort } } catch { - case e: Exception => - exception = e + case thr: Throwable => + throwable = thr signalAbort } } protected[this] def tryMerge(t: Tp) { val that = t.asInstanceOf[Task[R, Tp]] - if (this.exception == null && that.exception == null) merge(that.repr) - else if (that.exception != null) this.exception = that.exception + if (this.throwable == null && that.throwable == null) merge(t) + mergeThrowables(that) + } + private[parallel] def mergeThrowables(that: Task[_, _]) { + if (this.throwable != null && that.throwable != null) { + // merge exceptions, since there were multiple exceptions + this.throwable = this.throwable alongWith that.throwable + } else if (that.throwable != null) this.throwable = that.throwable } // override in concrete task implementations to signal abort to other tasks private[parallel] def signalAbort {} @@ -206,7 +212,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { () => { fjtask.join - fjtask.forwardException + fjtask.forwardThrowable fjtask.result } } @@ -225,7 +231,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join - fjtask.forwardException + fjtask.forwardThrowable fjtask.result } diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 37b52b7a40..7cc0adbb4f 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -40,6 +40,8 @@ self => override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + protected[this] override def newCombiner = HashMapCombiner[K, V] + def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI def seq = trie diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index fb4119bddc..3648945857 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -28,6 +28,8 @@ self => override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + protected[this] override def newCombiner = ParHashMapCombiner[K, V] + def seq = new collection.mutable.HashMap[K, V](hashTableContents) def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI @@ -108,7 +110,9 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => // construct table val table = new AddingHashTable(size, tableLoadFactor) - executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + + // TODO compare insertcount and size to see if compression is needed val c = table.hashTableContents new ParHashMap(c) @@ -118,7 +122,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => * it allocates the table of the required size when created. * * Entries are added using the `insertEntry` method. This method checks whether the element - * exists and updates the size map. + * exists and updates the size map. It returns false if the key was already in the table, + * and true if the key was successfully inserted. */ class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { import HashTable._ @@ -127,7 +132,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => tableSize = 0 threshold = newThreshold(_loadFactor, table.length) sizeMapInit(table.length) - def insertEntry(e: DefaultEntry[K, V]) { + def insertEntry(e: DefaultEntry[K, V]) = { var h = index(elemHashCode(e.key)) var olde = table(h).asInstanceOf[DefaultEntry[K, V]] @@ -146,24 +151,27 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => table(h) = e tableSize = tableSize + 1 nnSizeMapAdd(h) - } + true + } else false } } /* tasks */ class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int) - extends super.Task[Unit, FillBlocks] { - var result = () - def leaf(prev: Option[Unit]) = { + extends super.Task[Int, FillBlocks] { + var result = Int.MinValue + def leaf(prev: Option[Int]) = { var i = offset val until = offset + howmany + result = 0 while (i < until) { - fillBlock(buckets(i)) + result += fillBlock(buckets(i)) i += 1 } } - private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) { + private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) = { + var insertcount = 0 var unrolled = elems var i = 0 val t = table @@ -172,17 +180,21 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val chunksz = unrolled.size while (i < chunksz) { val elem = chunkarr(i) - t.insertEntry(elem) + if (t.insertEntry(elem)) insertcount += 1 i += 1 } i = 0 unrolled = unrolled.next } + insertcount } def split = { val fp = howmany / 2 List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp)) } + override def merge(that: FillBlocks) { + this.result += that.result + } def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel) } diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala index 902186eb2d..25b5c3b5f7 100644 --- a/src/library/scala/collection/parallel/mutable/ParMapLike.scala +++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala @@ -12,6 +12,6 @@ trait ParMapLike[K, V, +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], +Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]] -extends collection.parallel.ParMapLike[K, V, Repr, Sequential] - with collection.mutable.MapLike[K, V, Repr] +extends collection.mutable.MapLike[K, V, Repr] + with collection.parallel.ParMapLike[K, V, Repr, Sequential] diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index a30d564039..bb53dcdaaa 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -31,12 +31,61 @@ package object parallel { else sz } - private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg) - private[parallel] def unsupported = throw new UnsupportedOperationException + private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg) + + /* implicit conversions */ + + /** An implicit conversion providing arrays with a `par` method, which + * returns a parallel array. + * + * @tparam T type of the elements in the array, which is a subtype of AnyRef + * @param array the array to be parallelized + * @return a `Parallelizable` object with a `par` method= + */ + implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] { + def par = mutable.ParArray.handoff[T](array) + } + + implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { + def isParallel = bf.isInstanceOf[Parallel] + def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] + def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody + } + } + + implicit def traversable2ops[T](t: TraversableOnce[T]) = new { + def isParallel = t.isInstanceOf[Parallel] + def isParIterable = t.isInstanceOf[ParIterable[_]] + def asParIterable = t.asInstanceOf[ParIterable[T]] + def isParSeq = t.isInstanceOf[ParSeq[_]] + def asParSeq = t.asInstanceOf[ParSeq[T]] + def ifParSeq[R](isbody: ParSeq[T] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody + } + def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { + val it = t.toIterator + val cb = mutable.ParArrayCombiner[T]() + while (it.hasNext) cb += it.next + cb.result + } + } + + implicit def throwable2ops(self: Throwable) = new { + def alongWith(that: Throwable) = self match { + case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that) + case _ => new CompositeThrowable(Set(self, that)) + } + } + /* classes */ + /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ + final class CompositeThrowable(val throwables: Set[Throwable]) + extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", ")) + /** Unrolled list node. */ private[parallel] class Unrolled[T: ClassManifest] { @@ -159,45 +208,6 @@ package object parallel { } else this } - - /* implicit conversions */ - - /** An implicit conversion providing arrays with a `par` method, which - * returns a parallel array. - * - * @tparam T type of the elements in the array, which is a subtype of AnyRef - * @param array the array to be parallelized - * @return a `Parallelizable` object with a `par` method= - */ - implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] { - def par = mutable.ParArray.handoff[T](array) - } - - implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { - def isParallel = bf.isInstanceOf[Parallel] - def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] - def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody - } - } - - implicit def traversable2ops[T](t: TraversableOnce[T]) = new { - def isParallel = t.isInstanceOf[Parallel] - def isParIterable = t.isInstanceOf[ParIterable[_]] - def asParIterable = t.asInstanceOf[ParIterable[T]] - def isParSeq = t.isInstanceOf[ParSeq[_]] - def asParSeq = t.asInstanceOf[ParSeq[T]] - def ifParSeq[R](isbody: ParSeq[T] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody - } - def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { - val it = t.toIterator - val cb = mutable.ParArrayCombiner[T]() - while (it.hasNext) cb += it.next - cb.result - } - } - } diff --git a/src/partest/scala/tools/partest/nest/Worker.scala b/src/partest/scala/tools/partest/nest/Worker.scala index fba24394a5..6e8ccff723 100644 --- a/src/partest/scala/tools/partest/nest/Worker.scala +++ b/src/partest/scala/tools/partest/nest/Worker.scala @@ -538,8 +538,8 @@ class Worker(val fileManager: FileManager, params: TestRunParams) extends Actor val lines = SFile(logFile).lines.filter(_.trim != "").toBuffer succeeded = { val failures = lines filter (_ startsWith "!") - val passedok = lines filter (_ startsWith "+") forall (_ contains "OK") - failures.isEmpty && passedok + //val passedok = lines filter (_ startsWith "+") forall (_ contains "OK") - OK may wrap!! + failures.isEmpty } if (!succeeded) { NestUI.normal("ScalaCheck test failed. Output:\n") diff --git a/test/files/scalacheck/parallel-collections/PairOperators.scala b/test/files/scalacheck/parallel-collections/PairOperators.scala new file mode 100644 index 0000000000..48cbd136e5 --- /dev/null +++ b/test/files/scalacheck/parallel-collections/PairOperators.scala @@ -0,0 +1,97 @@ +package scala.collection.parallel.ops + + +import scala.collection.parallel._ + + +trait PairOperators[K, V] extends Operators[(K, V)] { + def koperators: Operators[K] + def voperators: Operators[V] + + private def zipPredicates(kps: List[K => Boolean], vps: List[V => Boolean]): List[((K, V)) => Boolean] = for { + (kp, vp) <- koperators.countPredicates zip voperators.countPredicates + } yield new Function1[(K, V), Boolean] { + def apply(kv: (K, V)) = kp(kv._1) && vp(kv._2) + } + + /* operators */ + + def reduceOperators = for { + (kop, vop) <- koperators.reduceOperators zip voperators.reduceOperators + } yield new Function2[(K, V), (K, V), (K, V)] { + def apply(kv1: (K, V), kv2: (K, V)) = (kop(kv1._1, kv2._1), vop(kv1._2, kv2._2)) + } + + def countPredicates = zipPredicates(koperators.countPredicates, voperators.countPredicates) + + def forallPredicates = zipPredicates(koperators.forallPredicates, voperators.forallPredicates) + + def existsPredicates = zipPredicates(koperators.existsPredicates, voperators.existsPredicates) + + def findPredicates = zipPredicates(koperators.findPredicates, voperators.findPredicates) + + def mapFunctions = for { + (km, vm) <- koperators.mapFunctions zip voperators.mapFunctions + } yield new Function1[(K, V), (K, V)] { + def apply(kv: (K, V)) = (km(kv._1), vm(kv._2)) + } + + def partialMapFunctions = for { + (kpm, vpm) <- koperators.partialMapFunctions zip voperators.partialMapFunctions + } yield new PartialFunction[(K, V), (K, V)] { + def isDefinedAt(kv: (K, V)) = kpm.isDefinedAt(kv._1) && vpm.isDefinedAt(kv._2) + def apply(kv: (K, V)) = (kpm(kv._1), vpm(kv._2)) + } + + def flatMapFunctions = for { + (kfm, vfm) <- koperators.flatMapFunctions zip voperators.flatMapFunctions + } yield new Function1[(K, V), Traversable[(K, V)]] { + def apply(kv: (K, V)) = kfm(kv._1).toIterable zip vfm(kv._2).toIterable + } + + def filterPredicates = zipPredicates(koperators.filterPredicates, voperators.existsPredicates) + + def filterNotPredicates = filterPredicates + + def partitionPredicates = filterPredicates + + def takeWhilePredicates = zipPredicates(koperators.takeWhilePredicates, voperators.takeWhilePredicates) + + def dropWhilePredicates = takeWhilePredicates + + def spanPredicates = takeWhilePredicates + + def foldArguments = for { + ((kinit, kop), (vinit, vop)) <- koperators.foldArguments zip voperators.foldArguments + } yield ((kinit, vinit), new Function2[(K, V), (K, V), (K, V)] { + def apply(kv1: (K, V), kv2: (K, V)) = (kop(kv1._1, kv2._1), vop(kv1._2, kv2._2)) + }) + + def addAllTraversables = for { + (kt, vt) <- koperators.addAllTraversables zip voperators.addAllTraversables + } yield kt.toIterable zip vt.toIterable + + def newArray(sz: Int) = new Array[(K, V)](sz) + +} + + + + + + + + + + + + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/PairValues.scala b/test/files/scalacheck/parallel-collections/PairValues.scala new file mode 100644 index 0000000000..864dad2425 --- /dev/null +++ b/test/files/scalacheck/parallel-collections/PairValues.scala @@ -0,0 +1,28 @@ +package scala.collection.parallel.ops + + + + + +import org.scalacheck._ +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import org.scalacheck.Arbitrary._ + + + + +trait PairValues[K, V] { + def kvalues: Seq[Gen[K]] + def vvalues: Seq[Gen[V]] + + def values = for { + kg <- kvalues + vg <- vvalues + } yield for { + k <- kg + v <- vg + } yield (k, v) +} diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala index 9169890e98..394dc6b370 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala @@ -14,7 +14,7 @@ import scala.collection._ import scala.collection.parallel.ops._ -abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParallelArray[" + tp + "]") { +abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") { ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala new file mode 100644 index 0000000000..1224ec8d4d --- /dev/null +++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala @@ -0,0 +1,66 @@ +package scala.collection.parallel +package immutable + + + +import org.scalacheck._ +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import org.scalacheck.Arbitrary._ + +import scala.collection._ +import scala.collection.parallel.ops._ + + +abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelIterableCheck[(K, V)]("immutable.ParHashMap[" + tp + "]") { + ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + + type CollType = ParHashMap[K, V] + + def isCheckingViews = false + + def instances(vals: Seq[Gen[(K, V)]]): Gen[Iterable[(K, V)]] = sized { sz => + var hm = new immutable.HashMap[K, V] + val gen = vals(rnd.nextInt(vals.size)) + for (i <- 0 until sz) hm += sample(gen) + hm + } + + def fromTraversable(t: Traversable[(K, V)]) = { + var phm = new ParHashMap[K, V] + var i = 0 + for (kv <- t.toList) { + phm += kv + i += 1 + } + phm + } + +} + + +object IntIntParallelHashMapCheck extends ParallelHashMapCheck[Int, Int]("Int, Int") +with PairOperators[Int, Int] +with PairValues[Int, Int] +{ + def intvalues = new IntValues {} + def kvalues = intvalues.values + def vvalues = intvalues.values + + val intoperators = new IntOperators {} + def voperators = intoperators + def koperators = intoperators +} + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index fd323ef82c..bc08947af4 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -95,8 +95,18 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col } property("mappings must be equal") = forAll(collectionPairs) { case (t, coll) => - val results = for ((f, ind) <- mapFunctions.zipWithIndex) - yield ("op index: " + ind) |: t.map(f) == coll.map(f) + val results = for ((f, ind) <- mapFunctions.zipWithIndex) yield { + val ms = t.map(f) + val mp = coll.map(f) + if (ms != mp) { + println(t) + println(coll) + println("mapped to: ") + println(ms) + println(mp) + } + ("op index: " + ind) |: ms == mp + } results.reduceLeft(_ && _) } @@ -107,7 +117,7 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col if (ps != pp) { println(t) println(coll) - println("partially mapped to: ") + println("collected to: ") println(ps) println(pp) } @@ -166,7 +176,6 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col println(tsl) println(collsl) println("as list: " + collsl.toList) - println(tsl.asInstanceOf[Seq[T]].sameElements(collsl)) println(collsl.iterator.hasNext) println(collsl.iterator.next) println(collsl.iterator.hasNext) diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index f77c6db435..04b7168286 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -3,21 +3,46 @@ import org.scalacheck._ + import scala.collection.parallel._ class ParCollProperties extends Properties("Parallel collections") { + /* Collections */ + // parallel arrays //include(mutable.IntParallelArrayCheck) // parallel ranges //include(immutable.ParallelRangeCheck) + + // parallel immutable hash maps (tries) + include(immutable.IntIntParallelHashMapCheck) + + // parallel immutable hash sets (tries) + + // parallel mutable hash maps (tables) + + + /* Views */ + + // parallel array views + + // parallel immutable hash map views + + // parallel mutable hash map views } object Test { def main(args: Array[String]) { - val results = org.scalacheck.Test.checkProperties(new ParCollProperties) - if (!results.forall(_._2.passed)) println("Test results: " + results.mkString("\n")) + val pc = new ParCollProperties + org.scalacheck.Test.checkProperties( + org.scalacheck.Test.Params( + rng = new java.util.Random(5134L), + testCallback = new ConsoleReporter(0) + ), + pc + ) } } -- cgit v1.2.3