From 37b6b48b5fabb804ec02d762df7d83577ccad2ac Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Tue, 27 Mar 2012 20:57:45 +0200 Subject: Rename ConcurrentTrieMap to concurrent.TrieMap. Introduced the collection.concurrent package and introduced the concurrent.Map trait there. Deprecated the mutable.ConcurrentMap trait. Pending work - introduce the appropriate changes to JavaConversions and JavaConverters. --- .../scala/collection/concurrent/BasicNode.java | 20 + .../scala/collection/concurrent/CNodeBase.java | 35 + src/library/scala/collection/concurrent/Gen.java | 18 + .../scala/collection/concurrent/INodeBase.java | 35 + .../scala/collection/concurrent/MainNode.java | 40 + src/library/scala/collection/concurrent/Map.scala | 88 ++ .../scala/collection/concurrent/TrieMap.scala | 1075 ++++++++++++++++++++ .../scala/collection/mutable/BasicNode.java | 20 - .../scala/collection/mutable/CNodeBase.java | 35 - .../scala/collection/mutable/ConcurrentMap.scala | 1 + .../collection/mutable/ConcurrentTrieMap.scala | 1075 -------------------- src/library/scala/collection/mutable/Gen.java | 18 - .../scala/collection/mutable/INodeBase.java | 35 - src/library/scala/collection/mutable/MainNode.java | 40 - .../parallel/mutable/ParConcurrentTrieMap.scala | 193 ---- .../collection/parallel/mutable/ParTrieMap.scala | 193 ++++ test/files/jvm/serialization.check | 8 +- test/files/jvm/serialization.scala | 15 +- test/files/run/ctries/concmap.scala | 24 +- test/files/run/ctries/iterator.scala | 20 +- test/files/run/ctries/lnode.scala | 14 +- test/files/run/ctries/snapshot.scala | 26 +- test/files/scalacheck/Ctrie.scala | 14 +- .../parallel-collections/ParallelCtrieCheck.scala | 8 +- 24 files changed, 1570 insertions(+), 1480 deletions(-) create mode 100644 src/library/scala/collection/concurrent/BasicNode.java create mode 100644 src/library/scala/collection/concurrent/CNodeBase.java create mode 100644 src/library/scala/collection/concurrent/Gen.java create mode 100644 src/library/scala/collection/concurrent/INodeBase.java create mode 100644 src/library/scala/collection/concurrent/MainNode.java create mode 100644 src/library/scala/collection/concurrent/Map.scala create mode 100644 src/library/scala/collection/concurrent/TrieMap.scala delete mode 100644 src/library/scala/collection/mutable/BasicNode.java delete mode 100644 src/library/scala/collection/mutable/CNodeBase.java delete mode 100644 src/library/scala/collection/mutable/ConcurrentTrieMap.scala delete mode 100644 src/library/scala/collection/mutable/Gen.java delete mode 100644 src/library/scala/collection/mutable/INodeBase.java delete mode 100644 src/library/scala/collection/mutable/MainNode.java delete mode 100644 src/library/scala/collection/parallel/mutable/ParConcurrentTrieMap.scala create mode 100644 src/library/scala/collection/parallel/mutable/ParTrieMap.scala diff --git a/src/library/scala/collection/concurrent/BasicNode.java b/src/library/scala/collection/concurrent/BasicNode.java new file mode 100644 index 0000000000..904ab169a8 --- /dev/null +++ b/src/library/scala/collection/concurrent/BasicNode.java @@ -0,0 +1,20 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent; + + + + + + +public abstract class BasicNode { + + public abstract String string(int lev); + +} \ No newline at end of file diff --git a/src/library/scala/collection/concurrent/CNodeBase.java b/src/library/scala/collection/concurrent/CNodeBase.java new file mode 100644 index 0000000000..e343ba95ca --- /dev/null +++ b/src/library/scala/collection/concurrent/CNodeBase.java @@ -0,0 +1,35 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent; + + + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + + + +abstract class CNodeBase extends MainNode { + + public static final AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(CNodeBase.class, "csize"); + + public volatile int csize = -1; + + public boolean CAS_SIZE(int oldval, int nval) { + return updater.compareAndSet(this, oldval, nval); + } + + public void WRITE_SIZE(int nval) { + updater.set(this, nval); + } + + public int READ_SIZE() { + return updater.get(this); + } + +} \ No newline at end of file diff --git a/src/library/scala/collection/concurrent/Gen.java b/src/library/scala/collection/concurrent/Gen.java new file mode 100644 index 0000000000..4fac4417eb --- /dev/null +++ b/src/library/scala/collection/concurrent/Gen.java @@ -0,0 +1,18 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent; + + + + + + +final class Gen { +} + diff --git a/src/library/scala/collection/concurrent/INodeBase.java b/src/library/scala/collection/concurrent/INodeBase.java new file mode 100644 index 0000000000..96bc393b4f --- /dev/null +++ b/src/library/scala/collection/concurrent/INodeBase.java @@ -0,0 +1,35 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class INodeBase extends BasicNode { + + public static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(INodeBase.class, MainNode.class, "mainnode"); + + public static final Object RESTART = new Object(); + + public volatile MainNode mainnode = null; + + public final Gen gen; + + public INodeBase(Gen generation) { + gen = generation; + } + + public BasicNode prev() { + return null; + } + +} \ No newline at end of file diff --git a/src/library/scala/collection/concurrent/MainNode.java b/src/library/scala/collection/concurrent/MainNode.java new file mode 100644 index 0000000000..3eea58f3bb --- /dev/null +++ b/src/library/scala/collection/concurrent/MainNode.java @@ -0,0 +1,40 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class MainNode extends BasicNode { + + public static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(MainNode.class, MainNode.class, "prev"); + + public volatile MainNode prev = null; + + public abstract int cachedSize(Object ct); + + public boolean CAS_PREV(MainNode oldval, MainNode nval) { + return updater.compareAndSet(this, oldval, nval); + } + + public void WRITE_PREV(MainNode nval) { + updater.set(this, nval); + } + + // do we need this? unclear in the javadocs... + // apparently not - volatile reads are supposed to be safe + // irregardless of whether there are concurrent ARFU updates + public MainNode READ_PREV() { + return updater.get(this); + } + +} \ No newline at end of file diff --git a/src/library/scala/collection/concurrent/Map.scala b/src/library/scala/collection/concurrent/Map.scala new file mode 100644 index 0000000000..83445738d9 --- /dev/null +++ b/src/library/scala/collection/concurrent/Map.scala @@ -0,0 +1,88 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2010-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.concurrent + +/** A template trait for mutable maps that allow concurrent access. + * + * $concurrentmapinfo + * + * @since 2.8 + * @see [[http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html#concurrent_maps "Scala's Collection Library overview"]] + * section on `Concurrent Maps` for more information. + * + * @tparam A the key type of the map + * @tparam B the value type of the map + * + * @define Coll ConcurrentMap + * @define coll concurrent map + * @define concurrentmapinfo + * This is a base trait for all Scala concurrent map implementations. It + * provides all of the methods a `Map` does, with the difference that all the + * changes are atomic. It also describes methods specific to concurrent maps. + * + * '''Note''': The concurrent maps do not accept `'''null'''` for keys or values. + * + * @define atomicop + * This is an atomic operation. + */ +trait Map[A, B] extends scala.collection.mutable.Map[A, B] { + + /** + * Associates the given key with a given value, unless the key was already + * associated with some other value. + * + * $atomicop + * + * @param k key with which the specified value is to be associated with + * @param v value to be associated with the specified key + * @return `Some(oldvalue)` if there was a value `oldvalue` previously + * associated with the specified key, or `None` if there was no + * mapping for the specified key + */ + def putIfAbsent(k: A, v: B): Option[B] + + /** + * Removes the entry for the specified key if its currently mapped to the + * specified value. + * + * $atomicop + * + * @param k key for which the entry should be removed + * @param v value expected to be associated with the specified key if + * the removal is to take place + * @return `true` if the removal took place, `false` otherwise + */ + def remove(k: A, v: B): Boolean + + /** + * Replaces the entry for the given key only if it was previously mapped to + * a given value. + * + * $atomicop + * + * @param k key for which the entry should be replaced + * @param oldvalue value expected to be associated with the specified key + * if replacing is to happen + * @param newvalue value to be associated with the specified key + * @return `true` if the entry was replaced, `false` otherwise + */ + def replace(k: A, oldvalue: B, newvalue: B): Boolean + + /** + * Replaces the entry for the given key only if it was previously mapped + * to some value. + * + * $atomicop + * + * @param k key for which the entry should be replaced + * @param v value to be associated with the specified key + * @return `Some(v)` if the given key was previously mapped to some value `v`, or `None` otherwise + */ + def replace(k: A, v: B): Option[B] +} diff --git a/src/library/scala/collection/concurrent/TrieMap.scala b/src/library/scala/collection/concurrent/TrieMap.scala new file mode 100644 index 0000000000..092598e260 --- /dev/null +++ b/src/library/scala/collection/concurrent/TrieMap.scala @@ -0,0 +1,1075 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection +package concurrent + + + +import java.util.concurrent.atomic._ +import collection.immutable.{ ListMap => ImmutableListMap } +import collection.parallel.mutable.ParTrieMap +import generic._ +import annotation.tailrec +import annotation.switch + + + +private[collection] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) { + import INodeBase._ + + WRITE(bn) + + def this(g: Gen) = this(null, g) + + @inline final def WRITE(nval: MainNode[K, V]) = INodeBase.updater.set(this, nval) + + @inline final def CAS(old: MainNode[K, V], n: MainNode[K, V]) = INodeBase.updater.compareAndSet(this, old, n) + + final def gcasRead(ct: TrieMap[K, V]): MainNode[K, V] = GCAS_READ(ct) + + @inline final def GCAS_READ(ct: TrieMap[K, V]): MainNode[K, V] = { + val m = /*READ*/mainnode + val prevval = /*READ*/m.prev + if (prevval eq null) m + else GCAS_Complete(m, ct) + } + + @tailrec private def GCAS_Complete(m: MainNode[K, V], ct: TrieMap[K, V]): MainNode[K, V] = if (m eq null) null else { + // complete the GCAS + val prev = /*READ*/m.prev + val ctr = ct.readRoot(true) + + prev match { + case null => + m + case fn: FailedNode[_, _] => // try to commit to previous value + if (CAS(m, fn.prev)) fn.prev + else GCAS_Complete(/*READ*/mainnode, ct) + case vn: MainNode[_, _] => + // Assume that you've read the root from the generation G. + // Assume that the snapshot algorithm is correct. + // ==> you can only reach nodes in generations <= G. + // ==> `gen` is <= G. + // We know that `ctr.gen` is >= G. + // ==> if `ctr.gen` = `gen` then they are both equal to G. + // ==> otherwise, we know that either `ctr.gen` > G, `gen` < G, + // or both + if ((ctr.gen eq gen) && ct.nonReadOnly) { + // try to commit + if (m.CAS_PREV(prev, null)) m + else GCAS_Complete(m, ct) + } else { + // try to abort + m.CAS_PREV(prev, new FailedNode(prev)) + GCAS_Complete(/*READ*/mainnode, ct) + } + } + } + + @inline final def GCAS(old: MainNode[K, V], n: MainNode[K, V], ct: TrieMap[K, V]): Boolean = { + n.WRITE_PREV(old) + if (CAS(old, n)) { + GCAS_Complete(n, ct) + /*READ*/n.prev eq null + } else false + } + + @inline private def inode(cn: MainNode[K, V]) = { + val nin = new INode[K, V](gen) + nin.WRITE(cn) + nin + } + + final def copyToGen(ngen: Gen, ct: TrieMap[K, V]) = { + val nin = new INode[K, V](ngen) + val main = GCAS_READ(ct) + nin.WRITE(main) + nin + } + + /** Inserts a key value pair, overwriting the old pair if the keys match. + * + * @return true if successful, false otherwise + */ + @tailrec final def rec_insert(k: K, v: V, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: TrieMap[K, V]): Boolean = { + val m = GCAS_READ(ct) // use -Yinline! + + m match { + case cn: CNode[K, V] => // 1) a multiway node + val idx = (hc >>> lev) & 0x1f + val flag = 1 << idx + val bmp = cn.bitmap + val mask = flag - 1 + val pos = Integer.bitCount(bmp & mask) + if ((bmp & flag) != 0) { + // 1a) insert below + cn.array(pos) match { + case in: INode[K, V] => + if (startgen eq in.gen) in.rec_insert(k, v, hc, lev + 5, this, startgen, ct) + else { + if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_insert(k, v, hc, lev, parent, startgen, ct) + else false + } + case sn: SNode[K, V] => + if (sn.hc == hc && sn.k == k) GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct) + else { + val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) + val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) + GCAS(cn, nn, ct) + } + } + } else { + val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) + val ncnode = rn.insertedAt(pos, flag, new SNode(k, v, hc), gen) + GCAS(cn, ncnode, ct) + } + case tn: TNode[K, V] => + clean(parent, ct, lev - 5) + false + case ln: LNode[K, V] => // 3) an l-node + val nn = ln.inserted(k, v) + GCAS(ln, nn, ct) + } + } + + /** Inserts a new key value pair, given that a specific condition is met. + * + * @param cond null - don't care if the key was there; KEY_ABSENT - key wasn't there; KEY_PRESENT - key was there; other value `v` - key must be bound to `v` + * @return null if unsuccessful, Option[V] otherwise (indicating previous value bound to the key) + */ + @tailrec final def rec_insertif(k: K, v: V, hc: Int, cond: AnyRef, lev: Int, parent: INode[K, V], startgen: Gen, ct: TrieMap[K, V]): Option[V] = { + val m = GCAS_READ(ct) // use -Yinline! + + m match { + case cn: CNode[K, V] => // 1) a multiway node + val idx = (hc >>> lev) & 0x1f + val flag = 1 << idx + val bmp = cn.bitmap + val mask = flag - 1 + val pos = Integer.bitCount(bmp & mask) + if ((bmp & flag) != 0) { + // 1a) insert below + cn.array(pos) match { + case in: INode[K, V] => + if (startgen eq in.gen) in.rec_insertif(k, v, hc, cond, lev + 5, this, startgen, ct) + else { + if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_insertif(k, v, hc, cond, lev, parent, startgen, ct) + else null + } + case sn: SNode[K, V] => cond match { + case null => + if (sn.hc == hc && sn.k == k) { + if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null + } else { + val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) + val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) + if (GCAS(cn, nn, ct)) None + else null + } + case INode.KEY_ABSENT => + if (sn.hc == hc && sn.k == k) Some(sn.v) + else { + val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) + val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) + if (GCAS(cn, nn, ct)) None + else null + } + case INode.KEY_PRESENT => + if (sn.hc == hc && sn.k == k) { + if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null + } else None + case otherv: V => + if (sn.hc == hc && sn.k == k && sn.v == otherv) { + if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null + } else None + } + } + } else cond match { + case null | INode.KEY_ABSENT => + val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) + val ncnode = rn.insertedAt(pos, flag, new SNode(k, v, hc), gen) + if (GCAS(cn, ncnode, ct)) None else null + case INode.KEY_PRESENT => None + case otherv: V => None + } + case sn: TNode[K, V] => + clean(parent, ct, lev - 5) + null + case ln: LNode[K, V] => // 3) an l-node + @inline def insertln() = { + val nn = ln.inserted(k, v) + GCAS(ln, nn, ct) + } + cond match { + case null => + val optv = ln.get(k) + if (insertln()) optv else null + case INode.KEY_ABSENT => + ln.get(k) match { + case None => if (insertln()) None else null + case optv => optv + } + case INode.KEY_PRESENT => + ln.get(k) match { + case Some(v0) => if (insertln()) Some(v0) else null + case None => None + } + case otherv: V => + ln.get(k) match { + case Some(v0) if v0 == otherv => if (insertln()) Some(otherv) else null + case _ => None + } + } + } + } + + /** Looks up the value associated with the key. + * + * @return null if no value has been found, RESTART if the operation wasn't successful, or any other value otherwise + */ + @tailrec final def rec_lookup(k: K, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: TrieMap[K, V]): AnyRef = { + val m = GCAS_READ(ct) // use -Yinline! + + m match { + case cn: CNode[K, V] => // 1) a multinode + val idx = (hc >>> lev) & 0x1f + val flag = 1 << idx + val bmp = cn.bitmap + if ((bmp & flag) == 0) null // 1a) bitmap shows no binding + else { // 1b) bitmap contains a value - descend + val pos = if (bmp == 0xffffffff) idx else Integer.bitCount(bmp & (flag - 1)) + val sub = cn.array(pos) + sub match { + case in: INode[K, V] => + if (ct.isReadOnly || (startgen eq in.gen)) in.rec_lookup(k, hc, lev + 5, this, startgen, ct) + else { + if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_lookup(k, hc, lev, parent, startgen, ct) + else return RESTART // used to be throw RestartException + } + case sn: SNode[K, V] => // 2) singleton node + if (sn.hc == hc && sn.k == k) sn.v.asInstanceOf[AnyRef] + else null + } + } + case tn: TNode[K, V] => // 3) non-live node + def cleanReadOnly(tn: TNode[K, V]) = if (ct.nonReadOnly) { + clean(parent, ct, lev - 5) + RESTART // used to be throw RestartException + } else { + if (tn.hc == hc && tn.k == k) tn.v.asInstanceOf[AnyRef] + else null + } + cleanReadOnly(tn) + case ln: LNode[K, V] => // 5) an l-node + ln.get(k).asInstanceOf[Option[AnyRef]].orNull + } + } + + /** Removes the key associated with the given value. + * + * @param v if null, will remove the key irregardless of the value; otherwise removes only if binding contains that exact key and value + * @return null if not successful, an Option[V] indicating the previous value otherwise + */ + final def rec_remove(k: K, v: V, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: TrieMap[K, V]): Option[V] = { + val m = GCAS_READ(ct) // use -Yinline! + + m match { + case cn: CNode[K, V] => + val idx = (hc >>> lev) & 0x1f + val bmp = cn.bitmap + val flag = 1 << idx + if ((bmp & flag) == 0) None + else { + val pos = Integer.bitCount(bmp & (flag - 1)) + val sub = cn.array(pos) + val res = sub match { + case in: INode[K, V] => + if (startgen eq in.gen) in.rec_remove(k, v, hc, lev + 5, this, startgen, ct) + else { + if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_remove(k, v, hc, lev, parent, startgen, ct) + else null + } + case sn: SNode[K, V] => + if (sn.hc == hc && sn.k == k && (v == null || sn.v == v)) { + val ncn = cn.removedAt(pos, flag, gen).toContracted(lev) + if (GCAS(cn, ncn, ct)) Some(sn.v) else null + } else None + } + + if (res == None || (res eq null)) res + else { + @tailrec def cleanParent(nonlive: AnyRef) { + val pm = parent.GCAS_READ(ct) + pm match { + case cn: CNode[K, V] => + val idx = (hc >>> (lev - 5)) & 0x1f + val bmp = cn.bitmap + val flag = 1 << idx + if ((bmp & flag) == 0) {} // somebody already removed this i-node, we're done + else { + val pos = Integer.bitCount(bmp & (flag - 1)) + val sub = cn.array(pos) + if (sub eq this) nonlive match { + case tn: TNode[K, V] => + val ncn = cn.updatedAt(pos, tn.copyUntombed, gen).toContracted(lev - 5) + if (!parent.GCAS(cn, ncn, ct)) + if (ct.readRoot().gen == startgen) cleanParent(nonlive) + } + } + case _ => // parent is no longer a cnode, we're done + } + } + + if (parent ne null) { // never tomb at root + val n = GCAS_READ(ct) + if (n.isInstanceOf[TNode[_, _]]) + cleanParent(n) + } + + res + } + } + case tn: TNode[K, V] => + clean(parent, ct, lev - 5) + null + case ln: LNode[K, V] => + if (v == null) { + val optv = ln.get(k) + val nn = ln.removed(k) + if (GCAS(ln, nn, ct)) optv else null + } else ln.get(k) match { + case optv @ Some(v0) if v0 == v => + val nn = ln.removed(k) + if (GCAS(ln, nn, ct)) optv else null + case _ => None + } + } + } + + private def clean(nd: INode[K, V], ct: TrieMap[K, V], lev: Int) { + val m = nd.GCAS_READ(ct) + m match { + case cn: CNode[K, V] => nd.GCAS(cn, cn.toCompressed(ct, lev, gen), ct) + case _ => + } + } + + final def isNullInode(ct: TrieMap[K, V]) = GCAS_READ(ct) eq null + + final def cachedSize(ct: TrieMap[K, V]): Int = { + val m = GCAS_READ(ct) + m.cachedSize(ct) + } + + /* this is a quiescent method! */ + def string(lev: Int) = "%sINode -> %s".format(" " * lev, mainnode match { + case null => "" + case tn: TNode[_, _] => "TNode(%s, %s, %d, !)".format(tn.k, tn.v, tn.hc) + case cn: CNode[_, _] => cn.string(lev) + case ln: LNode[_, _] => ln.string(lev) + case x => "".format(x) + }) + +} + + +private[concurrent] object INode { + val KEY_PRESENT = new AnyRef + val KEY_ABSENT = new AnyRef + + def newRootNode[K, V] = { + val gen = new Gen + val cn = new CNode[K, V](0, new Array(0), gen) + new INode[K, V](cn, gen) + } +} + + +private[concurrent] final class FailedNode[K, V](p: MainNode[K, V]) extends MainNode[K, V] { + WRITE_PREV(p) + + def string(lev: Int) = throw new UnsupportedOperationException + + def cachedSize(ct: AnyRef): Int = throw new UnsupportedOperationException + + override def toString = "FailedNode(%s)".format(p) +} + + +private[concurrent] trait KVNode[K, V] { + def kvPair: (K, V) +} + + +private[collection] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int) +extends BasicNode with KVNode[K, V] { + final def copy = new SNode(k, v, hc) + final def copyTombed = new TNode(k, v, hc) + final def copyUntombed = new SNode(k, v, hc) + final def kvPair = (k, v) + final def string(lev: Int) = (" " * lev) + "SNode(%s, %s, %x)".format(k, v, hc) +} + + +private[collection] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int) +extends MainNode[K, V] with KVNode[K, V] { + final def copy = new TNode(k, v, hc) + final def copyTombed = new TNode(k, v, hc) + final def copyUntombed = new SNode(k, v, hc) + final def kvPair = (k, v) + final def cachedSize(ct: AnyRef): Int = 1 + final def string(lev: Int) = (" " * lev) + "TNode(%s, %s, %x, !)".format(k, v, hc) +} + + +private[collection] final class LNode[K, V](final val listmap: ImmutableListMap[K, V]) +extends MainNode[K, V] { + def this(k: K, v: V) = this(ImmutableListMap(k -> v)) + def this(k1: K, v1: V, k2: K, v2: V) = this(ImmutableListMap(k1 -> v1, k2 -> v2)) + def inserted(k: K, v: V) = new LNode(listmap + ((k, v))) + def removed(k: K): MainNode[K, V] = { + val updmap = listmap - k + if (updmap.size > 1) new LNode(updmap) + else { + val (k, v) = updmap.iterator.next + new TNode(k, v, TrieMap.computeHash(k)) // create it tombed so that it gets compressed on subsequent accesses + } + } + def get(k: K) = listmap.get(k) + def cachedSize(ct: AnyRef): Int = listmap.size + def string(lev: Int) = (" " * lev) + "LNode(%s)".format(listmap.mkString(", ")) +} + + +private[collection] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen) +extends CNodeBase[K, V] { + + // this should only be called from within read-only snapshots + final def cachedSize(ct: AnyRef) = { + val currsz = READ_SIZE() + if (currsz != -1) currsz + else { + val sz = computeSize(ct.asInstanceOf[TrieMap[K, V]]) + while (READ_SIZE() == -1) CAS_SIZE(-1, sz) + READ_SIZE() + } + } + + // lends itself towards being parallelizable by choosing + // a random starting offset in the array + // => if there are concurrent size computations, they start + // at different positions, so they are more likely to + // to be independent + private def computeSize(ct: TrieMap[K, V]): Int = { + var i = 0 + var sz = 0 + val offset = math.abs(util.Random.nextInt()) % array.length + while (i < array.length) { + val pos = (i + offset) % array.length + array(pos) match { + case sn: SNode[_, _] => sz += 1 + case in: INode[K, V] => sz += in.cachedSize(ct) + } + i += 1 + } + sz + } + + final def updatedAt(pos: Int, nn: BasicNode, gen: Gen) = { + val len = array.length + val narr = new Array[BasicNode](len) + Array.copy(array, 0, narr, 0, len) + narr(pos) = nn + new CNode[K, V](bitmap, narr, gen) + } + + final def removedAt(pos: Int, flag: Int, gen: Gen) = { + val arr = array + val len = arr.length + val narr = new Array[BasicNode](len - 1) + Array.copy(arr, 0, narr, 0, pos) + Array.copy(arr, pos + 1, narr, pos, len - pos - 1) + new CNode[K, V](bitmap ^ flag, narr, gen) + } + + final def insertedAt(pos: Int, flag: Int, nn: BasicNode, gen: Gen) = { + val len = array.length + val bmp = bitmap + val narr = new Array[BasicNode](len + 1) + Array.copy(array, 0, narr, 0, pos) + narr(pos) = nn + Array.copy(array, pos, narr, pos + 1, len - pos) + new CNode[K, V](bmp | flag, narr, gen) + } + + /** Returns a copy of this cnode such that all the i-nodes below it are copied + * to the specified generation `ngen`. + */ + final def renewed(ngen: Gen, ct: TrieMap[K, V]) = { + var i = 0 + val arr = array + val len = arr.length + val narr = new Array[BasicNode](len) + while (i < len) { + arr(i) match { + case in: INode[K, V] => narr(i) = in.copyToGen(ngen, ct) + case bn: BasicNode => narr(i) = bn + } + i += 1 + } + new CNode[K, V](bitmap, narr, ngen) + } + + private def resurrect(inode: INode[K, V], inodemain: AnyRef): BasicNode = inodemain match { + case tn: TNode[_, _] => tn.copyUntombed + case _ => inode + } + + final def toContracted(lev: Int): MainNode[K, V] = if (array.length == 1 && lev > 0) array(0) match { + case sn: SNode[K, V] => sn.copyTombed + case _ => this + } else this + + // - if the branching factor is 1 for this CNode, and the child + // is a tombed SNode, returns its tombed version + // - otherwise, if there is at least one non-null node below, + // returns the version of this node with at least some null-inodes + // removed (those existing when the op began) + // - if there are only null-i-nodes below, returns null + final def toCompressed(ct: TrieMap[K, V], lev: Int, gen: Gen) = { + var bmp = bitmap + var i = 0 + val arr = array + val tmparray = new Array[BasicNode](arr.length) + while (i < arr.length) { // construct new bitmap + val sub = arr(i) + sub match { + case in: INode[K, V] => + val inodemain = in.gcasRead(ct) + assert(inodemain ne null) + tmparray(i) = resurrect(in, inodemain) + case sn: SNode[K, V] => + tmparray(i) = sn + } + i += 1 + } + + new CNode[K, V](bmp, tmparray, gen).toContracted(lev) + } + + private[concurrent] def string(lev: Int): String = "CNode %x\n%s".format(bitmap, array.map(_.string(lev + 1)).mkString("\n")) + + /* quiescently consistent - don't call concurrently to anything involving a GCAS!! */ + protected def collectElems: Seq[(K, V)] = array flatMap { + case sn: SNode[K, V] => Some(sn.kvPair) + case in: INode[K, V] => in.mainnode match { + case tn: TNode[K, V] => Some(tn.kvPair) + case ln: LNode[K, V] => ln.listmap.toList + case cn: CNode[K, V] => cn.collectElems + } + } + + protected def collectLocalElems: Seq[String] = array flatMap { + case sn: SNode[K, V] => Some(sn.kvPair._2.toString) + case in: INode[K, V] => Some(in.toString.drop(14) + "(" + in.gen + ")") + } + + override def toString = { + val elems = collectLocalElems + "CNode(sz: %d; %s)".format(elems.size, elems.sorted.mkString(", ")) + } +} + + +private[concurrent] object CNode { + + def dual[K, V](x: SNode[K, V], xhc: Int, y: SNode[K, V], yhc: Int, lev: Int, gen: Gen): MainNode[K, V] = if (lev < 35) { + val xidx = (xhc >>> lev) & 0x1f + val yidx = (yhc >>> lev) & 0x1f + val bmp = (1 << xidx) | (1 << yidx) + if (xidx == yidx) { + val subinode = new INode[K, V](gen)//(TrieMap.inodeupdater) + subinode.mainnode = dual(x, xhc, y, yhc, lev + 5, gen) + new CNode(bmp, Array(subinode), gen) + } else { + if (xidx < yidx) new CNode(bmp, Array(x, y), gen) + else new CNode(bmp, Array(y, x), gen) + } + } else { + new LNode(x.k, x.v, y.k, y.v) + } + +} + + +private[concurrent] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmain: MainNode[K, V], nv: INode[K, V]) { + @volatile var committed = false +} + + +/** A concurrent hash-trie or TrieMap is a concurrent thread-safe lock-free + * implementation of a hash array mapped trie. It is used to implement the + * concurrent map abstraction. It has particularly scalable concurrent insert + * and remove operations and is memory-efficient. It supports O(1), atomic, + * lock-free snapshots which are used to implement linearizable lock-free size, + * iterator and clear operations. The cost of evaluating the (lazy) snapshot is + * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable. + * + * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf + * + * @author Aleksandar Prokopec + * @since 2.10 + */ +@SerialVersionUID(0L - 6402774413839597105L) +final class TrieMap[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[TrieMap[K, V], AnyRef]) +extends scala.collection.concurrent.Map[K, V] + with scala.collection.mutable.MapLike[K, V, TrieMap[K, V]] + with CustomParallelizable[(K, V), ParTrieMap[K, V]] + with Serializable +{ + import TrieMap.computeHash + + private var rootupdater = rtupd + @volatile var root = r + + def this() = this( + INode.newRootNode, + AtomicReferenceFieldUpdater.newUpdater(classOf[TrieMap[K, V]], classOf[AnyRef], "root") + ) + + /* internal methods */ + + private def writeObject(out: java.io.ObjectOutputStream) { + val it = iterator + while (it.hasNext) { + val (k, v) = it.next() + out.writeObject(k) + out.writeObject(v) + } + out.writeObject(TrieMapSerializationEnd) + } + + private def readObject(in: java.io.ObjectInputStream) { + root = INode.newRootNode + rootupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[TrieMap[K, V]], classOf[AnyRef], "root") + + var obj: AnyRef = null + do { + obj = in.readObject() + if (obj != TrieMapSerializationEnd) { + val k = obj.asInstanceOf[K] + val v = in.readObject().asInstanceOf[V] + update(k, v) + } + } while (obj != TrieMapSerializationEnd) + } + + @inline final def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) + + final def readRoot(abort: Boolean = false): INode[K, V] = RDCSS_READ_ROOT(abort) + + @inline final def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { + val r = /*READ*/root + r match { + case in: INode[K, V] => in + case desc: RDCSS_Descriptor[K, V] => RDCSS_Complete(abort) + } + } + + @tailrec private def RDCSS_Complete(abort: Boolean): INode[K, V] = { + val v = /*READ*/root + v match { + case in: INode[K, V] => in + case desc: RDCSS_Descriptor[K, V] => + val RDCSS_Descriptor(ov, exp, nv) = desc + if (abort) { + if (CAS_ROOT(desc, ov)) ov + else RDCSS_Complete(abort) + } else { + val oldmain = ov.gcasRead(this) + if (oldmain eq exp) { + if (CAS_ROOT(desc, nv)) { + desc.committed = true + nv + } else RDCSS_Complete(abort) + } else { + if (CAS_ROOT(desc, ov)) ov + else RDCSS_Complete(abort) + } + } + } + } + + private def RDCSS_ROOT(ov: INode[K, V], expectedmain: MainNode[K, V], nv: INode[K, V]): Boolean = { + val desc = RDCSS_Descriptor(ov, expectedmain, nv) + if (CAS_ROOT(ov, desc)) { + RDCSS_Complete(false) + /*READ*/desc.committed + } else false + } + + @tailrec private def inserthc(k: K, hc: Int, v: V) { + val r = RDCSS_READ_ROOT() + if (!r.rec_insert(k, v, hc, 0, null, r.gen, this)) inserthc(k, hc, v) + } + + @tailrec private def insertifhc(k: K, hc: Int, v: V, cond: AnyRef): Option[V] = { + val r = RDCSS_READ_ROOT() + + val ret = r.rec_insertif(k, v, hc, cond, 0, null, r.gen, this) + if (ret eq null) insertifhc(k, hc, v, cond) + else ret + } + + @tailrec private def lookuphc(k: K, hc: Int): AnyRef = { + val r = RDCSS_READ_ROOT() + val res = r.rec_lookup(k, hc, 0, null, r.gen, this) + if (res eq INodeBase.RESTART) lookuphc(k, hc) + else res + } + + /* slower: + //@tailrec + private def lookuphc(k: K, hc: Int): AnyRef = { + val r = RDCSS_READ_ROOT() + try { + r.rec_lookup(k, hc, 0, null, r.gen, this) + } catch { + case RestartException => + lookuphc(k, hc) + } + } + */ + + @tailrec private def removehc(k: K, v: V, hc: Int): Option[V] = { + val r = RDCSS_READ_ROOT() + val res = r.rec_remove(k, v, hc, 0, null, r.gen, this) + if (res ne null) res + else removehc(k, v, hc) + } + + def string = RDCSS_READ_ROOT().string(0) + + /* public methods */ + + override def seq = this + + override def par = new ParTrieMap(this) + + override def empty: TrieMap[K, V] = new TrieMap[K, V] + + final def isReadOnly = rootupdater eq null + + final def nonReadOnly = rootupdater ne null + + /** Returns a snapshot of this TrieMap. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * in the snapshot or this TrieMap are accessed, they are rewritten. + * This means that the work of rebuilding both the snapshot and this + * TrieMap is distributed across all the threads doing updates or accesses + * subsequent to the snapshot creation. + */ + @tailrec final def snapshot(): TrieMap[K, V] = { + val r = RDCSS_READ_ROOT() + val expmain = r.gcasRead(this) + if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new TrieMap(r.copyToGen(new Gen, this), rootupdater) + else snapshot() + } + + /** Returns a read-only snapshot of this TrieMap. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * of this TrieMap are accessed, it is rewritten. The work of creating + * the snapshot is thus distributed across subsequent updates + * and accesses on this TrieMap by all threads. + * Note that the snapshot itself is never rewritten unlike when calling + * the `snapshot` method, but the obtained snapshot cannot be modified. + * + * This method is used by other methods such as `size` and `iterator`. + */ + @tailrec final def readOnlySnapshot(): collection.Map[K, V] = { + val r = RDCSS_READ_ROOT() + val expmain = r.gcasRead(this) + if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new TrieMap(r, null) + else readOnlySnapshot() + } + + @tailrec final override def clear() { + val r = RDCSS_READ_ROOT() + if (!RDCSS_ROOT(r, r.gcasRead(this), INode.newRootNode[K, V])) clear() + } + + final def lookup(k: K): V = { + val hc = computeHash(k) + lookuphc(k, hc).asInstanceOf[V] + } + + final override def apply(k: K): V = { + val hc = computeHash(k) + val res = lookuphc(k, hc) + if (res eq null) throw new NoSuchElementException + else res.asInstanceOf[V] + } + + final def get(k: K): Option[V] = { + val hc = computeHash(k) + Option(lookuphc(k, hc)).asInstanceOf[Option[V]] + } + + override def put(key: K, value: V): Option[V] = { + val hc = computeHash(key) + insertifhc(key, hc, value, null) + } + + final override def update(k: K, v: V) { + val hc = computeHash(k) + inserthc(k, hc, v) + } + + final def +=(kv: (K, V)) = { + update(kv._1, kv._2) + this + } + + final override def remove(k: K): Option[V] = { + val hc = computeHash(k) + removehc(k, null.asInstanceOf[V], hc) + } + + final def -=(k: K) = { + remove(k) + this + } + + def putIfAbsent(k: K, v: V): Option[V] = { + val hc = computeHash(k) + insertifhc(k, hc, v, INode.KEY_ABSENT) + } + + def remove(k: K, v: V): Boolean = { + val hc = computeHash(k) + removehc(k, v, hc).nonEmpty + } + + def replace(k: K, oldvalue: V, newvalue: V): Boolean = { + val hc = computeHash(k) + insertifhc(k, hc, newvalue, oldvalue.asInstanceOf[AnyRef]).nonEmpty + } + + def replace(k: K, v: V): Option[V] = { + val hc = computeHash(k) + insertifhc(k, hc, v, INode.KEY_PRESENT) + } + + def iterator: Iterator[(K, V)] = + if (nonReadOnly) readOnlySnapshot().iterator + else new TrieMapIterator(0, this) + + private def cachedSize() = { + val r = RDCSS_READ_ROOT() + r.cachedSize(this) + } + + override def size: Int = + if (nonReadOnly) readOnlySnapshot().size + else cachedSize() + + override def stringPrefix = "TrieMap" + +} + + +object TrieMap extends MutableMapFactory[TrieMap] { + val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[MainNode[_, _]], "mainnode") + + implicit def canBuildFrom[K, V]: CanBuildFrom[Coll, (K, V), TrieMap[K, V]] = new MapCanBuildFrom[K, V] + + def empty[K, V]: TrieMap[K, V] = new TrieMap[K, V] + + @inline final def computeHash[K](k: K): Int = { + var hcode = k.hashCode + hcode = hcode * 0x9e3775cd + hcode = java.lang.Integer.reverseBytes(hcode) + hcode * 0x9e3775cd + } + +} + + +private[collection] class TrieMapIterator[K, V](var level: Int, private var ct: TrieMap[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] { + var stack = new Array[Array[BasicNode]](7) + var stackpos = new Array[Int](7) + var depth = -1 + var subiter: Iterator[(K, V)] = null + var current: KVNode[K, V] = null + + if (mustInit) initialize() + + def hasNext = (current ne null) || (subiter ne null) + + def next() = if (hasNext) { + var r: (K, V) = null + if (subiter ne null) { + r = subiter.next() + checkSubiter() + } else { + r = current.kvPair + advance() + } + r + } else Iterator.empty.next() + + private def readin(in: INode[K, V]) = in.gcasRead(ct) match { + case cn: CNode[K, V] => + depth += 1 + stack(depth) = cn.array + stackpos(depth) = -1 + advance() + case tn: TNode[K, V] => + current = tn + case ln: LNode[K, V] => + subiter = ln.listmap.iterator + checkSubiter() + case null => + current = null + } + + @inline private def checkSubiter() = if (!subiter.hasNext) { + subiter = null + advance() + } + + @inline private def initialize() { + assert(ct.isReadOnly) + + val r = ct.RDCSS_READ_ROOT() + readin(r) + } + + def advance(): Unit = if (depth >= 0) { + val npos = stackpos(depth) + 1 + if (npos < stack(depth).length) { + stackpos(depth) = npos + stack(depth)(npos) match { + case sn: SNode[K, V] => + current = sn + case in: INode[K, V] => + readin(in) + } + } else { + depth -= 1 + advance() + } + } else current = null + + protected def newIterator(_lev: Int, _ct: TrieMap[K, V], _mustInit: Boolean) = new TrieMapIterator[K, V](_lev, _ct, _mustInit) + + protected def dupTo(it: TrieMapIterator[K, V]) = { + it.level = this.level + it.ct = this.ct + it.depth = this.depth + it.current = this.current + + // these need a deep copy + Array.copy(this.stack, 0, it.stack, 0, 7) + Array.copy(this.stackpos, 0, it.stackpos, 0, 7) + + // this one needs to be evaluated + if (this.subiter == null) it.subiter = null + else { + val lst = this.subiter.toList + this.subiter = lst.iterator + it.subiter = lst.iterator + } + } + + /** Returns a sequence of iterators over subsets of this iterator. + * It's used to ease the implementation of splitters for a parallel version of the TrieMap. + */ + protected def subdivide(): Seq[Iterator[(K, V)]] = if (subiter ne null) { + // the case where an LNode is being iterated + val it = subiter + subiter = null + advance() + this.level += 1 + Seq(it, this) + } else if (depth == -1) { + this.level += 1 + Seq(this) + } else { + var d = 0 + while (d <= depth) { + val rem = stack(d).length - 1 - stackpos(d) + if (rem > 0) { + val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2) + stack(d) = arr1 + stackpos(d) = -1 + val it = newIterator(level + 1, ct, false) + it.stack(0) = arr2 + it.stackpos(0) = -1 + it.depth = 0 + it.advance() // <-- fix it + this.level += 1 + return Seq(this, it) + } + d += 1 + } + this.level += 1 + Seq(this) + } + + def printDebug { + println("ctrie iterator") + println(stackpos.mkString(",")) + println("depth: " + depth) + println("curr.: " + current) + println(stack.mkString("\n")) + } + +} + + +private[concurrent] object RestartException extends util.control.ControlThrowable + + +/** Only used for ctrie serialization. */ +@SerialVersionUID(0L - 7237891413820527142L) +private[concurrent] case object TrieMapSerializationEnd + + +private[concurrent] object Debug { + import collection._ + + lazy val logbuffer = new java.util.concurrent.ConcurrentLinkedQueue[AnyRef] + + def log(s: AnyRef) = logbuffer.add(s) + + def flush() { + for (s <- JavaConversions.asScalaIterator(logbuffer.iterator())) Console.out.println(s.toString) + logbuffer.clear() + } + + def clear() { + logbuffer.clear() + } + +} + + + + + + + + + + diff --git a/src/library/scala/collection/mutable/BasicNode.java b/src/library/scala/collection/mutable/BasicNode.java deleted file mode 100644 index c05009470a..0000000000 --- a/src/library/scala/collection/mutable/BasicNode.java +++ /dev/null @@ -1,20 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.mutable; - - - - - - -public abstract class BasicNode { - - public abstract String string(int lev); - -} \ No newline at end of file diff --git a/src/library/scala/collection/mutable/CNodeBase.java b/src/library/scala/collection/mutable/CNodeBase.java deleted file mode 100644 index 4374943b8d..0000000000 --- a/src/library/scala/collection/mutable/CNodeBase.java +++ /dev/null @@ -1,35 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.mutable; - - - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - - - -abstract class CNodeBase extends MainNode { - - public static final AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(CNodeBase.class, "csize"); - - public volatile int csize = -1; - - public boolean CAS_SIZE(int oldval, int nval) { - return updater.compareAndSet(this, oldval, nval); - } - - public void WRITE_SIZE(int nval) { - updater.set(this, nval); - } - - public int READ_SIZE() { - return updater.get(this); - } - -} \ No newline at end of file diff --git a/src/library/scala/collection/mutable/ConcurrentMap.scala b/src/library/scala/collection/mutable/ConcurrentMap.scala index fbb356ffb3..f2b44d6737 100644 --- a/src/library/scala/collection/mutable/ConcurrentMap.scala +++ b/src/library/scala/collection/mutable/ConcurrentMap.scala @@ -32,6 +32,7 @@ package mutable * @define atomicop * This is an atomic operation. */ +@deprecated("Use `scala.collection.concurrent.Map` instead.", "2.10.0") trait ConcurrentMap[A, B] extends Map[A, B] { /** diff --git a/src/library/scala/collection/mutable/ConcurrentTrieMap.scala b/src/library/scala/collection/mutable/ConcurrentTrieMap.scala deleted file mode 100644 index 1a44c8e423..0000000000 --- a/src/library/scala/collection/mutable/ConcurrentTrieMap.scala +++ /dev/null @@ -1,1075 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection -package mutable - - - -import java.util.concurrent.atomic._ -import collection.immutable.{ ListMap => ImmutableListMap } -import collection.parallel.mutable.ParConcurrentTrieMap -import generic._ -import annotation.tailrec -import annotation.switch - - - -private[collection] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) { - import INodeBase._ - - WRITE(bn) - - def this(g: Gen) = this(null, g) - - @inline final def WRITE(nval: MainNode[K, V]) = INodeBase.updater.set(this, nval) - - @inline final def CAS(old: MainNode[K, V], n: MainNode[K, V]) = INodeBase.updater.compareAndSet(this, old, n) - - final def gcasRead(ct: ConcurrentTrieMap[K, V]): MainNode[K, V] = GCAS_READ(ct) - - @inline final def GCAS_READ(ct: ConcurrentTrieMap[K, V]): MainNode[K, V] = { - val m = /*READ*/mainnode - val prevval = /*READ*/m.prev - if (prevval eq null) m - else GCAS_Complete(m, ct) - } - - @tailrec private def GCAS_Complete(m: MainNode[K, V], ct: ConcurrentTrieMap[K, V]): MainNode[K, V] = if (m eq null) null else { - // complete the GCAS - val prev = /*READ*/m.prev - val ctr = ct.readRoot(true) - - prev match { - case null => - m - case fn: FailedNode[_, _] => // try to commit to previous value - if (CAS(m, fn.prev)) fn.prev - else GCAS_Complete(/*READ*/mainnode, ct) - case vn: MainNode[_, _] => - // Assume that you've read the root from the generation G. - // Assume that the snapshot algorithm is correct. - // ==> you can only reach nodes in generations <= G. - // ==> `gen` is <= G. - // We know that `ctr.gen` is >= G. - // ==> if `ctr.gen` = `gen` then they are both equal to G. - // ==> otherwise, we know that either `ctr.gen` > G, `gen` < G, - // or both - if ((ctr.gen eq gen) && ct.nonReadOnly) { - // try to commit - if (m.CAS_PREV(prev, null)) m - else GCAS_Complete(m, ct) - } else { - // try to abort - m.CAS_PREV(prev, new FailedNode(prev)) - GCAS_Complete(/*READ*/mainnode, ct) - } - } - } - - @inline final def GCAS(old: MainNode[K, V], n: MainNode[K, V], ct: ConcurrentTrieMap[K, V]): Boolean = { - n.WRITE_PREV(old) - if (CAS(old, n)) { - GCAS_Complete(n, ct) - /*READ*/n.prev eq null - } else false - } - - @inline private def inode(cn: MainNode[K, V]) = { - val nin = new INode[K, V](gen) - nin.WRITE(cn) - nin - } - - final def copyToGen(ngen: Gen, ct: ConcurrentTrieMap[K, V]) = { - val nin = new INode[K, V](ngen) - val main = GCAS_READ(ct) - nin.WRITE(main) - nin - } - - /** Inserts a key value pair, overwriting the old pair if the keys match. - * - * @return true if successful, false otherwise - */ - @tailrec final def rec_insert(k: K, v: V, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: ConcurrentTrieMap[K, V]): Boolean = { - val m = GCAS_READ(ct) // use -Yinline! - - m match { - case cn: CNode[K, V] => // 1) a multiway node - val idx = (hc >>> lev) & 0x1f - val flag = 1 << idx - val bmp = cn.bitmap - val mask = flag - 1 - val pos = Integer.bitCount(bmp & mask) - if ((bmp & flag) != 0) { - // 1a) insert below - cn.array(pos) match { - case in: INode[K, V] => - if (startgen eq in.gen) in.rec_insert(k, v, hc, lev + 5, this, startgen, ct) - else { - if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_insert(k, v, hc, lev, parent, startgen, ct) - else false - } - case sn: SNode[K, V] => - if (sn.hc == hc && sn.k == k) GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct) - else { - val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) - val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) - GCAS(cn, nn, ct) - } - } - } else { - val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) - val ncnode = rn.insertedAt(pos, flag, new SNode(k, v, hc), gen) - GCAS(cn, ncnode, ct) - } - case tn: TNode[K, V] => - clean(parent, ct, lev - 5) - false - case ln: LNode[K, V] => // 3) an l-node - val nn = ln.inserted(k, v) - GCAS(ln, nn, ct) - } - } - - /** Inserts a new key value pair, given that a specific condition is met. - * - * @param cond null - don't care if the key was there; KEY_ABSENT - key wasn't there; KEY_PRESENT - key was there; other value `v` - key must be bound to `v` - * @return null if unsuccessful, Option[V] otherwise (indicating previous value bound to the key) - */ - @tailrec final def rec_insertif(k: K, v: V, hc: Int, cond: AnyRef, lev: Int, parent: INode[K, V], startgen: Gen, ct: ConcurrentTrieMap[K, V]): Option[V] = { - val m = GCAS_READ(ct) // use -Yinline! - - m match { - case cn: CNode[K, V] => // 1) a multiway node - val idx = (hc >>> lev) & 0x1f - val flag = 1 << idx - val bmp = cn.bitmap - val mask = flag - 1 - val pos = Integer.bitCount(bmp & mask) - if ((bmp & flag) != 0) { - // 1a) insert below - cn.array(pos) match { - case in: INode[K, V] => - if (startgen eq in.gen) in.rec_insertif(k, v, hc, cond, lev + 5, this, startgen, ct) - else { - if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_insertif(k, v, hc, cond, lev, parent, startgen, ct) - else null - } - case sn: SNode[K, V] => cond match { - case null => - if (sn.hc == hc && sn.k == k) { - if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null - } else { - val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) - val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) - if (GCAS(cn, nn, ct)) None - else null - } - case INode.KEY_ABSENT => - if (sn.hc == hc && sn.k == k) Some(sn.v) - else { - val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) - val nn = rn.updatedAt(pos, inode(CNode.dual(sn, sn.hc, new SNode(k, v, hc), hc, lev + 5, gen)), gen) - if (GCAS(cn, nn, ct)) None - else null - } - case INode.KEY_PRESENT => - if (sn.hc == hc && sn.k == k) { - if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null - } else None - case otherv: V => - if (sn.hc == hc && sn.k == k && sn.v == otherv) { - if (GCAS(cn, cn.updatedAt(pos, new SNode(k, v, hc), gen), ct)) Some(sn.v) else null - } else None - } - } - } else cond match { - case null | INode.KEY_ABSENT => - val rn = if (cn.gen eq gen) cn else cn.renewed(gen, ct) - val ncnode = rn.insertedAt(pos, flag, new SNode(k, v, hc), gen) - if (GCAS(cn, ncnode, ct)) None else null - case INode.KEY_PRESENT => None - case otherv: V => None - } - case sn: TNode[K, V] => - clean(parent, ct, lev - 5) - null - case ln: LNode[K, V] => // 3) an l-node - @inline def insertln() = { - val nn = ln.inserted(k, v) - GCAS(ln, nn, ct) - } - cond match { - case null => - val optv = ln.get(k) - if (insertln()) optv else null - case INode.KEY_ABSENT => - ln.get(k) match { - case None => if (insertln()) None else null - case optv => optv - } - case INode.KEY_PRESENT => - ln.get(k) match { - case Some(v0) => if (insertln()) Some(v0) else null - case None => None - } - case otherv: V => - ln.get(k) match { - case Some(v0) if v0 == otherv => if (insertln()) Some(otherv) else null - case _ => None - } - } - } - } - - /** Looks up the value associated with the key. - * - * @return null if no value has been found, RESTART if the operation wasn't successful, or any other value otherwise - */ - @tailrec final def rec_lookup(k: K, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: ConcurrentTrieMap[K, V]): AnyRef = { - val m = GCAS_READ(ct) // use -Yinline! - - m match { - case cn: CNode[K, V] => // 1) a multinode - val idx = (hc >>> lev) & 0x1f - val flag = 1 << idx - val bmp = cn.bitmap - if ((bmp & flag) == 0) null // 1a) bitmap shows no binding - else { // 1b) bitmap contains a value - descend - val pos = if (bmp == 0xffffffff) idx else Integer.bitCount(bmp & (flag - 1)) - val sub = cn.array(pos) - sub match { - case in: INode[K, V] => - if (ct.isReadOnly || (startgen eq in.gen)) in.rec_lookup(k, hc, lev + 5, this, startgen, ct) - else { - if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_lookup(k, hc, lev, parent, startgen, ct) - else return RESTART // used to be throw RestartException - } - case sn: SNode[K, V] => // 2) singleton node - if (sn.hc == hc && sn.k == k) sn.v.asInstanceOf[AnyRef] - else null - } - } - case tn: TNode[K, V] => // 3) non-live node - def cleanReadOnly(tn: TNode[K, V]) = if (ct.nonReadOnly) { - clean(parent, ct, lev - 5) - RESTART // used to be throw RestartException - } else { - if (tn.hc == hc && tn.k == k) tn.v.asInstanceOf[AnyRef] - else null - } - cleanReadOnly(tn) - case ln: LNode[K, V] => // 5) an l-node - ln.get(k).asInstanceOf[Option[AnyRef]].orNull - } - } - - /** Removes the key associated with the given value. - * - * @param v if null, will remove the key irregardless of the value; otherwise removes only if binding contains that exact key and value - * @return null if not successful, an Option[V] indicating the previous value otherwise - */ - final def rec_remove(k: K, v: V, hc: Int, lev: Int, parent: INode[K, V], startgen: Gen, ct: ConcurrentTrieMap[K, V]): Option[V] = { - val m = GCAS_READ(ct) // use -Yinline! - - m match { - case cn: CNode[K, V] => - val idx = (hc >>> lev) & 0x1f - val bmp = cn.bitmap - val flag = 1 << idx - if ((bmp & flag) == 0) None - else { - val pos = Integer.bitCount(bmp & (flag - 1)) - val sub = cn.array(pos) - val res = sub match { - case in: INode[K, V] => - if (startgen eq in.gen) in.rec_remove(k, v, hc, lev + 5, this, startgen, ct) - else { - if (GCAS(cn, cn.renewed(startgen, ct), ct)) rec_remove(k, v, hc, lev, parent, startgen, ct) - else null - } - case sn: SNode[K, V] => - if (sn.hc == hc && sn.k == k && (v == null || sn.v == v)) { - val ncn = cn.removedAt(pos, flag, gen).toContracted(lev) - if (GCAS(cn, ncn, ct)) Some(sn.v) else null - } else None - } - - if (res == None || (res eq null)) res - else { - @tailrec def cleanParent(nonlive: AnyRef) { - val pm = parent.GCAS_READ(ct) - pm match { - case cn: CNode[K, V] => - val idx = (hc >>> (lev - 5)) & 0x1f - val bmp = cn.bitmap - val flag = 1 << idx - if ((bmp & flag) == 0) {} // somebody already removed this i-node, we're done - else { - val pos = Integer.bitCount(bmp & (flag - 1)) - val sub = cn.array(pos) - if (sub eq this) nonlive match { - case tn: TNode[K, V] => - val ncn = cn.updatedAt(pos, tn.copyUntombed, gen).toContracted(lev - 5) - if (!parent.GCAS(cn, ncn, ct)) - if (ct.readRoot().gen == startgen) cleanParent(nonlive) - } - } - case _ => // parent is no longer a cnode, we're done - } - } - - if (parent ne null) { // never tomb at root - val n = GCAS_READ(ct) - if (n.isInstanceOf[TNode[_, _]]) - cleanParent(n) - } - - res - } - } - case tn: TNode[K, V] => - clean(parent, ct, lev - 5) - null - case ln: LNode[K, V] => - if (v == null) { - val optv = ln.get(k) - val nn = ln.removed(k) - if (GCAS(ln, nn, ct)) optv else null - } else ln.get(k) match { - case optv @ Some(v0) if v0 == v => - val nn = ln.removed(k) - if (GCAS(ln, nn, ct)) optv else null - case _ => None - } - } - } - - private def clean(nd: INode[K, V], ct: ConcurrentTrieMap[K, V], lev: Int) { - val m = nd.GCAS_READ(ct) - m match { - case cn: CNode[K, V] => nd.GCAS(cn, cn.toCompressed(ct, lev, gen), ct) - case _ => - } - } - - final def isNullInode(ct: ConcurrentTrieMap[K, V]) = GCAS_READ(ct) eq null - - final def cachedSize(ct: ConcurrentTrieMap[K, V]): Int = { - val m = GCAS_READ(ct) - m.cachedSize(ct) - } - - /* this is a quiescent method! */ - def string(lev: Int) = "%sINode -> %s".format(" " * lev, mainnode match { - case null => "" - case tn: TNode[_, _] => "TNode(%s, %s, %d, !)".format(tn.k, tn.v, tn.hc) - case cn: CNode[_, _] => cn.string(lev) - case ln: LNode[_, _] => ln.string(lev) - case x => "".format(x) - }) - -} - - -private[mutable] object INode { - val KEY_PRESENT = new AnyRef - val KEY_ABSENT = new AnyRef - - def newRootNode[K, V] = { - val gen = new Gen - val cn = new CNode[K, V](0, new Array(0), gen) - new INode[K, V](cn, gen) - } -} - - -private[mutable] final class FailedNode[K, V](p: MainNode[K, V]) extends MainNode[K, V] { - WRITE_PREV(p) - - def string(lev: Int) = throw new UnsupportedOperationException - - def cachedSize(ct: AnyRef): Int = throw new UnsupportedOperationException - - override def toString = "FailedNode(%s)".format(p) -} - - -private[mutable] trait KVNode[K, V] { - def kvPair: (K, V) -} - - -private[collection] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int) -extends BasicNode with KVNode[K, V] { - final def copy = new SNode(k, v, hc) - final def copyTombed = new TNode(k, v, hc) - final def copyUntombed = new SNode(k, v, hc) - final def kvPair = (k, v) - final def string(lev: Int) = (" " * lev) + "SNode(%s, %s, %x)".format(k, v, hc) -} - - -private[collection] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int) -extends MainNode[K, V] with KVNode[K, V] { - final def copy = new TNode(k, v, hc) - final def copyTombed = new TNode(k, v, hc) - final def copyUntombed = new SNode(k, v, hc) - final def kvPair = (k, v) - final def cachedSize(ct: AnyRef): Int = 1 - final def string(lev: Int) = (" " * lev) + "TNode(%s, %s, %x, !)".format(k, v, hc) -} - - -private[collection] final class LNode[K, V](final val listmap: ImmutableListMap[K, V]) -extends MainNode[K, V] { - def this(k: K, v: V) = this(ImmutableListMap(k -> v)) - def this(k1: K, v1: V, k2: K, v2: V) = this(ImmutableListMap(k1 -> v1, k2 -> v2)) - def inserted(k: K, v: V) = new LNode(listmap + ((k, v))) - def removed(k: K): MainNode[K, V] = { - val updmap = listmap - k - if (updmap.size > 1) new LNode(updmap) - else { - val (k, v) = updmap.iterator.next - new TNode(k, v, ConcurrentTrieMap.computeHash(k)) // create it tombed so that it gets compressed on subsequent accesses - } - } - def get(k: K) = listmap.get(k) - def cachedSize(ct: AnyRef): Int = listmap.size - def string(lev: Int) = (" " * lev) + "LNode(%s)".format(listmap.mkString(", ")) -} - - -private[collection] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen) -extends CNodeBase[K, V] { - - // this should only be called from within read-only snapshots - final def cachedSize(ct: AnyRef) = { - val currsz = READ_SIZE() - if (currsz != -1) currsz - else { - val sz = computeSize(ct.asInstanceOf[ConcurrentTrieMap[K, V]]) - while (READ_SIZE() == -1) CAS_SIZE(-1, sz) - READ_SIZE() - } - } - - // lends itself towards being parallelizable by choosing - // a random starting offset in the array - // => if there are concurrent size computations, they start - // at different positions, so they are more likely to - // to be independent - private def computeSize(ct: ConcurrentTrieMap[K, V]): Int = { - var i = 0 - var sz = 0 - val offset = math.abs(util.Random.nextInt()) % array.length - while (i < array.length) { - val pos = (i + offset) % array.length - array(pos) match { - case sn: SNode[_, _] => sz += 1 - case in: INode[K, V] => sz += in.cachedSize(ct) - } - i += 1 - } - sz - } - - final def updatedAt(pos: Int, nn: BasicNode, gen: Gen) = { - val len = array.length - val narr = new Array[BasicNode](len) - Array.copy(array, 0, narr, 0, len) - narr(pos) = nn - new CNode[K, V](bitmap, narr, gen) - } - - final def removedAt(pos: Int, flag: Int, gen: Gen) = { - val arr = array - val len = arr.length - val narr = new Array[BasicNode](len - 1) - Array.copy(arr, 0, narr, 0, pos) - Array.copy(arr, pos + 1, narr, pos, len - pos - 1) - new CNode[K, V](bitmap ^ flag, narr, gen) - } - - final def insertedAt(pos: Int, flag: Int, nn: BasicNode, gen: Gen) = { - val len = array.length - val bmp = bitmap - val narr = new Array[BasicNode](len + 1) - Array.copy(array, 0, narr, 0, pos) - narr(pos) = nn - Array.copy(array, pos, narr, pos + 1, len - pos) - new CNode[K, V](bmp | flag, narr, gen) - } - - /** Returns a copy of this cnode such that all the i-nodes below it are copied - * to the specified generation `ngen`. - */ - final def renewed(ngen: Gen, ct: ConcurrentTrieMap[K, V]) = { - var i = 0 - val arr = array - val len = arr.length - val narr = new Array[BasicNode](len) - while (i < len) { - arr(i) match { - case in: INode[K, V] => narr(i) = in.copyToGen(ngen, ct) - case bn: BasicNode => narr(i) = bn - } - i += 1 - } - new CNode[K, V](bitmap, narr, ngen) - } - - private def resurrect(inode: INode[K, V], inodemain: AnyRef): BasicNode = inodemain match { - case tn: TNode[_, _] => tn.copyUntombed - case _ => inode - } - - final def toContracted(lev: Int): MainNode[K, V] = if (array.length == 1 && lev > 0) array(0) match { - case sn: SNode[K, V] => sn.copyTombed - case _ => this - } else this - - // - if the branching factor is 1 for this CNode, and the child - // is a tombed SNode, returns its tombed version - // - otherwise, if there is at least one non-null node below, - // returns the version of this node with at least some null-inodes - // removed (those existing when the op began) - // - if there are only null-i-nodes below, returns null - final def toCompressed(ct: ConcurrentTrieMap[K, V], lev: Int, gen: Gen) = { - var bmp = bitmap - var i = 0 - val arr = array - val tmparray = new Array[BasicNode](arr.length) - while (i < arr.length) { // construct new bitmap - val sub = arr(i) - sub match { - case in: INode[K, V] => - val inodemain = in.gcasRead(ct) - assert(inodemain ne null) - tmparray(i) = resurrect(in, inodemain) - case sn: SNode[K, V] => - tmparray(i) = sn - } - i += 1 - } - - new CNode[K, V](bmp, tmparray, gen).toContracted(lev) - } - - private[mutable] def string(lev: Int): String = "CNode %x\n%s".format(bitmap, array.map(_.string(lev + 1)).mkString("\n")) - - /* quiescently consistent - don't call concurrently to anything involving a GCAS!! */ - protected def collectElems: Seq[(K, V)] = array flatMap { - case sn: SNode[K, V] => Some(sn.kvPair) - case in: INode[K, V] => in.mainnode match { - case tn: TNode[K, V] => Some(tn.kvPair) - case ln: LNode[K, V] => ln.listmap.toList - case cn: CNode[K, V] => cn.collectElems - } - } - - protected def collectLocalElems: Seq[String] = array flatMap { - case sn: SNode[K, V] => Some(sn.kvPair._2.toString) - case in: INode[K, V] => Some(in.toString.drop(14) + "(" + in.gen + ")") - } - - override def toString = { - val elems = collectLocalElems - "CNode(sz: %d; %s)".format(elems.size, elems.sorted.mkString(", ")) - } -} - - -private[mutable] object CNode { - - def dual[K, V](x: SNode[K, V], xhc: Int, y: SNode[K, V], yhc: Int, lev: Int, gen: Gen): MainNode[K, V] = if (lev < 35) { - val xidx = (xhc >>> lev) & 0x1f - val yidx = (yhc >>> lev) & 0x1f - val bmp = (1 << xidx) | (1 << yidx) - if (xidx == yidx) { - val subinode = new INode[K, V](gen)//(ConcurrentTrieMap.inodeupdater) - subinode.mainnode = dual(x, xhc, y, yhc, lev + 5, gen) - new CNode(bmp, Array(subinode), gen) - } else { - if (xidx < yidx) new CNode(bmp, Array(x, y), gen) - else new CNode(bmp, Array(y, x), gen) - } - } else { - new LNode(x.k, x.v, y.k, y.v) - } - -} - - -private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmain: MainNode[K, V], nv: INode[K, V]) { - @volatile var committed = false -} - - -/** A concurrent hash-trie or ConcurrentTrieMap is a concurrent thread-safe lock-free - * implementation of a hash array mapped trie. It is used to implement the - * concurrent map abstraction. It has particularly scalable concurrent insert - * and remove operations and is memory-efficient. It supports O(1), atomic, - * lock-free snapshots which are used to implement linearizable lock-free size, - * iterator and clear operations. The cost of evaluating the (lazy) snapshot is - * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable. - * - * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf - * - * @author Aleksandar Prokopec - * @since 2.10 - */ -@SerialVersionUID(0L - 6402774413839597105L) -final class ConcurrentTrieMap[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[ConcurrentTrieMap[K, V], AnyRef]) -extends ConcurrentMap[K, V] - with MapLike[K, V, ConcurrentTrieMap[K, V]] - with CustomParallelizable[(K, V), ParConcurrentTrieMap[K, V]] - with Serializable -{ - import ConcurrentTrieMap.computeHash - - private var rootupdater = rtupd - @volatile var root = r - - def this() = this( - INode.newRootNode, - AtomicReferenceFieldUpdater.newUpdater(classOf[ConcurrentTrieMap[K, V]], classOf[AnyRef], "root") - ) - - /* internal methods */ - - private def writeObject(out: java.io.ObjectOutputStream) { - val it = iterator - while (it.hasNext) { - val (k, v) = it.next() - out.writeObject(k) - out.writeObject(v) - } - out.writeObject(ConcurrentTrieMapSerializationEnd) - } - - private def readObject(in: java.io.ObjectInputStream) { - root = INode.newRootNode - rootupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[ConcurrentTrieMap[K, V]], classOf[AnyRef], "root") - - var obj: AnyRef = null - do { - obj = in.readObject() - if (obj != ConcurrentTrieMapSerializationEnd) { - val k = obj.asInstanceOf[K] - val v = in.readObject().asInstanceOf[V] - update(k, v) - } - } while (obj != ConcurrentTrieMapSerializationEnd) - } - - @inline final def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) - - final def readRoot(abort: Boolean = false): INode[K, V] = RDCSS_READ_ROOT(abort) - - @inline final def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { - val r = /*READ*/root - r match { - case in: INode[K, V] => in - case desc: RDCSS_Descriptor[K, V] => RDCSS_Complete(abort) - } - } - - @tailrec private def RDCSS_Complete(abort: Boolean): INode[K, V] = { - val v = /*READ*/root - v match { - case in: INode[K, V] => in - case desc: RDCSS_Descriptor[K, V] => - val RDCSS_Descriptor(ov, exp, nv) = desc - if (abort) { - if (CAS_ROOT(desc, ov)) ov - else RDCSS_Complete(abort) - } else { - val oldmain = ov.gcasRead(this) - if (oldmain eq exp) { - if (CAS_ROOT(desc, nv)) { - desc.committed = true - nv - } else RDCSS_Complete(abort) - } else { - if (CAS_ROOT(desc, ov)) ov - else RDCSS_Complete(abort) - } - } - } - } - - private def RDCSS_ROOT(ov: INode[K, V], expectedmain: MainNode[K, V], nv: INode[K, V]): Boolean = { - val desc = RDCSS_Descriptor(ov, expectedmain, nv) - if (CAS_ROOT(ov, desc)) { - RDCSS_Complete(false) - /*READ*/desc.committed - } else false - } - - @tailrec private def inserthc(k: K, hc: Int, v: V) { - val r = RDCSS_READ_ROOT() - if (!r.rec_insert(k, v, hc, 0, null, r.gen, this)) inserthc(k, hc, v) - } - - @tailrec private def insertifhc(k: K, hc: Int, v: V, cond: AnyRef): Option[V] = { - val r = RDCSS_READ_ROOT() - - val ret = r.rec_insertif(k, v, hc, cond, 0, null, r.gen, this) - if (ret eq null) insertifhc(k, hc, v, cond) - else ret - } - - @tailrec private def lookuphc(k: K, hc: Int): AnyRef = { - val r = RDCSS_READ_ROOT() - val res = r.rec_lookup(k, hc, 0, null, r.gen, this) - if (res eq INodeBase.RESTART) lookuphc(k, hc) - else res - } - - /* slower: - //@tailrec - private def lookuphc(k: K, hc: Int): AnyRef = { - val r = RDCSS_READ_ROOT() - try { - r.rec_lookup(k, hc, 0, null, r.gen, this) - } catch { - case RestartException => - lookuphc(k, hc) - } - } - */ - - @tailrec private def removehc(k: K, v: V, hc: Int): Option[V] = { - val r = RDCSS_READ_ROOT() - val res = r.rec_remove(k, v, hc, 0, null, r.gen, this) - if (res ne null) res - else removehc(k, v, hc) - } - - def string = RDCSS_READ_ROOT().string(0) - - /* public methods */ - - override def seq = this - - override def par = new ParConcurrentTrieMap(this) - - override def empty: ConcurrentTrieMap[K, V] = new ConcurrentTrieMap[K, V] - - final def isReadOnly = rootupdater eq null - - final def nonReadOnly = rootupdater ne null - - /** Returns a snapshot of this ConcurrentTrieMap. - * This operation is lock-free and linearizable. - * - * The snapshot is lazily updated - the first time some branch - * in the snapshot or this ConcurrentTrieMap are accessed, they are rewritten. - * This means that the work of rebuilding both the snapshot and this - * ConcurrentTrieMap is distributed across all the threads doing updates or accesses - * subsequent to the snapshot creation. - */ - @tailrec final def snapshot(): ConcurrentTrieMap[K, V] = { - val r = RDCSS_READ_ROOT() - val expmain = r.gcasRead(this) - if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new ConcurrentTrieMap(r.copyToGen(new Gen, this), rootupdater) - else snapshot() - } - - /** Returns a read-only snapshot of this ConcurrentTrieMap. - * This operation is lock-free and linearizable. - * - * The snapshot is lazily updated - the first time some branch - * of this ConcurrentTrieMap are accessed, it is rewritten. The work of creating - * the snapshot is thus distributed across subsequent updates - * and accesses on this ConcurrentTrieMap by all threads. - * Note that the snapshot itself is never rewritten unlike when calling - * the `snapshot` method, but the obtained snapshot cannot be modified. - * - * This method is used by other methods such as `size` and `iterator`. - */ - @tailrec final def readOnlySnapshot(): collection.Map[K, V] = { - val r = RDCSS_READ_ROOT() - val expmain = r.gcasRead(this) - if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new ConcurrentTrieMap(r, null) - else readOnlySnapshot() - } - - @tailrec final override def clear() { - val r = RDCSS_READ_ROOT() - if (!RDCSS_ROOT(r, r.gcasRead(this), INode.newRootNode[K, V])) clear() - } - - final def lookup(k: K): V = { - val hc = computeHash(k) - lookuphc(k, hc).asInstanceOf[V] - } - - final override def apply(k: K): V = { - val hc = computeHash(k) - val res = lookuphc(k, hc) - if (res eq null) throw new NoSuchElementException - else res.asInstanceOf[V] - } - - final def get(k: K): Option[V] = { - val hc = computeHash(k) - Option(lookuphc(k, hc)).asInstanceOf[Option[V]] - } - - override def put(key: K, value: V): Option[V] = { - val hc = computeHash(key) - insertifhc(key, hc, value, null) - } - - final override def update(k: K, v: V) { - val hc = computeHash(k) - inserthc(k, hc, v) - } - - final def +=(kv: (K, V)) = { - update(kv._1, kv._2) - this - } - - final override def remove(k: K): Option[V] = { - val hc = computeHash(k) - removehc(k, null.asInstanceOf[V], hc) - } - - final def -=(k: K) = { - remove(k) - this - } - - def putIfAbsent(k: K, v: V): Option[V] = { - val hc = computeHash(k) - insertifhc(k, hc, v, INode.KEY_ABSENT) - } - - def remove(k: K, v: V): Boolean = { - val hc = computeHash(k) - removehc(k, v, hc).nonEmpty - } - - def replace(k: K, oldvalue: V, newvalue: V): Boolean = { - val hc = computeHash(k) - insertifhc(k, hc, newvalue, oldvalue.asInstanceOf[AnyRef]).nonEmpty - } - - def replace(k: K, v: V): Option[V] = { - val hc = computeHash(k) - insertifhc(k, hc, v, INode.KEY_PRESENT) - } - - def iterator: Iterator[(K, V)] = - if (nonReadOnly) readOnlySnapshot().iterator - else new ConcurrentTrieMapIterator(0, this) - - private def cachedSize() = { - val r = RDCSS_READ_ROOT() - r.cachedSize(this) - } - - override def size: Int = - if (nonReadOnly) readOnlySnapshot().size - else cachedSize() - - override def stringPrefix = "ConcurrentTrieMap" - -} - - -object ConcurrentTrieMap extends MutableMapFactory[ConcurrentTrieMap] { - val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[MainNode[_, _]], "mainnode") - - implicit def canBuildFrom[K, V]: CanBuildFrom[Coll, (K, V), ConcurrentTrieMap[K, V]] = new MapCanBuildFrom[K, V] - - def empty[K, V]: ConcurrentTrieMap[K, V] = new ConcurrentTrieMap[K, V] - - @inline final def computeHash[K](k: K): Int = { - var hcode = k.hashCode - hcode = hcode * 0x9e3775cd - hcode = java.lang.Integer.reverseBytes(hcode) - hcode * 0x9e3775cd - } - -} - - -private[collection] class ConcurrentTrieMapIterator[K, V](var level: Int, private var ct: ConcurrentTrieMap[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] { - var stack = new Array[Array[BasicNode]](7) - var stackpos = new Array[Int](7) - var depth = -1 - var subiter: Iterator[(K, V)] = null - var current: KVNode[K, V] = null - - if (mustInit) initialize() - - def hasNext = (current ne null) || (subiter ne null) - - def next() = if (hasNext) { - var r: (K, V) = null - if (subiter ne null) { - r = subiter.next() - checkSubiter() - } else { - r = current.kvPair - advance() - } - r - } else Iterator.empty.next() - - private def readin(in: INode[K, V]) = in.gcasRead(ct) match { - case cn: CNode[K, V] => - depth += 1 - stack(depth) = cn.array - stackpos(depth) = -1 - advance() - case tn: TNode[K, V] => - current = tn - case ln: LNode[K, V] => - subiter = ln.listmap.iterator - checkSubiter() - case null => - current = null - } - - @inline private def checkSubiter() = if (!subiter.hasNext) { - subiter = null - advance() - } - - @inline private def initialize() { - assert(ct.isReadOnly) - - val r = ct.RDCSS_READ_ROOT() - readin(r) - } - - def advance(): Unit = if (depth >= 0) { - val npos = stackpos(depth) + 1 - if (npos < stack(depth).length) { - stackpos(depth) = npos - stack(depth)(npos) match { - case sn: SNode[K, V] => - current = sn - case in: INode[K, V] => - readin(in) - } - } else { - depth -= 1 - advance() - } - } else current = null - - protected def newIterator(_lev: Int, _ct: ConcurrentTrieMap[K, V], _mustInit: Boolean) = new ConcurrentTrieMapIterator[K, V](_lev, _ct, _mustInit) - - protected def dupTo(it: ConcurrentTrieMapIterator[K, V]) = { - it.level = this.level - it.ct = this.ct - it.depth = this.depth - it.current = this.current - - // these need a deep copy - Array.copy(this.stack, 0, it.stack, 0, 7) - Array.copy(this.stackpos, 0, it.stackpos, 0, 7) - - // this one needs to be evaluated - if (this.subiter == null) it.subiter = null - else { - val lst = this.subiter.toList - this.subiter = lst.iterator - it.subiter = lst.iterator - } - } - - /** Returns a sequence of iterators over subsets of this iterator. - * It's used to ease the implementation of splitters for a parallel version of the ConcurrentTrieMap. - */ - protected def subdivide(): Seq[Iterator[(K, V)]] = if (subiter ne null) { - // the case where an LNode is being iterated - val it = subiter - subiter = null - advance() - this.level += 1 - Seq(it, this) - } else if (depth == -1) { - this.level += 1 - Seq(this) - } else { - var d = 0 - while (d <= depth) { - val rem = stack(d).length - 1 - stackpos(d) - if (rem > 0) { - val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2) - stack(d) = arr1 - stackpos(d) = -1 - val it = newIterator(level + 1, ct, false) - it.stack(0) = arr2 - it.stackpos(0) = -1 - it.depth = 0 - it.advance() // <-- fix it - this.level += 1 - return Seq(this, it) - } - d += 1 - } - this.level += 1 - Seq(this) - } - - def printDebug { - println("ctrie iterator") - println(stackpos.mkString(",")) - println("depth: " + depth) - println("curr.: " + current) - println(stack.mkString("\n")) - } - -} - - -private[mutable] object RestartException extends util.control.ControlThrowable - - -/** Only used for ctrie serialization. */ -@SerialVersionUID(0L - 7237891413820527142L) -private[mutable] case object ConcurrentTrieMapSerializationEnd - - -private[mutable] object Debug { - import collection._ - - lazy val logbuffer = new java.util.concurrent.ConcurrentLinkedQueue[AnyRef] - - def log(s: AnyRef) = logbuffer.add(s) - - def flush() { - for (s <- JavaConversions.asScalaIterator(logbuffer.iterator())) Console.out.println(s.toString) - logbuffer.clear() - } - - def clear() { - logbuffer.clear() - } - -} - - - - - - - - - - diff --git a/src/library/scala/collection/mutable/Gen.java b/src/library/scala/collection/mutable/Gen.java deleted file mode 100644 index 0c9a30d198..0000000000 --- a/src/library/scala/collection/mutable/Gen.java +++ /dev/null @@ -1,18 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.mutable; - - - - - - -final class Gen { -} - diff --git a/src/library/scala/collection/mutable/INodeBase.java b/src/library/scala/collection/mutable/INodeBase.java deleted file mode 100644 index 487b5cfc28..0000000000 --- a/src/library/scala/collection/mutable/INodeBase.java +++ /dev/null @@ -1,35 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.mutable; - - - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - - -abstract class INodeBase extends BasicNode { - - public static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(INodeBase.class, MainNode.class, "mainnode"); - - public static final Object RESTART = new Object(); - - public volatile MainNode mainnode = null; - - public final Gen gen; - - public INodeBase(Gen generation) { - gen = generation; - } - - public BasicNode prev() { - return null; - } - -} \ No newline at end of file diff --git a/src/library/scala/collection/mutable/MainNode.java b/src/library/scala/collection/mutable/MainNode.java deleted file mode 100644 index 0578de676d..0000000000 --- a/src/library/scala/collection/mutable/MainNode.java +++ /dev/null @@ -1,40 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.mutable; - - - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - - -abstract class MainNode extends BasicNode { - - public static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(MainNode.class, MainNode.class, "prev"); - - public volatile MainNode prev = null; - - public abstract int cachedSize(Object ct); - - public boolean CAS_PREV(MainNode oldval, MainNode nval) { - return updater.compareAndSet(this, oldval, nval); - } - - public void WRITE_PREV(MainNode nval) { - updater.set(this, nval); - } - - // do we need this? unclear in the javadocs... - // apparently not - volatile reads are supposed to be safe - // irregardless of whether there are concurrent ARFU updates - public MainNode READ_PREV() { - return updater.get(this); - } - -} \ No newline at end of file diff --git a/src/library/scala/collection/parallel/mutable/ParConcurrentTrieMap.scala b/src/library/scala/collection/parallel/mutable/ParConcurrentTrieMap.scala deleted file mode 100644 index a6495161ea..0000000000 --- a/src/library/scala/collection/parallel/mutable/ParConcurrentTrieMap.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.collection.parallel.mutable - - - -import scala.collection.generic._ -import scala.collection.parallel.Combiner -import scala.collection.parallel.IterableSplitter -import scala.collection.parallel.Task -import scala.collection.mutable.BasicNode -import scala.collection.mutable.TNode -import scala.collection.mutable.LNode -import scala.collection.mutable.CNode -import scala.collection.mutable.SNode -import scala.collection.mutable.INode -import scala.collection.mutable.ConcurrentTrieMap -import scala.collection.mutable.ConcurrentTrieMapIterator - - - -/** Parallel ConcurrentTrieMap collection. - * - * It has its bulk operations parallelized, but uses the snapshot operation - * to create the splitter. This means that parallel bulk operations can be - * called concurrently with the modifications. - * - * @author Aleksandar Prokopec - * @since 2.10 - */ -final class ParConcurrentTrieMap[K, V] private[collection] (private val ctrie: ConcurrentTrieMap[K, V]) -extends ParMap[K, V] - with GenericParMapTemplate[K, V, ParConcurrentTrieMap] - with ParMapLike[K, V, ParConcurrentTrieMap[K, V], ConcurrentTrieMap[K, V]] - with ParConcurrentTrieMapCombiner[K, V] - with Serializable -{ - def this() = this(new ConcurrentTrieMap) - - override def mapCompanion: GenericParMapCompanion[ParConcurrentTrieMap] = ParConcurrentTrieMap - - override def empty: ParConcurrentTrieMap[K, V] = ParConcurrentTrieMap.empty - - protected[this] override def newCombiner = ParConcurrentTrieMap.newCombiner - - override def seq = ctrie - - def splitter = new ParConcurrentTrieMapSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[ConcurrentTrieMap[K, V]], true) - - override def clear() = ctrie.clear() - - def result = this - - def get(key: K): Option[V] = ctrie.get(key) - - def put(key: K, value: V): Option[V] = ctrie.put(key, value) - - def update(key: K, value: V): Unit = ctrie.update(key, value) - - def remove(key: K): Option[V] = ctrie.remove(key) - - def +=(kv: (K, V)): this.type = { - ctrie.+=(kv) - this - } - - def -=(key: K): this.type = { - ctrie.-=(key) - this - } - - override def size = { - val in = ctrie.readRoot() - val r = in.gcasRead(ctrie) - r match { - case tn: TNode[_, _] => tn.cachedSize(ctrie) - case ln: LNode[_, _] => ln.cachedSize(ctrie) - case cn: CNode[_, _] => - tasksupport.executeAndWaitResult(new Size(0, cn.array.length, cn.array)) - cn.cachedSize(ctrie) - } - } - - override def stringPrefix = "ParConcurrentTrieMap" - - /* tasks */ - - /** Computes ConcurrentTrieMap size in parallel. */ - class Size(offset: Int, howmany: Int, array: Array[BasicNode]) extends Task[Int, Size] { - var result = -1 - def leaf(prev: Option[Int]) = { - var sz = 0 - var i = offset - val until = offset + howmany - while (i < until) { - array(i) match { - case sn: SNode[_, _] => sz += 1 - case in: INode[K, V] => sz += in.cachedSize(ctrie) - } - i += 1 - } - result = sz - } - def split = { - val fp = howmany / 2 - Seq(new Size(offset, fp, array), new Size(offset + fp, howmany - fp, array)) - } - def shouldSplitFurther = howmany > 1 - override def merge(that: Size) = result = result + that.result - } - -} - - -private[collection] class ParConcurrentTrieMapSplitter[K, V](lev: Int, ct: ConcurrentTrieMap[K, V], mustInit: Boolean) -extends ConcurrentTrieMapIterator[K, V](lev, ct, mustInit) - with IterableSplitter[(K, V)] -{ - // only evaluated if `remaining` is invoked (which is not used by most tasks) - lazy val totalsize = ct.par.size - var iterated = 0 - - protected override def newIterator(_lev: Int, _ct: ConcurrentTrieMap[K, V], _mustInit: Boolean) = new ParConcurrentTrieMapSplitter[K, V](_lev, _ct, _mustInit) - - override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = { - val maxsplits = 3 + Integer.highestOneBit(parallelismLevel) - level < maxsplits - } - - def dup = { - val it = newIterator(0, ct, false) - dupTo(it) - it.iterated = this.iterated - it - } - - override def next() = { - iterated += 1 - super.next() - } - - def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]] - - override def isRemainingCheap = false - - def remaining: Int = totalsize - iterated -} - - -/** Only used within the `ParConcurrentTrieMap`. */ -private[mutable] trait ParConcurrentTrieMapCombiner[K, V] extends Combiner[(K, V), ParConcurrentTrieMap[K, V]] { - - def combine[N <: (K, V), NewTo >: ParConcurrentTrieMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this eq other) this else { - throw new UnsupportedOperationException("This shouldn't have been called in the first place.") - - val thiz = this.asInstanceOf[ParConcurrentTrieMap[K, V]] - val that = other.asInstanceOf[ParConcurrentTrieMap[K, V]] - val result = new ParConcurrentTrieMap[K, V] - - result ++= thiz.iterator - result ++= that.iterator - - result - } - - override def canBeShared = true - -} - - -object ParConcurrentTrieMap extends ParMapFactory[ParConcurrentTrieMap] { - - def empty[K, V]: ParConcurrentTrieMap[K, V] = new ParConcurrentTrieMap[K, V] - - def newCombiner[K, V]: Combiner[(K, V), ParConcurrentTrieMap[K, V]] = new ParConcurrentTrieMap[K, V] - - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParConcurrentTrieMap[K, V]] = new CanCombineFromMap[K, V] - -} - - - - - - - - diff --git a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala new file mode 100644 index 0000000000..fa19990b91 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala @@ -0,0 +1,193 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.collection.parallel.mutable + + + +import scala.collection.generic._ +import scala.collection.parallel.Combiner +import scala.collection.parallel.IterableSplitter +import scala.collection.parallel.Task +import scala.collection.concurrent.BasicNode +import scala.collection.concurrent.TNode +import scala.collection.concurrent.LNode +import scala.collection.concurrent.CNode +import scala.collection.concurrent.SNode +import scala.collection.concurrent.INode +import scala.collection.concurrent.TrieMap +import scala.collection.concurrent.TrieMapIterator + + + +/** Parallel TrieMap collection. + * + * It has its bulk operations parallelized, but uses the snapshot operation + * to create the splitter. This means that parallel bulk operations can be + * called concurrently with the modifications. + * + * @author Aleksandar Prokopec + * @since 2.10 + */ +final class ParTrieMap[K, V] private[collection] (private val ctrie: TrieMap[K, V]) +extends ParMap[K, V] + with GenericParMapTemplate[K, V, ParTrieMap] + with ParMapLike[K, V, ParTrieMap[K, V], TrieMap[K, V]] + with ParTrieMapCombiner[K, V] + with Serializable +{ + def this() = this(new TrieMap) + + override def mapCompanion: GenericParMapCompanion[ParTrieMap] = ParTrieMap + + override def empty: ParTrieMap[K, V] = ParTrieMap.empty + + protected[this] override def newCombiner = ParTrieMap.newCombiner + + override def seq = ctrie + + def splitter = new ParTrieMapSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[TrieMap[K, V]], true) + + override def clear() = ctrie.clear() + + def result = this + + def get(key: K): Option[V] = ctrie.get(key) + + def put(key: K, value: V): Option[V] = ctrie.put(key, value) + + def update(key: K, value: V): Unit = ctrie.update(key, value) + + def remove(key: K): Option[V] = ctrie.remove(key) + + def +=(kv: (K, V)): this.type = { + ctrie.+=(kv) + this + } + + def -=(key: K): this.type = { + ctrie.-=(key) + this + } + + override def size = { + val in = ctrie.readRoot() + val r = in.gcasRead(ctrie) + r match { + case tn: TNode[_, _] => tn.cachedSize(ctrie) + case ln: LNode[_, _] => ln.cachedSize(ctrie) + case cn: CNode[_, _] => + tasksupport.executeAndWaitResult(new Size(0, cn.array.length, cn.array)) + cn.cachedSize(ctrie) + } + } + + override def stringPrefix = "ParTrieMap" + + /* tasks */ + + /** Computes TrieMap size in parallel. */ + class Size(offset: Int, howmany: Int, array: Array[BasicNode]) extends Task[Int, Size] { + var result = -1 + def leaf(prev: Option[Int]) = { + var sz = 0 + var i = offset + val until = offset + howmany + while (i < until) { + array(i) match { + case sn: SNode[_, _] => sz += 1 + case in: INode[K, V] => sz += in.cachedSize(ctrie) + } + i += 1 + } + result = sz + } + def split = { + val fp = howmany / 2 + Seq(new Size(offset, fp, array), new Size(offset + fp, howmany - fp, array)) + } + def shouldSplitFurther = howmany > 1 + override def merge(that: Size) = result = result + that.result + } + +} + + +private[collection] class ParTrieMapSplitter[K, V](lev: Int, ct: TrieMap[K, V], mustInit: Boolean) +extends TrieMapIterator[K, V](lev, ct, mustInit) + with IterableSplitter[(K, V)] +{ + // only evaluated if `remaining` is invoked (which is not used by most tasks) + lazy val totalsize = ct.par.size + var iterated = 0 + + protected override def newIterator(_lev: Int, _ct: TrieMap[K, V], _mustInit: Boolean) = new ParTrieMapSplitter[K, V](_lev, _ct, _mustInit) + + override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = { + val maxsplits = 3 + Integer.highestOneBit(parallelismLevel) + level < maxsplits + } + + def dup = { + val it = newIterator(0, ct, false) + dupTo(it) + it.iterated = this.iterated + it + } + + override def next() = { + iterated += 1 + super.next() + } + + def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]] + + override def isRemainingCheap = false + + def remaining: Int = totalsize - iterated +} + + +/** Only used within the `ParTrieMap`. */ +private[mutable] trait ParTrieMapCombiner[K, V] extends Combiner[(K, V), ParTrieMap[K, V]] { + + def combine[N <: (K, V), NewTo >: ParTrieMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this eq other) this else { + throw new UnsupportedOperationException("This shouldn't have been called in the first place.") + + val thiz = this.asInstanceOf[ParTrieMap[K, V]] + val that = other.asInstanceOf[ParTrieMap[K, V]] + val result = new ParTrieMap[K, V] + + result ++= thiz.iterator + result ++= that.iterator + + result + } + + override def canBeShared = true + +} + + +object ParTrieMap extends ParMapFactory[ParTrieMap] { + + def empty[K, V]: ParTrieMap[K, V] = new ParTrieMap[K, V] + + def newCombiner[K, V]: Combiner[(K, V), ParTrieMap[K, V]] = new ParTrieMap[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParTrieMap[K, V]] = new CanCombineFromMap[K, V] + +} + + + + + + + + diff --git a/test/files/jvm/serialization.check b/test/files/jvm/serialization.check index 0b8055a6b9..fa51c6a879 100644 --- a/test/files/jvm/serialization.check +++ b/test/files/jvm/serialization.check @@ -192,8 +192,8 @@ x = TreeSet(1, 2, 3) y = TreeSet(1, 2, 3) x equals y: true, y equals x: true -x = ConcurrentTrieMap(1 -> one, 2 -> two, 3 -> three) -y = ConcurrentTrieMap(1 -> one, 2 -> two, 3 -> three) +x = TrieMap(1 -> one, 2 -> two, 3 -> three) +y = TrieMap(1 -> one, 2 -> two, 3 -> three) x equals y: true, y equals x: true x = xml:src="hello" @@ -287,8 +287,8 @@ x = ParHashMap(2 -> 4, 1 -> 2) y = ParHashMap(2 -> 4, 1 -> 2) x equals y: true, y equals x: true -x = ParConcurrentTrieMap(1 -> 2, 2 -> 4) -y = ParConcurrentTrieMap(1 -> 2, 2 -> 4) +x = ParTrieMap(1 -> 2, 2 -> 4) +y = ParTrieMap(1 -> 2, 2 -> 4) x equals y: true, y equals x: true x = ParHashSet(1, 2, 3) diff --git a/test/files/jvm/serialization.scala b/test/files/jvm/serialization.scala index 1e89036f37..9c2f2acdbf 100644 --- a/test/files/jvm/serialization.scala +++ b/test/files/jvm/serialization.scala @@ -286,7 +286,8 @@ object Test3_mutable { import scala.collection.mutable.{ ArrayBuffer, ArrayBuilder, ArraySeq, ArrayStack, BitSet, DoubleLinkedList, HashMap, HashSet, History, LinkedList, ListBuffer, Publisher, Queue, - Stack, StringBuilder, WrappedArray, TreeSet, ConcurrentTrieMap} + Stack, StringBuilder, WrappedArray, TreeSet} + import scala.collection.concurrent.TrieMap // in alphabetic order try { @@ -386,9 +387,9 @@ object Test3_mutable { val _ts1: TreeSet[Int] = read(write(ts1)) check(ts1, _ts1) - // ConcurrentTrieMap - val ct1 = ConcurrentTrieMap[Int, String]() ++= Array(1 -> "one", 2 -> "two", 3 -> "three") - val _ct1: ConcurrentTrieMap[Int, String] = read(write(ct1)) + // concurrent.TrieMap + val ct1 = TrieMap[Int, String]() ++= Array(1 -> "one", 2 -> "two", 3 -> "three") + val _ct1: TrieMap[Int, String] = read(write(ct1)) check(ct1, _ct1) } catch { @@ -613,9 +614,9 @@ object Test9_parallel { val _mpm: mutable.ParHashMap[Int, Int] = read(write(mpm)) check(mpm, _mpm) - // mutable.ParConcurrentTrieMap - val mpc = mutable.ParConcurrentTrieMap(1 -> 2, 2 -> 4) - val _mpc: mutable.ParConcurrentTrieMap[Int, Int] = read(write(mpc)) + // mutable.ParTrieMap + val mpc = mutable.ParTrieMap(1 -> 2, 2 -> 4) + val _mpc: mutable.ParTrieMap[Int, Int] = read(write(mpc)) check(mpc, _mpc) // mutable.ParHashSet diff --git a/test/files/run/ctries/concmap.scala b/test/files/run/ctries/concmap.scala index bf8cc9d12f..3ec0256afb 100644 --- a/test/files/run/ctries/concmap.scala +++ b/test/files/run/ctries/concmap.scala @@ -1,7 +1,7 @@ -import collection.mutable.ConcurrentTrieMap +import collection.concurrent.TrieMap object ConcurrentMapSpec extends Spec { @@ -11,13 +11,13 @@ object ConcurrentMapSpec extends Spec { def test() { "support put" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until initsz) assert(ct.put(new Wrap(i), i) == None) for (i <- 0 until initsz) assert(ct.put(new Wrap(i), -i) == Some(i)) } "support put if absent" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until initsz) ct.update(new Wrap(i), i) for (i <- 0 until initsz) assert(ct.putIfAbsent(new Wrap(i), -i) == Some(i)) for (i <- 0 until initsz) assert(ct.putIfAbsent(new Wrap(i), -i) == Some(i)) @@ -26,7 +26,7 @@ object ConcurrentMapSpec extends Spec { } "support remove if mapped to a specific value" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until initsz) ct.update(new Wrap(i), i) for (i <- 0 until initsz) assert(ct.remove(new Wrap(i), -i - 1) == false) for (i <- 0 until initsz) assert(ct.remove(new Wrap(i), i) == true) @@ -34,7 +34,7 @@ object ConcurrentMapSpec extends Spec { } "support replace if mapped to a specific value" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until initsz) ct.update(new Wrap(i), i) for (i <- 0 until initsz) assert(ct.replace(new Wrap(i), -i - 1, -i - 2) == false) for (i <- 0 until initsz) assert(ct.replace(new Wrap(i), i, -i - 2) == true) @@ -43,7 +43,7 @@ object ConcurrentMapSpec extends Spec { } "support replace if present" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until initsz) ct.update(new Wrap(i), i) for (i <- 0 until initsz) assert(ct.replace(new Wrap(i), -i) == Some(i)) for (i <- 0 until initsz) assert(ct.replace(new Wrap(i), i) == Some(-i)) @@ -56,7 +56,7 @@ object ConcurrentMapSpec extends Spec { } "support replace if mapped to a specific value, using several threads" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 55000 for (i <- 0 until sz) ct.update(new Wrap(i), i) @@ -89,7 +89,7 @@ object ConcurrentMapSpec extends Spec { } "support put if absent, several threads" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 110000 class Updater(offs: Int) extends Thread { @@ -110,7 +110,7 @@ object ConcurrentMapSpec extends Spec { } "support remove if mapped to a specific value, several threads" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 55000 for (i <- 0 until sz) ct.update(new Wrap(i), i) @@ -132,7 +132,7 @@ object ConcurrentMapSpec extends Spec { } "have all or none of the elements depending on the oddity" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 65000 for (i <- 0 until sz) ct(new Wrap(i)) = i @@ -165,7 +165,7 @@ object ConcurrentMapSpec extends Spec { } "compute size correctly" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 36450 for (i <- 0 until sz) ct(new Wrap(i)) = i @@ -174,7 +174,7 @@ object ConcurrentMapSpec extends Spec { } "compute size correctly in parallel" in { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val sz = 36450 for (i <- 0 until sz) ct(new Wrap(i)) = i val pct = ct.par diff --git a/test/files/run/ctries/iterator.scala b/test/files/run/ctries/iterator.scala index dbfab6b8a9..b953a40e00 100644 --- a/test/files/run/ctries/iterator.scala +++ b/test/files/run/ctries/iterator.scala @@ -3,7 +3,7 @@ import collection._ -import collection.mutable.ConcurrentTrieMap +import collection.concurrent.TrieMap @@ -11,7 +11,7 @@ object IteratorSpec extends Spec { def test() { "work for an empty trie" in { - val ct = new ConcurrentTrieMap + val ct = new TrieMap val it = ct.iterator it.hasNext shouldEqual (false) @@ -19,7 +19,7 @@ object IteratorSpec extends Spec { } def nonEmptyIteratorCheck(sz: Int) { - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) val it = ct.iterator @@ -84,7 +84,7 @@ object IteratorSpec extends Spec { } def nonEmptyCollideCheck(sz: Int) { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until sz) ct.put(new DumbHash(i), i) val it = ct.iterator @@ -144,7 +144,7 @@ object IteratorSpec extends Spec { val W = 15 val S = 5 val checks = 5 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) class Modifier extends Thread { @@ -156,7 +156,7 @@ object IteratorSpec extends Spec { } } - def consistentIteration(ct: ConcurrentTrieMap[Wrap, Int], checks: Int) { + def consistentIteration(ct: TrieMap[Wrap, Int], checks: Int) { class Iter extends Thread { override def run() { val snap = ct.readOnlySnapshot() @@ -185,7 +185,7 @@ object IteratorSpec extends Spec { val sgroupsize = 10 val sgroupnum = 5 val removerslowdown = 50 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) class Remover extends Thread { @@ -227,7 +227,7 @@ object IteratorSpec extends Spec { val sgroupsize = 10 val sgroupnum = 10 val inserterslowdown = 50 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] class Inserter extends Thread { override def run() { @@ -265,7 +265,7 @@ object IteratorSpec extends Spec { "work on a yet unevaluated snapshot" in { val sz = 50000 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct.update(new Wrap(i), i) val snap = ct.snapshot() @@ -276,7 +276,7 @@ object IteratorSpec extends Spec { "be duplicated" in { val sz = 50 - val ct = collection.parallel.mutable.ParConcurrentTrieMap((0 until sz) zip (0 until sz): _*) + val ct = collection.parallel.mutable.ParTrieMap((0 until sz) zip (0 until sz): _*) val it = ct.splitter for (_ <- 0 until (sz / 2)) it.next() val dupit = it.dup diff --git a/test/files/run/ctries/lnode.scala b/test/files/run/ctries/lnode.scala index e480795956..92a31088e5 100644 --- a/test/files/run/ctries/lnode.scala +++ b/test/files/run/ctries/lnode.scala @@ -1,7 +1,7 @@ -import collection.mutable.ConcurrentTrieMap +import collection.concurrent.TrieMap object LNodeSpec extends Spec { @@ -11,19 +11,19 @@ object LNodeSpec extends Spec { def test() { "accept elements with the same hash codes" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) ct.update(new DumbHash(i), i) } "lookup elements with the same hash codes" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) ct.update(new DumbHash(i), i) for (i <- 0 until initsz) assert(ct.get(new DumbHash(i)) == Some(i)) for (i <- initsz until secondsz) assert(ct.get(new DumbHash(i)) == None) } "remove elements with the same hash codes" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) ct.update(new DumbHash(i), i) for (i <- 0 until initsz) { val remelem = ct.remove(new DumbHash(i)) @@ -33,7 +33,7 @@ object LNodeSpec extends Spec { } "put elements with the same hash codes if absent" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) ct.put(new DumbHash(i), i) for (i <- 0 until initsz) assert(ct.lookup(new DumbHash(i)) == i) for (i <- 0 until initsz) assert(ct.putIfAbsent(new DumbHash(i), i) == Some(i)) @@ -42,7 +42,7 @@ object LNodeSpec extends Spec { } "replace elements with the same hash codes" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) assert(ct.put(new DumbHash(i), i) == None) for (i <- 0 until initsz) assert(ct.lookup(new DumbHash(i)) == i) for (i <- 0 until initsz) assert(ct.replace(new DumbHash(i), -i) == Some(i)) @@ -51,7 +51,7 @@ object LNodeSpec extends Spec { } "remove elements with the same hash codes if mapped to a specific value" in { - val ct = new ConcurrentTrieMap[DumbHash, Int] + val ct = new TrieMap[DumbHash, Int] for (i <- 0 until initsz) assert(ct.put(new DumbHash(i), i) == None) for (i <- 0 until initsz) assert(ct.remove(new DumbHash(i), i) == true) } diff --git a/test/files/run/ctries/snapshot.scala b/test/files/run/ctries/snapshot.scala index 3c816130b3..5fe77d445b 100644 --- a/test/files/run/ctries/snapshot.scala +++ b/test/files/run/ctries/snapshot.scala @@ -3,7 +3,7 @@ import collection._ -import collection.mutable.ConcurrentTrieMap +import collection.concurrent.TrieMap @@ -11,11 +11,11 @@ object SnapshotSpec extends Spec { def test() { "support snapshots" in { - val ctn = new ConcurrentTrieMap + val ctn = new TrieMap ctn.snapshot() ctn.readOnlySnapshot() - val ct = new ConcurrentTrieMap[Int, Int] + val ct = new TrieMap[Int, Int] for (i <- 0 until 100) ct.put(i, i) ct.snapshot() ct.readOnlySnapshot() @@ -24,7 +24,7 @@ object SnapshotSpec extends Spec { "empty 2 quiescent snapshots in isolation" in { val sz = 4000 - class Worker(trie: ConcurrentTrieMap[Wrap, Int]) extends Thread { + class Worker(trie: TrieMap[Wrap, Int]) extends Thread { override def run() { for (i <- 0 until sz) { assert(trie.remove(new Wrap(i)) == Some(i)) @@ -35,7 +35,7 @@ object SnapshotSpec extends Spec { } } - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) val snapt = ct.snapshot() @@ -96,7 +96,7 @@ object SnapshotSpec extends Spec { } // traverses the trie `rep` times and modifies each entry - class Modifier(trie: ConcurrentTrieMap[Wrap, Int], index: Int, rep: Int, sz: Int) extends Thread { + class Modifier(trie: TrieMap[Wrap, Int], index: Int, rep: Int, sz: Int) extends Thread { setName("Modifier %d".format(index)) override def run() { @@ -110,7 +110,7 @@ object SnapshotSpec extends Spec { } // removes all the elements from the trie - class Remover(trie: ConcurrentTrieMap[Wrap, Int], index: Int, totremovers: Int, sz: Int) extends Thread { + class Remover(trie: TrieMap[Wrap, Int], index: Int, totremovers: Int, sz: Int) extends Thread { setName("Remover %d".format(index)) override def run() { @@ -123,7 +123,7 @@ object SnapshotSpec extends Spec { val N = 100 val W = 10 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(new Wrap(i)) = i val readonly = ct.readOnlySnapshot() val threads = for (i <- 0 until W) yield new Modifier(ct, i, N, sz) @@ -141,7 +141,7 @@ object SnapshotSpec extends Spec { val W = 100 val S = 5000 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(new Wrap(i)) = i val threads = for (i <- 0 until W) yield new Remover(ct, i, W, sz) @@ -156,7 +156,7 @@ object SnapshotSpec extends Spec { val W = 10 val S = 7000 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(new Wrap(i)) = i val threads = for (i <- 0 until W) yield new Modifier(ct, i, N, sz) @@ -165,7 +165,7 @@ object SnapshotSpec extends Spec { threads.foreach(_.join()) } - def consistentNonReadOnly(name: String, trie: ConcurrentTrieMap[Wrap, Int], sz: Int, N: Int) { + def consistentNonReadOnly(name: String, trie: TrieMap[Wrap, Int], sz: Int, N: Int) { @volatile var e: Exception = null // reads possible entries once and stores them @@ -223,7 +223,7 @@ object SnapshotSpec extends Spec { val W = 10 val S = 400 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(new Wrap(i)) = i val threads = for (i <- 0 until W) yield new Modifier(ct, i, N, sz) @@ -241,7 +241,7 @@ object SnapshotSpec extends Spec { val S = 10 val modifytimes = 1200 val snaptimes = 600 - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(new Wrap(i)) = i class Snapshooter extends Thread { diff --git a/test/files/scalacheck/Ctrie.scala b/test/files/scalacheck/Ctrie.scala index b9d71b88a3..736bf938bc 100644 --- a/test/files/scalacheck/Ctrie.scala +++ b/test/files/scalacheck/Ctrie.scala @@ -5,7 +5,7 @@ import org.scalacheck._ import Prop._ import org.scalacheck.Gen._ import collection._ -import collection.mutable.ConcurrentTrieMap +import collection.concurrent.TrieMap @@ -16,7 +16,7 @@ case class Wrap(i: Int) { /** A check mainly oriented towards checking snapshot correctness. */ -object Test extends Properties("ConcurrentTrieMap") { +object Test extends Properties("concurrent.TrieMap") { /* generators */ @@ -102,7 +102,7 @@ object Test extends Properties("ConcurrentTrieMap") { (numThreads, numElems) => val p = 3 //numThreads val sz = 102 //numElems - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] // checker val checker = spawn { @@ -134,7 +134,7 @@ object Test extends Properties("ConcurrentTrieMap") { property("update") = forAll(sizes) { (n: Int) => - val ct = new ConcurrentTrieMap[Int, Int] + val ct = new TrieMap[Int, Int] for (i <- 0 until n) ct(i) = i (0 until n) forall { case i => ct(i) == i @@ -143,7 +143,7 @@ object Test extends Properties("ConcurrentTrieMap") { property("concurrent update") = forAll(threadCountsAndSizes) { case (p, sz) => - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] inParallel(p) { idx => @@ -158,7 +158,7 @@ object Test extends Properties("ConcurrentTrieMap") { property("concurrent remove") = forAll(threadCounts, sizes) { (p, sz) => - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] for (i <- 0 until sz) ct(Wrap(i)) = i inParallel(p) { @@ -174,7 +174,7 @@ object Test extends Properties("ConcurrentTrieMap") { property("concurrent putIfAbsent") = forAll(threadCounts, sizes) { (p, sz) => - val ct = new ConcurrentTrieMap[Wrap, Int] + val ct = new TrieMap[Wrap, Int] val results = inParallel(p) { idx => diff --git a/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala index a04c0ff8d4..e141c398fd 100644 --- a/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala @@ -19,21 +19,21 @@ abstract class ParallelConcurrentTrieMapCheck[K, V](tp: String) extends Parallel // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) - type CollType = ParConcurrentTrieMap[K, V] + type CollType = ParTrieMap[K, V] def isCheckingViews = false def hasStrictOrder = false def ofSize(vals: Seq[Gen[(K, V)]], sz: Int) = { - val ct = new mutable.ConcurrentTrieMap[K, V] + val ct = new concurrent.TrieMap[K, V] val gen = vals(rnd.nextInt(vals.size)) for (i <- 0 until sz) ct += sample(gen) ct } def fromTraversable(t: Traversable[(K, V)]) = { - val pct = new ParConcurrentTrieMap[K, V] + val pct = new ParTrieMap[K, V] var i = 0 for (kv <- t.toList) { pct += kv @@ -58,7 +58,7 @@ with PairValues[Int, Int] def koperators = intoperators override def printDataStructureDebugInfo(ds: AnyRef) = ds match { - case pm: ParConcurrentTrieMap[k, v] => + case pm: ParTrieMap[k, v] => println("Mutable parallel ctrie") case _ => println("could not match data structure type: " + ds.getClass) -- cgit v1.2.3