diff options
Diffstat (limited to 'src/library/scala/collection/concurrent')
-rw-r--r-- | src/library/scala/collection/concurrent/Map.scala | 11 | ||||
-rw-r--r-- | src/library/scala/collection/concurrent/TrieMap.scala | 50 |
2 files changed, 51 insertions, 10 deletions
diff --git a/src/library/scala/collection/concurrent/Map.scala b/src/library/scala/collection/concurrent/Map.scala index cfb567abe9..f27dfd57fc 100644 --- a/src/library/scala/collection/concurrent/Map.scala +++ b/src/library/scala/collection/concurrent/Map.scala @@ -86,4 +86,15 @@ trait Map[A, B] extends scala.collection.mutable.Map[A, B] { * @return `Some(v)` if the given key was previously mapped to some value `v`, or `None` otherwise */ def replace(k: A, v: B): Option[B] + + override def getOrElseUpdate(key: A, op: =>B): B = get(key) match { + case Some(v) => v + case None => + val v = op + putIfAbsent(key, v) match { + case Some(nv) => nv + case None => v + } + } + } diff --git a/src/library/scala/collection/concurrent/TrieMap.scala b/src/library/scala/collection/concurrent/TrieMap.scala index bcfea7a463..fe0b5c4f0e 100644 --- a/src/library/scala/collection/concurrent/TrieMap.scala +++ b/src/library/scala/collection/concurrent/TrieMap.scala @@ -11,13 +11,11 @@ package collection package concurrent import java.util.concurrent.atomic._ -import scala.collection.immutable.{ ListMap => ImmutableListMap } import scala.collection.parallel.mutable.ParTrieMap import scala.util.hashing.Hashing import scala.util.control.ControlThrowable import generic._ import scala.annotation.tailrec -import scala.annotation.switch private[collection] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) { import INodeBase._ @@ -471,7 +469,7 @@ private[collection] final class CNode[K, V](val bitmap: Int, val array: Array[Ba val offset = if (array.length > 0) //util.Random.nextInt(array.length) /* <-- benchmarks show that this causes observable contention */ - scala.concurrent.forkjoin.ThreadLocalRandom.current.nextInt(0, array.length) + java.util.concurrent.ThreadLocalRandom.current.nextInt(0, array.length) else 0 while (i < array.length) { val pos = (i + offset) % array.length @@ -641,7 +639,8 @@ extends scala.collection.concurrent.Map[K, V] private var rootupdater = rtupd def hashing = hashingobj def equality = equalityobj - @volatile var root = r + @deprecated("this field will be made private", "2.12.0") + @volatile /*private*/ var root = r def this(hashf: Hashing[K], ef: Equiv[K]) = this( INode.newRootNode, @@ -685,11 +684,14 @@ extends scala.collection.concurrent.Map[K, V] } while (obj != TrieMapSerializationEnd) } - def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) + @deprecated("this method will be made private", "2.12.0") + /*private*/ def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) - def readRoot(abort: Boolean = false): INode[K, V] = RDCSS_READ_ROOT(abort) + @deprecated("this method will be made private", "2.12.0") + /*private[collection]*/ def readRoot(abort: Boolean = false): INode[K, V] = RDCSS_READ_ROOT(abort) - def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { + @deprecated("this method will be made private", "2.12.0") + /*private[concurrent]*/ def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { val r = /*READ*/root r match { case in: INode[K, V] => in @@ -884,7 +886,7 @@ extends scala.collection.concurrent.Map[K, V] * * If the specified mapping function throws an exception, * that exception is rethrown. - * + * * Note: This method will invoke op at most once. * However, `op` may be invoked without the result being added to the map if * a concurrent process is also trying to add a value corresponding to the @@ -930,6 +932,33 @@ extends scala.collection.concurrent.Map[K, V] if (nonReadOnly) readOnlySnapshot().iterator else new TrieMapIterator(0, this) + //////////////////////////////////////////////////////////////////////////// + // + // SI-10177 These methods need overrides as the inherited implementations + // call `.iterator` more than once, which doesn't guarantee a coherent + // view of the data if there is a concurrent writer + // Note that the we don't need overrides for keysIterator or valuesIterator + // TrieMapTest validates the behaviour. + override def values: Iterable[V] = { + if (nonReadOnly) readOnlySnapshot().values + else super.values + } + override def keySet: Set[K] = { + if (nonReadOnly) readOnlySnapshot().keySet + else super.keySet + } + override def filterKeys(p: K => Boolean): collection.Map[K, V] = { + if (nonReadOnly) readOnlySnapshot().filterKeys(p) + else super.filterKeys(p) + } + override def mapValues[W](f: V => W): collection.Map[K, W] = { + if (nonReadOnly) readOnlySnapshot().mapValues(f) + else super.mapValues(f) + } + // END extra overrides + /////////////////////////////////////////////////////////////////// + + private def cachedSize() = { val r = RDCSS_READ_ROOT() r.cachedSize(this) @@ -1083,6 +1112,7 @@ private[collection] class TrieMapIterator[K, V](var level: Int, private var ct: Seq(this) } + @deprecated("this method will be removed", "2.12.0") def printDebug() { println("ctrie iterator") println(stackpos.mkString(",")) @@ -1103,14 +1133,14 @@ private[concurrent] case object TrieMapSerializationEnd private[concurrent] object Debug { - import scala.collection._ + import JavaConverters._ lazy val logbuffer = new java.util.concurrent.ConcurrentLinkedQueue[AnyRef] def log(s: AnyRef) = logbuffer.add(s) def flush() { - for (s <- JavaConversions.asScalaIterator(logbuffer.iterator())) Console.out.println(s.toString) + for (s <- logbuffer.iterator().asScala) Console.out.println(s.toString) logbuffer.clear() } |