summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-21 17:41:31 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-21 17:41:31 +0000
commit003fc687839528bf99b44a415a038eb13ef8eae5 (patch)
tree597e57e8882449865bcde4c214188bb87c4709df /src/library
parentcc1f960036c0f60caca6c8f862701dd37bac2f50 (diff)
downloadscala-003fc687839528bf99b44a415a038eb13ef8eae5.tar.gz
scala-003fc687839528bf99b44a415a038eb13ef8eae5.tar.bz2
scala-003fc687839528bf99b44a415a038eb13ef8eae5.zip
Bencharking a larger program with parallel coll...
Bencharking a larger program with parallel collections. Fixed a couple of bugs in parallel collections. No review.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala89
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala18
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala59
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala3
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala5
-rw-r--r--src/library/scala/collection/parallel/package.scala2
6 files changed, 107 insertions, 69 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index ef6120b370..2b2ebda2e2 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -262,7 +262,7 @@ self =>
protected def wrap[R](body: => R) = new NonDivisible[R] {
def leaf(prevr: Option[R]) = result = body
- var result: R = null.asInstanceOf[R]
+ @volatile var result: R = null.asInstanceOf[R]
}
/* convenience signalling operations wrapper */
@@ -281,6 +281,12 @@ self =>
}
}
+ override def mkString(start: String, sep: String, end: String): String = seq.mkString(start, sep, end)
+
+ override def mkString(sep: String): String = seq.mkString("", sep, "")
+
+ override def mkString: String = seq.mkString("")
+
override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
/** Reduces the elements of this sequence using the specified associative binary operator.
@@ -719,7 +725,7 @@ self =>
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
- override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")"
+ override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
}
protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] {
@@ -733,7 +739,7 @@ self =>
(val ft: First, val st: Second)
extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] {
def combineResults(fr: FR, sr: SR): R
- var result: R = null.asInstanceOf[R]
+ @volatile var result: R = null.asInstanceOf[R]
private[parallel] override def signalAbort {
ft.signalAbort
st.signalAbort
@@ -770,7 +776,7 @@ self =>
protected[this] abstract class ResultMapping[R, Tp, R1](val inner: StrictSplitterCheckTask[R, Tp])
extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] {
- var result: R1 = null.asInstanceOf[R1]
+ @volatile var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
result = map(executeAndWaitResult(inner))
@@ -784,14 +790,14 @@ self =>
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
protected[this] class Foreach[S](op: T => S, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Unit, Foreach[S]] {
- var result: Unit = ()
+ @volatile var result: Unit = ()
def leaf(prevr: Option[Unit]) = pit.foreach(op)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Foreach[S](op, p)
}
protected[this] class Count(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Int, Count] {
// val pittxt = pit.toString
- var result: Int = 0
+ @volatile var result: Int = 0
def leaf(prevr: Option[Int]) = result = pit.count(pred)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Count(pred, p)
override def merge(that: Count) = result = result + that.result
@@ -799,7 +805,7 @@ self =>
}
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
- var result: Option[U] = None
+ @volatile var result: Option[U] = None
def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p)
override def merge(that: Reduce[U]) =
@@ -809,7 +815,7 @@ self =>
}
protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Fold[U]] {
- var result: U = null.asInstanceOf[U]
+ @volatile var result: U = null.asInstanceOf[U]
def leaf(prevr: Option[U]) = result = pit.fold(z)(op)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Fold(z, op, p)
override def merge(that: Fold[U]) = result = op(result, that.result)
@@ -817,28 +823,28 @@ self =>
protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: ParIterableIterator[T])
extends Accessor[S, Aggregate[S]] {
- var result: S = null.asInstanceOf[S]
+ @volatile var result: S = null.asInstanceOf[S]
def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Aggregate(z, seqop, combop, p)
override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}
protected[this] class Sum[U >: T](num: Numeric[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Sum[U]] {
- var result: U = null.asInstanceOf[U]
+ @volatile var result: U = null.asInstanceOf[U]
def leaf(prevr: Option[U]) = result = pit.sum(num)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Sum(num, p)
override def merge(that: Sum[U]) = result = num.plus(result, that.result)
}
protected[this] class Product[U >: T](num: Numeric[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Product[U]] {
- var result: U = null.asInstanceOf[U]
+ @volatile var result: U = null.asInstanceOf[U]
def leaf(prevr: Option[U]) = result = pit.product(num)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Product(num, p)
override def merge(that: Product[U]) = result = num.times(result, that.result)
}
protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Min[U]] {
- var result: Option[U] = None
+ @volatile var result: Option[U] = None
def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Min(ord, p)
override def merge(that: Min[U]) =
@@ -848,7 +854,7 @@ self =>
}
protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Max[U]] {
- var result: Option[U] = None
+ @volatile var result: Option[U] = None
def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Max(ord, p)
override def merge(that: Max[U]) =
@@ -859,7 +865,7 @@ self =>
protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[S, That], Map[S, That]] {
- var result: Combiner[S, That] = null
+ @volatile var result: Combiner[S, That] = null
def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr)))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Map(f, pbf, p)
override def merge(that: Map[S, That]) = result = result combine that.result
@@ -868,7 +874,7 @@ self =>
protected[this] class Collect[S, That]
(pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[S, That], Collect[S, That]] {
- var result: Combiner[S, That] = null
+ @volatile var result: Combiner[S, That] = null
def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Collect(pf, pbf, p)
override def merge(that: Collect[S, That]) = result = result combine that.result
@@ -876,28 +882,32 @@ self =>
protected[this] class FlatMap[S, That](f: T => TraversableOnce[S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[S, That], FlatMap[S, That]] {
- var result: Combiner[S, That] = null
+ @volatile var result: Combiner[S, That] = null
def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p)
- override def merge(that: FlatMap[S, That]) = result = result combine that.result
+ override def merge(that: FlatMap[S, That]) = {
+ debuglog("merging " + result + " and " + that.result)
+ result = result combine that.result
+ debuglog("merged into " + result)
+ }
}
protected[this] class Forall(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Boolean, Forall] {
- var result: Boolean = true
+ @volatile var result: Boolean = true
def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Forall(pred, p)
override def merge(that: Forall) = result = result && that.result
}
protected[this] class Exists(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Boolean, Exists] {
- var result: Boolean = false
+ @volatile var result: Boolean = false
def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Exists(pred, p)
override def merge(that: Exists) = result = result || that.result
}
protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Find[U]] {
- var result: Option[U] = None
+ @volatile var result: Option[U] = None
def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Find(pred, p)
override def merge(that: Find[U]) = if (this.result == None) result = that.result
@@ -905,7 +915,7 @@ self =>
protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Filter[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = {
result = pit.filter2combiner(pred, reuse(prev, cbf()))
}
@@ -915,7 +925,7 @@ self =>
protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], FilterNot[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = {
result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
}
@@ -925,7 +935,7 @@ self =>
protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, That], Copy[U, That]] {
- var result: Combiner[U, That] = null
+ @volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory()))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Copy[U, That](cfactory, p)
override def merge(that: Copy[U, That]) = result = result combine that.result
@@ -933,7 +943,7 @@ self =>
protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
+ @volatile var result: (Combiner[U, This], Combiner[U, This]) = null
def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Partition(pred, cbf, p)
override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
@@ -941,7 +951,7 @@ self =>
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Take[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = {
result = pit.take2combiner(n, reuse(prev, cbf()))
}
@@ -960,7 +970,7 @@ self =>
protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Drop[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf()))
protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException
override def split = {
@@ -977,7 +987,7 @@ self =>
protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Slice[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf()))
protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException
override def split = {
@@ -995,7 +1005,7 @@ self =>
protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
+ @volatile var result: (Combiner[U, This], Combiner[U, This]) = null
def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException
override def split = {
@@ -1010,7 +1020,7 @@ self =>
protected[this] class TakeWhile[U >: T, This >: Repr]
(pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] {
- var result: (Combiner[U, This], Boolean) = null
+ @volatile var result: (Combiner[U, This], Boolean) = null
def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) {
result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf()))
if (!result._2) pit.setIndexFlagIfLesser(pos)
@@ -1029,7 +1039,7 @@ self =>
protected[this] class Span[U >: T, This >: Repr]
(pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
+ @volatile var result: (Combiner[U, This], Combiner[U, This]) = null
def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) {
// val lst = pit.toList
// val pa = mutable.ParArray(lst: _*)
@@ -1055,7 +1065,7 @@ self =>
protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
- var result: Result = null
+ @volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
override def split = {
@@ -1071,7 +1081,7 @@ self =>
protected[this] class ZipAll[U >: T, S, That]
(len: Int, thiselem: U, thatelem: S, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S])
extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] {
- var result: Result = null
+ @volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
override def split = if (pit.remaining <= len) {
@@ -1093,7 +1103,7 @@ self =>
protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T])
extends Accessor[Unit, CopyToArray[U, This]] {
- var result: Unit = ()
+ @volatile var result: Unit = ()
def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len)
protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
override def split = {
@@ -1108,7 +1118,7 @@ self =>
protected[this] class ToParCollection[U >: T, That](cbf: () => Combiner[U, That], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, That], ToParCollection[U, That]] {
- var result: Result = null
+ @volatile var result: Result = null
def leaf(prev: Option[Combiner[U, That]]) {
result = cbf()
while (pit.hasNext) result += pit.next
@@ -1119,7 +1129,7 @@ self =>
protected[this] class ToParMap[K, V, That](cbf: () => Combiner[(K, V), That], protected[this] val pit: ParIterableIterator[T])(implicit ev: T <:< (K, V))
extends Transformer[Combiner[(K, V), That], ToParMap[K, V, That]] {
- var result: Result = null
+ @volatile var result: Result = null
def leaf(prev: Option[Combiner[(K, V), That]]) {
result = cbf()
while (pit.hasNext) result += pit.next
@@ -1130,7 +1140,7 @@ self =>
protected[this] class CreateScanTree[U >: T](from: Int, len: Int, z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T])
extends Transformer[ScanTree[U], CreateScanTree[U]] {
- var result: ScanTree[U] = null
+ @volatile var result: ScanTree[U] = null
def leaf(prev: Option[ScanTree[U]]) = if (pit.remaining > 0) {
val trees = ArrayBuffer[ScanTree[U]]()
var i = from
@@ -1168,7 +1178,7 @@ self =>
protected[this] class FromScanTree[U >: T, That]
(tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That])
extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] {
- var result: Combiner[U, That] = null
+ @volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) {
val cb = reuse(prev, cbf(self.repr))
iterate(tree, cb)
@@ -1247,8 +1257,9 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- // private val dbbuff = ArrayBuffer[String]()
- def debugBuffer: ArrayBuffer[String] = null // dbbuff
+ private val dbbuff = ArrayBuffer[String]()
+ def debugBuffer: ArrayBuffer[String] = dbbuff
+ // def debugBuffer: ArrayBuffer[String] = null
private[parallel] def debugclear() = synchronized {
debugBuffer.clear
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 58e8bcd031..063a8cab7d 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -327,7 +327,7 @@ self =>
protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T])
extends Accessor[(Int, Boolean), SegmentLength] {
- var result: (Int, Boolean) = null
+ @volatile var result: (Int, Boolean) = null
def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
val itsize = pit.remaining
val seglen = pit.prefixLength(pred)
@@ -345,7 +345,7 @@ self =>
protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T])
extends Accessor[Int, IndexWhere] {
- var result: Int = -1
+ @volatile var result: Int = -1
def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
val r = pit.indexWhere(pred)
if (r != -1) {
@@ -366,7 +366,7 @@ self =>
protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T])
extends Accessor[Int, LastIndexWhere] {
- var result: Int = -1
+ @volatile var result: Int = -1
def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
val r = pit.lastIndexWhere(pred)
if (r != -1) {
@@ -387,7 +387,7 @@ self =>
protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T])
extends Transformer[Combiner[U, This], Reverse[U, This]] {
- var result: Combiner[U, This] = null
+ @volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
protected[this] def newSubtask(p: SuperParIterator) = new Reverse(cbf, down(p))
override def merge(that: Reverse[U, This]) = result = that.result combine result
@@ -395,7 +395,7 @@ self =>
protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParSeqIterator[T])
extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
- var result: Combiner[S, That] = null
+ @volatile var result: Combiner[S, That] = null
def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = new ReverseMap(f, pbf, down(p))
override def merge(that: ReverseMap[S, That]) = result = that.result combine result
@@ -403,7 +403,7 @@ self =>
protected[this] class SameElements[U >: T](protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[U])
extends Accessor[Boolean, SameElements[U]] {
- var result: Boolean = true
+ @volatile var result: Boolean = true
def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.sameElements(otherpit)
if (!result) pit.abort
@@ -420,7 +420,7 @@ self =>
protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T])
extends Transformer[Combiner[U, That], Updated[U, That]] {
- var result: Combiner[U, That] = null
+ @volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
@@ -433,7 +433,7 @@ self =>
protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
- var result: Result = null
+ @volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
@@ -451,7 +451,7 @@ self =>
protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S])
extends Accessor[Boolean, Corresponds[S]] {
- var result: Boolean = true
+ @volatile var result: Boolean = true
def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.corresponds(corr)(otherpit)
if (!result) pit.abort
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 662a600d42..e88d5dd0e0 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -30,48 +30,70 @@ trait Tasks {
type Result = R
def repr = this.asInstanceOf[Tp]
+
/** Body of the task - non-divisible unit of work done by this task.
* Optionally is provided with the result from the previous completed task
* or `None` if there was no previous task (or the previous task is uncompleted or unknown).
*/
def leaf(result: Option[R])
+
/** A result that can be accessed once the task is completed. */
- var result: R
+ @volatile var result: R
+
/** Decides whether or not this task should be split further. */
def shouldSplitFurther: Boolean
+
/** Splits this task into a list of smaller tasks. */
private[parallel] def split: Seq[Task[R, Tp]]
+
/** Read of results of `that` task and merge them into results of this one. */
private[parallel] def merge(that: Tp @uncheckedVariance) {}
// exception handling mechanism
- var throwable: Throwable = null
+ @volatile var throwable: Throwable = null
def forwardThrowable = if (throwable != null) throw throwable
+
// tries to do the leaf computation, storing the possible exception
- private[parallel] def tryLeaf(result: Option[R]) {
+ private[parallel] def tryLeaf(lastres: Option[R]) {
try {
tryBreakable {
- leaf(result)
+ leaf(lastres)
+ result = result // ensure that effects of `leaf` are visible to readers of `result`
} catchBreak {
signalAbort
}
} catch {
- case thr: Throwable =>
+ case thr: Exception =>
+ result = result // ensure that effects of `leaf` are visible
throwable = thr
signalAbort
}
}
+
private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
val that = t.asInstanceOf[Task[R, Tp]]
+ val local = result // ensure that any effects of modifying `result` are detected
+ // checkMerge(that)
if (this.throwable == null && that.throwable == null) merge(t)
mergeThrowables(that)
}
+
+ private def checkMerge(that: Task[R, Tp] @uncheckedVariance) {
+ if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
+ println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
+ } else if (this.throwable != null || that.throwable != null) {
+ println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
+ }
+ }
+
private[parallel] def mergeThrowables(that: Task[_, _]) {
if (this.throwable != null && that.throwable != null) {
// merge exceptions, since there were multiple exceptions
this.throwable = this.throwable alongWith that.throwable
} else if (that.throwable != null) this.throwable = that.throwable
+ else this.throwable = this.throwable
}
+
// override in concrete task implementations to signal abort to other tasks
private[parallel] def signalAbort {}
}
@@ -128,8 +150,8 @@ trait Tasks {
trait AdaptiveWorkStealingTasks extends Tasks {
trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
- var next: TaskImpl[R, Tp] = null
- var shouldWaitFor = true
+ @volatile var next: TaskImpl[R, Tp] = null
+ @volatile var shouldWaitFor = true
def split: Seq[TaskImpl[R, Tp]]
@@ -140,20 +162,21 @@ trait AdaptiveWorkStealingTasks extends Tasks {
last.body.tryLeaf(None)
body.result = last.body.result
+ body.throwable = last.body.throwable
while (last.next != null) {
// val lastresult = Option(last.body.result)
val beforelast = last
last = last.next
if (last.tryCancel) {
- // debuglog("Done with " + beforelast.body + ", next direct is " + last.body)
+ // println("Done with " + beforelast.body + ", next direct is " + last.body)
last.body.tryLeaf(Some(body.result))
last.release
} else {
- // debuglog("Done with " + beforelast.body + ", next sync is " + last.body)
+ // println("Done with " + beforelast.body + ", next sync is " + last.body)
last.sync
}
- // debuglog("Merging " + body + " with " + last.body)
+ // println("Merging " + body + " with " + last.body)
body.tryMerge(last.body.repr)
}
}
@@ -191,14 +214,6 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
-/**
- * A trait describing objects that provide a fork/join pool.
- */
-trait HavingForkJoinPool {
- def forkJoinPool: ForkJoinPool
-}
-
-
trait ThreadPoolTasks extends Tasks {
import java.util.concurrent._
@@ -324,6 +339,13 @@ object ThreadPoolTasks {
}
+/**
+ * A trait describing objects that provide a fork/join pool.
+ */
+trait HavingForkJoinPool {
+ def forkJoinPool: ForkJoinPool
+}
+
/** An implementation trait for parallel tasks based on the fork/join framework.
*
@@ -384,6 +406,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
fjtask.sync
+ // if (fjtask.body.throwable != null) println("throwing: " + fjtask.body.throwable + " at " + fjtask.body)
fjtask.body.forwardThrowable
fjtask.body.result
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 13b7670865..812a2ed94d 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -192,7 +192,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int)
extends Task[Unit, CreateTrie] {
- var result = ()
+ @volatile var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
val until = offset + howmany
@@ -200,6 +200,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
root(i) = createTrie(bucks(i))
i += 1
}
+ result = result
}
private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = {
var trie = new HashMap[K, V]
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index ac031605fb..b1be7ffab5 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -3,7 +3,6 @@ package scala.collection.parallel.immutable
import scala.collection.immutable.Range
-import scala.collection.immutable.RangeUtils
import scala.collection.parallel.ParSeq
import scala.collection.parallel.Combiner
import scala.collection.generic.CanCombineFrom
@@ -28,6 +27,10 @@ self =>
type SCPI = SignalContextPassingIterator[ParRangeIterator]
+ override def toParSeq = this // TODO remove when we have ParSeq, when ParVector is in place
+
+ override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) // TODO remove when we have ParSeq, when ParVector is in place
+
class ParRangeIterator(range: Range = self.range)
extends ParIterator {
me: SignalContextPassingIterator[ParRangeIterator] =>
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 5faf73c1db..cf342b0203 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -40,7 +40,7 @@ package object parallel {
if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport
else new ThreadPoolTaskSupport
- private[parallel] val tasksupport = getTaskSupport
+ val tasksupport = getTaskSupport
/* implicit conversions */