diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-21 17:41:31 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-21 17:41:31 +0000 |
commit | 003fc687839528bf99b44a415a038eb13ef8eae5 (patch) | |
tree | 597e57e8882449865bcde4c214188bb87c4709df /src | |
parent | cc1f960036c0f60caca6c8f862701dd37bac2f50 (diff) | |
download | scala-003fc687839528bf99b44a415a038eb13ef8eae5.tar.gz scala-003fc687839528bf99b44a415a038eb13ef8eae5.tar.bz2 scala-003fc687839528bf99b44a415a038eb13ef8eae5.zip |
Bencharking a larger program with parallel coll...
Bencharking a larger program with parallel collections.
Fixed a couple of bugs in parallel collections.
No review.
Diffstat (limited to 'src')
6 files changed, 107 insertions, 69 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index ef6120b370..2b2ebda2e2 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -262,7 +262,7 @@ self => protected def wrap[R](body: => R) = new NonDivisible[R] { def leaf(prevr: Option[R]) = result = body - var result: R = null.asInstanceOf[R] + @volatile var result: R = null.asInstanceOf[R] } /* convenience signalling operations wrapper */ @@ -281,6 +281,12 @@ self => } } + override def mkString(start: String, sep: String, end: String): String = seq.mkString(start, sep, end) + + override def mkString(sep: String): String = seq.mkString("", sep, "") + + override def mkString: String = seq.mkString("") + override def toString = seq.mkString(stringPrefix + "(", ", ", ")") /** Reduces the elements of this sequence using the specified associative binary operator. @@ -719,7 +725,7 @@ self => def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) def split = pit.split.map(newSubtask(_)) // default split procedure private[parallel] override def signalAbort = pit.abort - override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")" + override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")" } protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] { @@ -733,7 +739,7 @@ self => (val ft: First, val st: Second) extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { def combineResults(fr: FR, sr: SR): R - var result: R = null.asInstanceOf[R] + @volatile var result: R = null.asInstanceOf[R] private[parallel] override def signalAbort { ft.signalAbort st.signalAbort @@ -770,7 +776,7 @@ self => protected[this] abstract class ResultMapping[R, Tp, R1](val inner: StrictSplitterCheckTask[R, Tp]) extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { - var result: R1 = null.asInstanceOf[R1] + @volatile var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 def leaf(prevr: Option[R1]) = { result = map(executeAndWaitResult(inner)) @@ -784,14 +790,14 @@ self => protected trait Transformer[R, Tp] extends Accessor[R, Tp] protected[this] class Foreach[S](op: T => S, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Unit, Foreach[S]] { - var result: Unit = () + @volatile var result: Unit = () def leaf(prevr: Option[Unit]) = pit.foreach(op) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Foreach[S](op, p) } protected[this] class Count(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Int, Count] { // val pittxt = pit.toString - var result: Int = 0 + @volatile var result: Int = 0 def leaf(prevr: Option[Int]) = result = pit.count(pred) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Count(pred, p) override def merge(that: Count) = result = result + that.result @@ -799,7 +805,7 @@ self => } protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { - var result: Option[U] = None + @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) override def merge(that: Reduce[U]) = @@ -809,7 +815,7 @@ self => } protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Fold[U]] { - var result: U = null.asInstanceOf[U] + @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.fold(z)(op) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Fold(z, op, p) override def merge(that: Fold[U]) = result = op(result, that.result) @@ -817,28 +823,28 @@ self => protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: ParIterableIterator[T]) extends Accessor[S, Aggregate[S]] { - var result: S = null.asInstanceOf[S] + @volatile var result: S = null.asInstanceOf[S] def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Aggregate(z, seqop, combop, p) override def merge(that: Aggregate[S]) = result = combop(result, that.result) } protected[this] class Sum[U >: T](num: Numeric[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Sum[U]] { - var result: U = null.asInstanceOf[U] + @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.sum(num) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Sum(num, p) override def merge(that: Sum[U]) = result = num.plus(result, that.result) } protected[this] class Product[U >: T](num: Numeric[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Product[U]] { - var result: U = null.asInstanceOf[U] + @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.product(num) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Product(num, p) override def merge(that: Product[U]) = result = num.times(result, that.result) } protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Min[U]] { - var result: Option[U] = None + @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Min(ord, p) override def merge(that: Min[U]) = @@ -848,7 +854,7 @@ self => } protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Max[U]] { - var result: Option[U] = None + @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Max(ord, p) override def merge(that: Max[U]) = @@ -859,7 +865,7 @@ self => protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[S, That], Map[S, That]] { - var result: Combiner[S, That] = null + @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr))) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Map(f, pbf, p) override def merge(that: Map[S, That]) = result = result combine that.result @@ -868,7 +874,7 @@ self => protected[this] class Collect[S, That] (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[S, That], Collect[S, That]] { - var result: Combiner[S, That] = null + @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Collect(pf, pbf, p) override def merge(that: Collect[S, That]) = result = result combine that.result @@ -876,28 +882,32 @@ self => protected[this] class FlatMap[S, That](f: T => TraversableOnce[S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[S, That], FlatMap[S, That]] { - var result: Combiner[S, That] = null + @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p) - override def merge(that: FlatMap[S, That]) = result = result combine that.result + override def merge(that: FlatMap[S, That]) = { + debuglog("merging " + result + " and " + that.result) + result = result combine that.result + debuglog("merged into " + result) + } } protected[this] class Forall(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Boolean, Forall] { - var result: Boolean = true + @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } protected[this] def newSubtask(p: ParIterableIterator[T]) = new Forall(pred, p) override def merge(that: Forall) = result = result && that.result } protected[this] class Exists(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Boolean, Exists] { - var result: Boolean = false + @volatile var result: Boolean = false def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } protected[this] def newSubtask(p: ParIterableIterator[T]) = new Exists(pred, p) override def merge(that: Exists) = result = result || that.result } protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Find[U]] { - var result: Option[U] = None + @volatile var result: Option[U] = None def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } protected[this] def newSubtask(p: ParIterableIterator[T]) = new Find(pred, p) override def merge(that: Find[U]) = if (this.result == None) result = that.result @@ -905,7 +915,7 @@ self => protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Filter[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { result = pit.filter2combiner(pred, reuse(prev, cbf())) } @@ -915,7 +925,7 @@ self => protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], FilterNot[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { result = pit.filterNot2combiner(pred, reuse(prev, cbf())) } @@ -925,7 +935,7 @@ self => protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, That], Copy[U, That]] { - var result: Combiner[U, That] = null + @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory())) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Copy[U, That](cfactory, p) override def merge(that: Copy[U, That]) = result = result combine that.result @@ -933,7 +943,7 @@ self => protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null + @volatile var result: (Combiner[U, This], Combiner[U, This]) = null def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Partition(pred, cbf, p) override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) @@ -941,7 +951,7 @@ self => protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Take[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { result = pit.take2combiner(n, reuse(prev, cbf())) } @@ -960,7 +970,7 @@ self => protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Drop[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf())) protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException override def split = { @@ -977,7 +987,7 @@ self => protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Slice[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf())) protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException override def split = { @@ -995,7 +1005,7 @@ self => protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null + @volatile var result: (Combiner[U, This], Combiner[U, This]) = null def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException override def split = { @@ -1010,7 +1020,7 @@ self => protected[this] class TakeWhile[U >: T, This >: Repr] (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] { - var result: (Combiner[U, This], Boolean) = null + @volatile var result: (Combiner[U, This], Boolean) = null def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) { result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf())) if (!result._2) pit.setIndexFlagIfLesser(pos) @@ -1029,7 +1039,7 @@ self => protected[this] class Span[U >: T, This >: Repr] (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null + @volatile var result: (Combiner[U, This], Combiner[U, This]) = null def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) { // val lst = pit.toList // val pa = mutable.ParArray(lst: _*) @@ -1055,7 +1065,7 @@ self => protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { - var result: Result = null + @volatile var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported override def split = { @@ -1071,7 +1081,7 @@ self => protected[this] class ZipAll[U >: T, S, That] (len: Int, thiselem: U, thatelem: S, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] { - var result: Result = null + @volatile var result: Result = null def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported override def split = if (pit.remaining <= len) { @@ -1093,7 +1103,7 @@ self => protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Unit, CopyToArray[U, This]] { - var result: Unit = () + @volatile var result: Unit = () def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len) protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported override def split = { @@ -1108,7 +1118,7 @@ self => protected[this] class ToParCollection[U >: T, That](cbf: () => Combiner[U, That], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, That], ToParCollection[U, That]] { - var result: Result = null + @volatile var result: Result = null def leaf(prev: Option[Combiner[U, That]]) { result = cbf() while (pit.hasNext) result += pit.next @@ -1119,7 +1129,7 @@ self => protected[this] class ToParMap[K, V, That](cbf: () => Combiner[(K, V), That], protected[this] val pit: ParIterableIterator[T])(implicit ev: T <:< (K, V)) extends Transformer[Combiner[(K, V), That], ToParMap[K, V, That]] { - var result: Result = null + @volatile var result: Result = null def leaf(prev: Option[Combiner[(K, V), That]]) { result = cbf() while (pit.hasNext) result += pit.next @@ -1130,7 +1140,7 @@ self => protected[this] class CreateScanTree[U >: T](from: Int, len: Int, z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Transformer[ScanTree[U], CreateScanTree[U]] { - var result: ScanTree[U] = null + @volatile var result: ScanTree[U] = null def leaf(prev: Option[ScanTree[U]]) = if (pit.remaining > 0) { val trees = ArrayBuffer[ScanTree[U]]() var i = from @@ -1168,7 +1178,7 @@ self => protected[this] class FromScanTree[U >: T, That] (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That]) extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] { - var result: Combiner[U, That] = null + @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) { val cb = reuse(prev, cbf(self.repr)) iterate(tree, cb) @@ -1247,8 +1257,9 @@ self => private[parallel] def brokenInvariants = Seq[String]() - // private val dbbuff = ArrayBuffer[String]() - def debugBuffer: ArrayBuffer[String] = null // dbbuff + private val dbbuff = ArrayBuffer[String]() + def debugBuffer: ArrayBuffer[String] = dbbuff + // def debugBuffer: ArrayBuffer[String] = null private[parallel] def debugclear() = synchronized { debugBuffer.clear diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 58e8bcd031..063a8cab7d 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -327,7 +327,7 @@ self => protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) extends Accessor[(Int, Boolean), SegmentLength] { - var result: (Int, Boolean) = null + @volatile var result: (Int, Boolean) = null def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) { val itsize = pit.remaining val seglen = pit.prefixLength(pred) @@ -345,7 +345,7 @@ self => protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) extends Accessor[Int, IndexWhere] { - var result: Int = -1 + @volatile var result: Int = -1 def leaf(prev: Option[Int]) = if (from < pit.indexFlag) { val r = pit.indexWhere(pred) if (r != -1) { @@ -366,7 +366,7 @@ self => protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T]) extends Accessor[Int, LastIndexWhere] { - var result: Int = -1 + @volatile var result: Int = -1 def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) { val r = pit.lastIndexWhere(pred) if (r != -1) { @@ -387,7 +387,7 @@ self => protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T]) extends Transformer[Combiner[U, This], Reverse[U, This]] { - var result: Combiner[U, This] = null + @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf())) protected[this] def newSubtask(p: SuperParIterator) = new Reverse(cbf, down(p)) override def merge(that: Reverse[U, This]) = result = that.result combine result @@ -395,7 +395,7 @@ self => protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParSeqIterator[T]) extends Transformer[Combiner[S, That], ReverseMap[S, That]] { - var result: Combiner[S, That] = null + @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf(self.repr)) protected[this] def newSubtask(p: SuperParIterator) = new ReverseMap(f, pbf, down(p)) override def merge(that: ReverseMap[S, That]) = result = that.result combine result @@ -403,7 +403,7 @@ self => protected[this] class SameElements[U >: T](protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[U]) extends Accessor[Boolean, SameElements[U]] { - var result: Boolean = true + @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { result = pit.sameElements(otherpit) if (!result) pit.abort @@ -420,7 +420,7 @@ self => protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T]) extends Transformer[Combiner[U, That], Updated[U, That]] { - var result: Combiner[U, That] = null + @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf(self.repr)) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { @@ -433,7 +433,7 @@ self => protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { - var result: Result = null + @volatile var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { @@ -451,7 +451,7 @@ self => protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { - var result: Boolean = true + @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { result = pit.corresponds(corr)(otherpit) if (!result) pit.abort diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 662a600d42..e88d5dd0e0 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -30,48 +30,70 @@ trait Tasks { type Result = R def repr = this.asInstanceOf[Tp] + /** Body of the task - non-divisible unit of work done by this task. * Optionally is provided with the result from the previous completed task * or `None` if there was no previous task (or the previous task is uncompleted or unknown). */ def leaf(result: Option[R]) + /** A result that can be accessed once the task is completed. */ - var result: R + @volatile var result: R + /** Decides whether or not this task should be split further. */ def shouldSplitFurther: Boolean + /** Splits this task into a list of smaller tasks. */ private[parallel] def split: Seq[Task[R, Tp]] + /** Read of results of `that` task and merge them into results of this one. */ private[parallel] def merge(that: Tp @uncheckedVariance) {} // exception handling mechanism - var throwable: Throwable = null + @volatile var throwable: Throwable = null def forwardThrowable = if (throwable != null) throw throwable + // tries to do the leaf computation, storing the possible exception - private[parallel] def tryLeaf(result: Option[R]) { + private[parallel] def tryLeaf(lastres: Option[R]) { try { tryBreakable { - leaf(result) + leaf(lastres) + result = result // ensure that effects of `leaf` are visible to readers of `result` } catchBreak { signalAbort } } catch { - case thr: Throwable => + case thr: Exception => + result = result // ensure that effects of `leaf` are visible throwable = thr signalAbort } } + private[parallel] def tryMerge(t: Tp @uncheckedVariance) { val that = t.asInstanceOf[Task[R, Tp]] + val local = result // ensure that any effects of modifying `result` are detected + // checkMerge(that) if (this.throwable == null && that.throwable == null) merge(t) mergeThrowables(that) } + + private def checkMerge(that: Task[R, Tp] @uncheckedVariance) { + if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) { + println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable) + } else if (this.throwable != null || that.throwable != null) { + println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) + } + } + private[parallel] def mergeThrowables(that: Task[_, _]) { if (this.throwable != null && that.throwable != null) { // merge exceptions, since there were multiple exceptions this.throwable = this.throwable alongWith that.throwable } else if (that.throwable != null) this.throwable = that.throwable + else this.throwable = this.throwable } + // override in concrete task implementations to signal abort to other tasks private[parallel] def signalAbort {} } @@ -128,8 +150,8 @@ trait Tasks { trait AdaptiveWorkStealingTasks extends Tasks { trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] { - var next: TaskImpl[R, Tp] = null - var shouldWaitFor = true + @volatile var next: TaskImpl[R, Tp] = null + @volatile var shouldWaitFor = true def split: Seq[TaskImpl[R, Tp]] @@ -140,20 +162,21 @@ trait AdaptiveWorkStealingTasks extends Tasks { last.body.tryLeaf(None) body.result = last.body.result + body.throwable = last.body.throwable while (last.next != null) { // val lastresult = Option(last.body.result) val beforelast = last last = last.next if (last.tryCancel) { - // debuglog("Done with " + beforelast.body + ", next direct is " + last.body) + // println("Done with " + beforelast.body + ", next direct is " + last.body) last.body.tryLeaf(Some(body.result)) last.release } else { - // debuglog("Done with " + beforelast.body + ", next sync is " + last.body) + // println("Done with " + beforelast.body + ", next sync is " + last.body) last.sync } - // debuglog("Merging " + body + " with " + last.body) + // println("Merging " + body + " with " + last.body) body.tryMerge(last.body.repr) } } @@ -191,14 +214,6 @@ trait AdaptiveWorkStealingTasks extends Tasks { } -/** - * A trait describing objects that provide a fork/join pool. - */ -trait HavingForkJoinPool { - def forkJoinPool: ForkJoinPool -} - - trait ThreadPoolTasks extends Tasks { import java.util.concurrent._ @@ -324,6 +339,13 @@ object ThreadPoolTasks { } +/** + * A trait describing objects that provide a fork/join pool. + */ +trait HavingForkJoinPool { + def forkJoinPool: ForkJoinPool +} + /** An implementation trait for parallel tasks based on the fork/join framework. * @@ -384,6 +406,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } fjtask.sync + // if (fjtask.body.throwable != null) println("throwing: " + fjtask.body.throwable + " at " + fjtask.body) fjtask.body.forwardThrowable fjtask.body.result } diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 13b7670865..812a2ed94d 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -192,7 +192,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends Task[Unit, CreateTrie] { - var result = () + @volatile var result = () def leaf(prev: Option[Unit]) = { var i = offset val until = offset + howmany @@ -200,6 +200,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => root(i) = createTrie(bucks(i)) i += 1 } + result = result } private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = { var trie = new HashMap[K, V] diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index ac031605fb..b1be7ffab5 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -3,7 +3,6 @@ package scala.collection.parallel.immutable import scala.collection.immutable.Range -import scala.collection.immutable.RangeUtils import scala.collection.parallel.ParSeq import scala.collection.parallel.Combiner import scala.collection.generic.CanCombineFrom @@ -28,6 +27,10 @@ self => type SCPI = SignalContextPassingIterator[ParRangeIterator] + override def toParSeq = this // TODO remove when we have ParSeq, when ParVector is in place + + override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) // TODO remove when we have ParSeq, when ParVector is in place + class ParRangeIterator(range: Range = self.range) extends ParIterator { me: SignalContextPassingIterator[ParRangeIterator] => diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 5faf73c1db..cf342b0203 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -40,7 +40,7 @@ package object parallel { if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport else new ThreadPoolTaskSupport - private[parallel] val tasksupport = getTaskSupport + val tasksupport = getTaskSupport /* implicit conversions */ |