diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-23 16:34:34 +0100 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-23 16:34:34 +0100 |
commit | 1852a7ddf7f8c5fb4a85e64b73123d333e698932 (patch) | |
tree | f9b5f405c56544b37f51c9f99b8dab1882fe2d58 /src/library/scala/collection/parallel/immutable | |
parent | 4a984f82d5bfca05123c53bd385d0299818f8a75 (diff) | |
download | scala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.tar.gz scala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.tar.bz2 scala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.zip |
Add tasksupport as a configurable field in parallel collections.
This required a bit of refactoring in the tasks objects and implementations
of various operations. Combiners now hold a reference to a tasksupport
object and pass it on to their result if `resultWithTaskSupport` is called.
Additionally, several bugs that have to do with CanBuildFrom and combiner
resolution have been fixed.
Diffstat (limited to 'src/library/scala/collection/parallel/immutable')
-rw-r--r-- | src/library/scala/collection/parallel/immutable/ParHashMap.scala | 14 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/immutable/ParHashSet.scala | 10 |
2 files changed, 16 insertions, 8 deletions
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 7adf51cffb..266b179401 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -8,6 +8,8 @@ package scala.collection.parallel.immutable + + import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner import scala.collection.parallel.IterableSplitter @@ -19,6 +21,9 @@ import scala.collection.generic.GenericParMapTemplate import scala.collection.generic.GenericParMapCompanion import scala.collection.immutable.{ HashMap, TrieIterator } import annotation.unchecked.uncheckedVariance +import collection.parallel.Task + + /** Immutable parallel hash map, based on hash tries. * @@ -153,7 +158,6 @@ private[parallel] abstract class HashMapCombiner[K, V] extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) { //self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => import HashMapCombiner._ - import collection.parallel.tasksupport._ val emptyTrie = HashMap.empty[K, V] def +=(elem: (K, V)) = { @@ -173,7 +177,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashMap[K, V]](bucks.length) - executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -195,7 +199,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashMap[K, AnyRef]](bucks.length) - executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -256,7 +260,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val fp = howmany / 2 List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int) @@ -321,7 +325,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val fp = howmany / 2 List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 1cf0ccd391..0d7f04976e 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -8,6 +8,8 @@ package scala.collection.parallel.immutable + + import scala.collection.parallel.ParSetLike import scala.collection.parallel.Combiner import scala.collection.parallel.IterableSplitter @@ -19,6 +21,9 @@ import scala.collection.generic.GenericParTemplate import scala.collection.generic.GenericParCompanion import scala.collection.generic.GenericCompanion import scala.collection.immutable.{ HashSet, TrieIterator } +import collection.parallel.Task + + /** Immutable parallel hash set, based on hash tries. * @@ -127,7 +132,6 @@ private[immutable] abstract class HashSetCombiner[T] extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) { //self: EnvironmentPassingCombiner[T, ParHashSet[T]] => import HashSetCombiner._ - import collection.parallel.tasksupport._ val emptyTrie = HashSet.empty[T] def +=(elem: T) = { @@ -147,7 +151,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashSet[T]](bucks.length) - executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -202,7 +206,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine val fp = howmany / 2 List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } } |