diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-01 18:24:50 +0100 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-01 18:24:50 +0100 |
commit | 8aa87f15e3887dbeb1a39bfea002b56cf68c445a (patch) | |
tree | eb76514b57160e1b7cbd2df0e009cbcdc2a5f922 /src/library/scala/collection/parallel/ParSeqLike.scala | |
parent | fe289dc0fd8172012e4d57d09658e2dfd0a4cdcf (diff) | |
download | scala-8aa87f15e3887dbeb1a39bfea002b56cf68c445a.tar.gz scala-8aa87f15e3887dbeb1a39bfea002b56cf68c445a.tar.bz2 scala-8aa87f15e3887dbeb1a39bfea002b56cf68c445a.zip |
Remove ParIterator and SignalContextPassingIterator.
This unclutters the namespace and makes defining custom parallel
collections a lot easier.
Diffstat (limited to 'src/library/scala/collection/parallel/ParSeqLike.scala')
-rw-r--r-- | src/library/scala/collection/parallel/ParSeqLike.scala | 71 |
1 files changed, 20 insertions, 51 deletions
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 22c587b498..6a5ee5c69b 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -48,35 +48,6 @@ self => type SuperParIterator = IterableSplitter[T] - /** An iterator that can be split into arbitrary subsets of iterators. - * The self-type requirement ensures that the signal context passing behaviour gets mixed in - * the concrete iterator instance in some concrete collection. - * - * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator - * `reverse2builder` method to ensure higher efficiency. - */ - trait ParIterator extends SeqSplitter[T] with super.ParIterator { - me: SignalContextPassingIterator[ParIterator] => - def split: Seq[ParIterator] - def psplit(sizes: Int*): Seq[ParIterator] - } - - /** A stackable modification that ensures signal contexts get passed along the iterators. - * A self-type requirement in `ParIterator` ensures that this trait gets mixed into - * concrete iterators. - */ - trait SignalContextPassingIterator[+IterRepr <: ParIterator] - extends ParIterator with super.SignalContextPassingIterator[IterRepr] { - // Note: See explanation in `ParallelIterableLike.this.SignalContextPassingIterator` - // to understand why we do the cast here, and have a type parameter. - // Bottomline: avoiding boilerplate and fighting against inability to override stackable modifications. - abstract override def psplit(sizes: Int*): Seq[IterRepr] = { - val pits = super.psplit(sizes: _*) - pits foreach { _.signalDelegate = signalDelegate } - pits.asInstanceOf[Seq[IterRepr]] - } - } - /** A more refined version of the iterator found in the `ParallelIterable` trait, * this iterator can be split into arbitrary subsets of iterators. * @@ -89,9 +60,7 @@ self => override def size = length /** Used to iterate elements using indices */ - protected abstract class Elements(start: Int, val end: Int) extends ParIterator with BufferedIterator[T] { - me: SignalContextPassingIterator[ParIterator] => - + protected abstract class Elements(start: Int, val end: Int) extends SeqSplitter[T] with BufferedIterator[T] { private var i = start def hasNext = i < end @@ -106,14 +75,14 @@ self => final def remaining = end - i - def dup = new Elements(i, end) with SignalContextPassingIterator[ParIterator] + def dup = new Elements(i, end) {} def split = psplit(remaining / 2, remaining - remaining / 2) def psplit(sizes: Int*) = { val incr = sizes.scanLeft(0)(_ + _) for ((from, until) <- incr.init zip incr.tail) yield { - new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParIterator] + new Elements(start + from, (start + until) min end) {} } } @@ -138,7 +107,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new SegmentLength(p, 0, splitter.psplit(realfrom, length - realfrom)(1) assign ctx))._1 + executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1 } /** Finds the first element satisfying some predicate. @@ -156,7 +125,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplit(realfrom, length - realfrom)(1) assign ctx)) + executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx)) } /** Finds the last element satisfying some predicate. @@ -174,7 +143,7 @@ self => val until = if (end >= length) length else end + 1 val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MinValue) - executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplit(until, length - until)(0) assign ctx)) + executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx)) } def reverse: Repr = { @@ -203,7 +172,7 @@ self => else if (pthat.length > length - offset) false else { val ctx = new DefaultSignalling with VolatileAbort - executeAndWaitResult(new SameElements(splitter.psplit(offset, pthat.length)(1) assign ctx, pthat.splitter)) + executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)) } } otherwise seq.startsWith(that, offset) @@ -226,7 +195,7 @@ self => else { val ctx = new DefaultSignalling with VolatileAbort val tlen = that.length - executeAndWaitResult(new SameElements(splitter.psplit(length - tlen, tlen)(1) assign ctx, pthat.splitter)) + executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter)) } } otherwise seq.endsWith(that) @@ -235,7 +204,7 @@ self => if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) { val that = patch.asParSeq val pbf = bf.asParallel - val pits = splitter.psplit(from, replaced, length - from - realreplaced) + val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced) val cfactory = combinerFactory(() => pbf(repr)) val copystart = new Copy[U, That](cfactory, pits(0)) val copymiddle = wrap { @@ -253,7 +222,7 @@ self => val from = 0 max fromarg val b = bf(repr) val repl = (r min (length - from)) max 0 - val pits = splitter.psplit(from, repl, length - from - repl) + val pits = splitter.psplitWithSignalling(from, repl, length - from - repl) b ++= pits(0) b ++= patch b ++= pits(2) @@ -373,7 +342,7 @@ self => } else result = (0, false) protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException override def split = { - val pits = pit.split + val pits = pit.splitWithSignalling for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p) } override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2) @@ -392,7 +361,7 @@ self => } protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { - val pits = pit.split + val pits = pit.splitWithSignalling for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p) } override def merge(that: IndexWhere) = result = if (result == -1) that.result else { @@ -413,7 +382,7 @@ self => } protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { - val pits = pit.split + val pits = pit.splitWithSignalling for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p) } override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else { @@ -438,7 +407,7 @@ self => override def merge(that: ReverseMap[S, That]) = result = that.result combine result } - protected[this] class SameElements[U >: T](protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[U]) + protected[this] class SameElements[U >: T](protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[U]) extends Accessor[Boolean, SameElements[U]] { @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { @@ -449,7 +418,7 @@ self => override def split = { val fp = pit.remaining / 2 val sp = pit.remaining - fp - for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op) + for ((p, op) <- pit.psplitWithSignalling(fp, sp) zip otherpit.psplitWithSignalling(fp, sp)) yield new SameElements(p, op) } override def merge(that: SameElements[U]) = result = result && that.result override def requiresStrictSplitters = true @@ -461,7 +430,7 @@ self => def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf()) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { - val pits = pit.split + val pits = pit.splitWithSignalling for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p) } override def merge(that: Updated[U, That]) = result = result combine that.result @@ -476,8 +445,8 @@ self => override def split = { val fp = len / 2 val sp = len - len / 2 - val pits = pit.psplit(fp, sp) - val opits = otherpit.psplit(fp, sp) + val pits = pit.psplitWithSignalling(fp, sp) + val opits = otherpit.psplitWithSignalling(fp, sp) Seq( new Zip(fp, pbf, pits(0), opits(0)), new Zip(sp, pbf, pits(1), opits(1)) @@ -486,7 +455,7 @@ self => override def merge(that: Zip[U, S, That]) = result = result combine that.result } - protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[S]) + protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { @@ -497,7 +466,7 @@ self => override def split = { val fp = pit.remaining / 2 val sp = pit.remaining - fp - for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op) + for ((p, op) <- pit.psplitWithSignalling(fp, sp) zip otherpit.psplitWithSignalling(fp, sp)) yield new Corresponds(corr, p, op) } override def merge(that: Corresponds[S]) = result = result && that.result override def requiresStrictSplitters = true |