summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/mutable/Publisher.scala
blob: 22bbea16efc3010e8095714c0300dd4a52fe86af (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */



package scala
package collection
package mutable


/** `Publisher[Evt]` objects publish events of type `Evt`
 *  to all registered subscribers. When subscribing, a subscriber may specify
 *  a filter which can be used to constrain the number of events sent to the
 *  subscriber. Subscribers may suspend their subscription, or reactivate a
 *  suspended subscription. Trait `Publisher` is typically used
 *  as a mixin. The abstract type `Pub` models the type of the publisher itself.
 *
 *  @tparam Evt      type of the published event.
 *
 *  @author  Matthias Zenger
 *  @author  Martin Odersky
 *  @version 2.8
 *  @since   1
 */
trait Publisher[Evt] {

  /** The concrete publisher type handed to subscribers on notification. */
  type Pub <: Publisher[Evt]
  /** A subscriber that accepts events of type `Evt` from a `Pub`. */
  type Sub = Subscriber[Evt, Pub]
  /** A predicate deciding whether an event is delivered to a subscriber. */
  type Filter = Evt => Boolean

  /** The publisher itself of type `Pub`. Implemented by a cast from `this` here.
   *  Needs to be overridden if the actual publisher is different from `this`.
   */
  protected val self: Pub = this.asInstanceOf[Pub]

  // Every subscriber maps to the set of filters it registered; an event is
  // delivered when at least one of those filters accepts it.
  private val filters = new HashMap[Sub, Set[Filter]] with MultiMap[Sub, Filter]
  // Subscribers that are currently muted.
  private val suspended = new HashSet[Sub]

  /** Subscribes `sub` to every event (registers a filter that is always true). */
  def subscribe(sub: Sub): Unit = subscribe(sub, _ => true)

  /** Subscribes `sub` to the events accepted by `filter`. A subscriber may
   *  register several filters; it is notified at most once per event, if any
   *  of its filters accepts the event.
   */
  def subscribe(sub: Sub, filter: Filter): Unit = { filters.addBinding(sub, filter) }

  /** Stops delivering events to `sub` until `activateSubscription` is called. */
  def suspendSubscription(sub: Sub): Unit = { suspended += sub }

  /** Resumes delivery of events to a previously suspended `sub`. */
  def activateSubscription(sub: Sub): Unit = { suspended -= sub }

  /** Removes `sub` together with all of its filters. Its suspended flag is
   *  dropped as well, so a later re-subscription starts out active and the
   *  publisher no longer keeps a reference to the removed subscriber.
   */
  def removeSubscription(sub: Sub): Unit = {
    filters -= sub
    suspended -= sub
  }

  /** Removes all subscribers, their filters and all suspended flags. */
  def removeSubscriptions(): Unit = {
    filters.clear()
    suspended.clear()
  }

  /** Delivers `event` to every subscriber that is not suspended and has at
   *  least one filter accepting the event.
   *
   *  @param event the event to publish.
   */
  protected def publish(event: Evt): Unit = {
    // Iterate over a snapshot of the subscribers: `notify` is user code and may
    // subscribe or unsubscribe on this publisher, which must not mutate the map
    // while it is being traversed. Suspension and filters are still looked up
    // live, so a subscriber removed earlier in this round is skipped.
    filters.keys.toList.foreach { sub =>
      if (!suspended.contains(sub) && filters.entryExists(sub, accepts => accepts(event)))
        sub.notify(self, event)
    }
  }

  /** Checks if two publishers are structurally identical.
   *
   *  NOTE: this trait does not override `hashCode`, so two publishers that
   *  compare equal here may still report different hash codes.
   *
   *  @return true, iff `obj` is a publisher with an equal subscriber-to-filters
   *          map and an equal set of suspended subscribers.
   */
  override def equals(obj: Any): Boolean = obj match {
    case that: Publisher[_] => filters == that.filters && suspended == that.suspended
    case _                  => false
  }
}