diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-02 14:05:26 +0100 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-02-02 14:05:26 +0100 |
commit | c3d19c58d8a94b7232718321f6994c001257cc96 (patch) | |
tree | e334024c4ff56fb7e98c48b47fbffe2d5c8b4fca /src | |
parent | 5fe2d8b109abf3ff3e2d82dd4f248200846795c3 (diff) | |
download | scala-c3d19c58d8a94b7232718321f6994c001257cc96.tar.gz scala-c3d19c58d8a94b7232718321f6994c001257cc96.tar.bz2 scala-c3d19c58d8a94b7232718321f6994c001257cc96.zip |
Incorporate Ctrie into standard library.
Implemented Ctrie serialization. Improved hashcode computation.
Diffstat (limited to 'src')
-rw-r--r-- | src/library/scala/collection/mutable/Ctrie.scala | 103 |
1 files changed, 92 insertions, 11 deletions
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala index d02e0ce178..84cceb44eb 100644 --- a/src/library/scala/collection/mutable/Ctrie.scala +++ b/src/library/scala/collection/mutable/Ctrie.scala @@ -6,12 +6,14 @@ ** |/ ** \* */ -package scala.collection.mutable +package scala.collection +package mutable import java.util.concurrent.atomic._ import collection.immutable.{ ListMap => ImmutableListMap } +import generic._ import annotation.tailrec import annotation.switch @@ -425,7 +427,7 @@ extends MainNode[K, V] { if (updmap.size > 1) new LNode(updmap) else { val (k, v) = updmap.iterator.next - new TNode(k, v, k.hashCode) // create it tombed so that it gets compressed on subsequent accesses + new TNode(k, v, Ctrie.computeHash(k)) // create it tombed so that it gets compressed on subsequent accesses } } def get(k: K) = listmap.get(k) @@ -568,10 +570,26 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai } -class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef]) +/** A concurrent hash-trie or Ctrie is a concurrent thread-safe lock-free + * implementation of a hash array mapped trie. It is used to implement the + * concurrent map abstraction. It has particularly scalable concurrent insert + * and remove operations and is memory-efficient. It supports O(1), atomic, + * lock-free snapshots which are used to implement linearizable lock-free size, + * iterator and clear operations. The cost of evaluating the (lazy) snapshot is + * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable. + * + * @author Aleksandar Prokopec + * @since 2.10 + */ +@SerialVersionUID(0L - 6402774413839597105L) +final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef]) extends ConcurrentMap[K, V] + with MapLike[K, V, Ctrie[K, V]] + with Serializable { - private val rootupdater = rtupd + import Ctrie.computeHash + + private var rootupdater = rtupd @volatile var root = r def this() = this( @@ -581,6 +599,31 @@ extends ConcurrentMap[K, V] /* internal methods */ + private def writeObject(out: java.io.ObjectOutputStream) { + val it = iterator + while (it.hasNext) { + val (k, v) = it.next() + out.writeObject(k) + out.writeObject(v) + } + out.writeObject(CtrieSerializationEnd) + } + + private def readObject(in: java.io.ObjectInputStream) { + root = INode.newRootNode + rootupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[Ctrie[K, V]], classOf[AnyRef], "root") + + var obj: AnyRef = null + do { + obj = in.readObject() + if (obj != CtrieSerializationEnd) { + val k = obj.asInstanceOf[K] + val v = in.readObject().asInstanceOf[V] + update(k, v) + } + } while (obj != CtrieSerializationEnd) + } + @inline final def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) @inline final def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { @@ -623,10 +666,6 @@ extends ConcurrentMap[K, V] } else false } - @inline private def computeHash(k: K): Int = { - k.hashCode - } - @tailrec private def inserthc(k: K, hc: Int, v: V) { val r = RDCSS_READ_ROOT() if (!r.rec_insert(k, v, hc, 0, null, r.gen, this)) inserthc(k, hc, v) @@ -647,7 +686,7 @@ extends ConcurrentMap[K, V] else res } - /* + /* slower: //@tailrec private def lookuphc(k: K, hc: Int): AnyRef = { val r = RDCSS_READ_ROOT() @@ -671,10 +710,21 @@ extends ConcurrentMap[K, V] /* public methods */ + override def empty: Ctrie[K, V] = new Ctrie[K, V] + @inline final def isReadOnly = rootupdater eq null @inline final def nonReadOnly = rootupdater ne null + /** Returns a snapshot of this Ctrie. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * in the snapshot or this Ctrie are accessed, they are rewritten. + * This means that the work of rebuilding both the snapshot and this + * Ctrie is distributed across all the threads doing updates or accesses + * subsequent to the snapshot creation. + */ @tailrec final def snapshot(): Ctrie[K, V] = { val r = RDCSS_READ_ROOT() val expmain = r.GCAS_READ(this) @@ -682,6 +732,18 @@ extends ConcurrentMap[K, V] else snapshot() } + /** Returns a read-only snapshot of this Ctrie. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * of this Ctrie are accessed, it is rewritten. The work of creating + * the snapshot is thus distributed across subsequent updates + * and accesses on this Ctrie by all threads. + * Note that the snapshot itself is never rewritten unlike when calling + * the `snapshot` method, but the obtained snapshot cannot be modified. + * + * This method is used by other methods such as `size` and `iterator`. + */ @tailrec final def readOnlySnapshot(): collection.Map[K, V] = { val r = RDCSS_READ_ROOT() val expmain = r.GCAS_READ(this) @@ -760,11 +822,25 @@ extends ConcurrentMap[K, V] if (nonReadOnly) readOnlySnapshot().iterator else new CtrieIterator(this) + override def stringPrefix = "Ctrie" + } -object Ctrie { - val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[AnyRef], "mainnode") +object Ctrie extends MutableMapFactory[Ctrie] { + val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[MainNode[_, _]], "mainnode") + + implicit def canBuildFrom[K, V]: CanBuildFrom[Coll, (K, V), Ctrie[K, V]] = new MapCanBuildFrom[K, V] + + def empty[K, V]: Ctrie[K, V] = new Ctrie[K, V] + + @inline final def computeHash[K](k: K): Int = { + var hcode = k.hashCode + hcode = hcode * 0x9e3775cd + hcode = java.lang.Integer.reverseBytes(hcode) + hcode * 0x9e3775cd + } + } @@ -877,6 +953,11 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = private[mutable] object RestartException extends util.control.ControlThrowable +/** Only used for ctrie serialization. */ +@SerialVersionUID(0L - 7237891413820527142L) +private[mutable] case object CtrieSerializationEnd + + private[mutable] object Debug { import collection._ |