From 11dfc5a64dd8bbcb7fca7d608a23b513316de6cc Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Thu, 9 Dec 2010 10:08:24 +0000 Subject: Made parallel collections serializable. No review. --- .../collection/generic/GenericParTemplate.scala | 7 ------ .../scala/collection/parallel/Combiner.scala | 5 +--- .../collection/parallel/ParIterableLike.scala | 5 ++-- .../collection/parallel/ParIterableViewLike.scala | 1 - .../scala/collection/parallel/ParSeqLike.scala | 2 +- src/library/scala/collection/parallel/Tasks.scala | 9 ++++++- .../scala/collection/parallel/UnrolledBuffer.scala | 29 +++++++++++++++++++--- .../collection/parallel/immutable/ParHashMap.scala | 2 ++ .../collection/parallel/immutable/ParHashSet.scala | 2 ++ .../collection/parallel/immutable/ParRange.scala | 6 +++-- .../collection/parallel/mutable/ParArray.scala | 21 +++++++++++++--- .../collection/parallel/mutable/ParHashMap.scala | 10 ++++++++ .../collection/parallel/mutable/ParHashSet.scala | 12 +++++++++ .../scala/collection/parallel/package.scala | 2 ++ 14 files changed, 89 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/library/scala/collection/generic/GenericParTemplate.scala b/src/library/scala/collection/generic/GenericParTemplate.scala index 1c3c43269d..0d87a2d548 100644 --- a/src/library/scala/collection/generic/GenericParTemplate.scala +++ b/src/library/scala/collection/generic/GenericParTemplate.scala @@ -26,15 +26,12 @@ trait GenericParTemplate[+A, +CC[X] <: ParIterable[X]] extends GenericTraversableTemplate[A, CC] with HasNewCombiner[A, CC[A] @uncheckedVariance] { - private[collection] def tasksupport: TaskSupport - def companion: GenericCompanion[CC] with GenericParCompanion[CC] protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner protected[this] override def newCombiner: Combiner[A, CC[A]] = { val cb = companion.newCombiner[A] - cb.tasksupport.environment = tasksupport.environment cb } @@ -42,7 +39,6 @@ extends GenericTraversableTemplate[A, CC] def genericCombiner[B]: Combiner[B, CC[B]] = { val cb = companion.newCombiner[B] - cb.tasksupport.environment = tasksupport.environment cb } @@ -51,13 +47,10 @@ extends GenericTraversableTemplate[A, CC] trait GenericParMapTemplate[K, +V, +CC[X, Y] <: ParMap[X, Y]] { - private[collection] def tasksupport: TaskSupport - def mapCompanion: GenericParMapCompanion[CC] def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = { val cb = mapCompanion.newCombiner[P, Q] - cb.tasksupport.environment = tasksupport.environment cb } } diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala index 93522185fb..7b133cdbba 100644 --- a/src/library/scala/collection/parallel/Combiner.scala +++ b/src/library/scala/collection/parallel/Combiner.scala @@ -57,10 +57,7 @@ self: EnvironmentPassingCombiner[Elem, To] => trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] { abstract override def result = { val res = super.result - res match { - case pc: ParIterableLike[_, _, _] => pc.tasksupport.environment = tasksupport.environment - case _ => - } + // res } } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index d3e6eb42d4..caa4af10c9 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -15,6 +15,8 @@ import scala.collection.generic._ import java.util.concurrent.atomic.AtomicBoolean +import annotation.unchecked.uncheckedStable + // TODO update docs!! /** A template trait for parallel collections of type `ParIterable[T]`. @@ -126,7 +128,6 @@ extends IterableLike[T, Repr] { self => - private[collection] final val tasksupport: TaskSupport = getTaskSupport import tasksupport._ /** Parallel iterators are split iterators that have additional accessor and @@ -476,7 +477,7 @@ self => val copythis = new Copy(() => pbf(repr), parallelIterator) val copythat = wrap { val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator) - other.tasksupport.executeAndWaitResult(othtask) + tasksupport.executeAndWaitResult(othtask) } val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 6fb924e57e..6e3d2ff9fe 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -49,7 +49,6 @@ self => trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] { override def parallelIterator: ParIterableIterator[S] override def iterator = parallelIterator - tasksupport.environment = self.tasksupport.environment } trait Sliced extends super.Sliced with Transformed[T] { diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 91e15fa946..58e8bcd031 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -244,7 +244,7 @@ self => val copystart = new Copy[U, That](() => pbf(repr), pits(0)) val copymiddle = wrap { val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) - that.tasksupport.executeAndWaitResult(tsk) + tasksupport.executeAndWaitResult(tsk) } val copyend = new Copy[U, That](() => pbf(repr), pits(2)) executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index b111ecb87c..964e01e8d1 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -264,7 +264,7 @@ trait ThreadPoolTasks extends Tasks { var environment: AnyRef = ThreadPoolTasks.defaultThreadPool def executor = environment.asInstanceOf[ThreadPoolExecutor] def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]] - var totaltasks = 0 + @volatile var totaltasks = 0 private def incrTasks = synchronized { totaltasks += 1 @@ -312,6 +312,13 @@ object ThreadPoolTasks { Int.MaxValue, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable], + new ThreadFactory { + def newThread(r: Runnable) = { + val t = new Thread(r) + t.setDaemon(true) + t + } + }, new ThreadPoolExecutor.CallerRunsPolicy ) } diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala index c7a8b388bd..7f81cf779d 100644 --- a/src/library/scala/collection/parallel/UnrolledBuffer.scala +++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala @@ -38,17 +38,19 @@ import annotation.tailrec * @coll unrolled buffer * @Coll UnrolledBuffer */ +@SerialVersionUID(1L) class UnrolledBuffer[T](implicit val manifest: ClassManifest[T]) extends collection.mutable.Buffer[T] with collection.mutable.BufferLike[T, UnrolledBuffer[T]] with GenericClassManifestTraversableTemplate[T, UnrolledBuffer] with collection.mutable.Builder[T, UnrolledBuffer[T]] + with Serializable { import UnrolledBuffer.Unrolled - private var headptr = newUnrolled - private var lastptr = headptr - private var sz = 0 + @transient private var headptr = newUnrolled + @transient private var lastptr = headptr + @transient private var sz = 0 private[parallel] def headPtr = headptr private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head @@ -146,6 +148,27 @@ extends collection.mutable.Buffer[T] sz += elems.size } else outofbounds(idx) + private def writeObject(out: java.io.ObjectOutputStream) { + out.defaultWriteObject + out.writeInt(sz) + for (elem <- this) out.writeObject(elem) + } + + private def readObject(in: java.io.ObjectInputStream) { + in.defaultReadObject + + val num = in.readInt + + headPtr = newUnrolled + lastPtr = headPtr + sz = 0 + var i = 0 + while (i < num) { + this += in.readObject.asInstanceOf[T] + i += 1 + } + } + override def stringPrefix = "UnrolledBuffer" } diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index a411a1cc44..4d9475038d 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -28,10 +28,12 @@ import annotation.unchecked.uncheckedVariance * * @author prokopec */ +@SerialVersionUID(1L) class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) extends ParMap[K, V] with GenericParMapTemplate[K, V, ParHashMap] with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]] + with Serializable { self => diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 0b1f9c5b7e..d17b258be6 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -29,10 +29,12 @@ import scala.collection.immutable.HashSet * * @author prokopec */ +@SerialVersionUID(1L) class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T]) extends ParSet[T] with GenericParTemplate[T, ParHashSet] with ParSetLike[T, ParHashSet[T], HashSet[T]] + with Serializable { self => diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index ab5e509515..ec5aeefc87 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -11,8 +11,10 @@ import scala.collection.parallel.ParIterableIterator -class ParRange(range: Range) -extends ParSeq[Int] { +@SerialVersionUID(1L) +class ParRange(val range: Range) +extends ParSeq[Int] + with Serializable { self => def seq = range diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 8f70547a03..536976f5e3 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -37,15 +37,17 @@ import scala.collection.Sequentializable * @define Coll ParArray * @define coll parallel array */ +@SerialVersionUID(1L) class ParArray[T] private[mutable] (val arrayseq: ArraySeq[T]) extends ParSeq[T] with GenericParTemplate[T, ParArray] with ParSeqLike[T, ParArray[T], ArraySeq[T]] + with Serializable { self => - import tasksupport._ + import collection.parallel.tasksupport._ - private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] + @transient private var array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] override def companion: GenericCompanion[ParArray] with GenericParCompanion[ParArray] = ParArray @@ -582,7 +584,7 @@ self => } else super.map(f)(bf) override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = - if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) { + if (parallelismLevel > 1 && buildsArray(cbf(repr))) { // reserve an array val targarrseq = new ArraySeq[U](length + 1) val targetarr = targarrseq.array.asInstanceOf[Array[Any]] @@ -655,6 +657,19 @@ self => def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, parallelismLevel) } + /* serialization */ + + private def writeObject(out: java.io.ObjectOutputStream) { + out.defaultWriteObject + } + + private def readObject(in: java.io.ObjectInputStream) { + in.defaultReadObject + + // get raw array from arrayseq + array = arrayseq.array.asInstanceOf[Array[Any]] + } + } diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 537c442e23..d231068b6c 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -11,11 +11,13 @@ import collection.mutable.HashTable +@SerialVersionUID(1L) class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]]) extends ParMap[K, V] with GenericParMapTemplate[K, V, ParHashMap] with ParMapLike[K, V, ParHashMap[K, V], collection.mutable.HashMap[K, V]] with ParHashTable[K, DefaultEntry[K, V]] + with Serializable { self => initWithContents(contents) @@ -77,6 +79,14 @@ self => new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI } + private def writeObject(out: java.io.ObjectOutputStream) { + serializeTo(out, _.value) + } + + private def readObject(in: java.io.ObjectInputStream) { + init[V](in, new Entry(_, _)) + } + private[parallel] override def brokenInvariants = { // bucket by bucket, count elements val buckets = for (i <- 0 until (table.length / sizeMapBucketSize)) yield checkBucket(i) diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala index 66303862d3..cff8eeb9c9 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -13,11 +13,13 @@ import collection.parallel.UnrolledBuffer +@SerialVersionUID(1L) class ParHashSet[T] private[collection] (contents: FlatHashTable.Contents[T]) extends ParSet[T] with GenericParTemplate[T, ParHashSet] with ParSetLike[T, ParHashSet[T], collection.mutable.HashSet[T]] with ParFlatHashTable[T] + with Serializable { initWithContents(contents) // println("----> new par hash set!") @@ -46,6 +48,8 @@ extends ParSet[T] this } + override def stringPrefix = "ParHashSet" + def contains(elem: T) = containsEntry(elem) def parallelIterator = new ParHashSetIterator(0, table.length, size) with SCPI @@ -58,6 +62,14 @@ extends ParSet[T] def newIterator(start: Int, until: Int, total: Int) = new ParHashSetIterator(start, until, total) with SCPI } + private def writeObject(s: java.io.ObjectOutputStream) { + serializeTo(s) + } + + private def readObject(in: java.io.ObjectInputStream) { + init(in, x => x) + } + import collection.DebugUtils._ override def debugInformation = buildString { append => diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 6b3f3bf448..5faf73c1db 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -40,6 +40,8 @@ package object parallel { if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport else new ThreadPoolTaskSupport + private[parallel] val tasksupport = getTaskSupport + /* implicit conversions */ /** An implicit conversion providing arrays with a `par` method, which -- cgit v1.2.3