aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/util/TimeStampedHashMap.scala
blob: 92dfaa6e6f3d1c8491df5ee933d634aa4199f2bc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package spark.util

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions
import scala.collection.mutable.Map
import spark.scheduler.MapStatus

/**
 * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
 * time stamp along with each key-value pair. Key-value pairs that are older than a particular
 * threshold time can then be removed using the clearOldValues method. This is intended to be a
 * drop-in replacement of scala.collection.mutable.HashMap.
 *
 * Entries are kept in a java.util.concurrent.ConcurrentHashMap. The time stamp of an entry is
 * set when it is written (`+=`, `update`, `putIfAbsent` on an absent key); reading an entry
 * does not change its time stamp.
 *
 * @tparam A type of the keys
 * @tparam B type of the values
 */
class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
  /** Backing store: every key maps to (value, time in milliseconds at which it was written). */
  val internalMap = new ConcurrentHashMap[A, (B, Long)]()

  /**
   * Looks up `key`.
   *
   * @return `Some(value)` if the key is present, `None` otherwise
   */
  def get(key: A): Option[B] = {
    // ConcurrentHashMap.get reports a missing key as null; Option(...) turns that into None.
    Option(internalMap.get(key)).map(_._1)
  }

  /** Iterates over the (key, value) pairs of this map; the time stamps are not exposed. */
  def iterator: Iterator[(A, B)] = {
    val entries = JavaConversions.asScalaIterator(internalMap.entrySet().iterator())
    entries.map(entry => (entry.getKey, entry.getValue._1))
  }

  /**
   * Returns a new TimeStampedHashMap holding all entries of this map (with their existing
   * time stamps) plus `kv`, which is stamped with the current time. This map is not modified.
   */
  override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
    val newMap = new TimeStampedHashMap[A, B1]
    newMap.internalMap.putAll(this.internalMap)
    newMap.internalMap.put(kv._1, (kv._2, currentTime))
    newMap
  }

  /**
   * Returns a new TimeStampedHashMap holding all entries of this map (with their existing
   * time stamps) except the one for `key`. This map is not modified.
   */
  override def - (key: A): Map[A, B] = {
    val newMap = new TimeStampedHashMap[A, B]
    newMap.internalMap.putAll(this.internalMap)
    newMap.internalMap.remove(key)
    newMap
  }

  /**
   * Adds or replaces the entry for `kv._1` in place, stamping it with the current time.
   *
   * @return this map
   */
  override def += (kv: (A, B)): this.type = {
    internalMap.put(kv._1, (kv._2, currentTime))
    this
  }

  // Should we return previous value directly or as Option ?
  /**
   * Stores `value` under `key`, stamped with the current time, only if the key has no entry
   * yet. An existing entry is left untouched, including its time stamp.
   *
   * @return the value that was already present, or `None` if `value` was inserted
   */
  def putIfAbsent(key: A, value: B): Option[B] = {
    // ConcurrentHashMap.putIfAbsent returns null when there was no previous mapping.
    Option(internalMap.putIfAbsent(key, (value, currentTime))).map(_._1)
  }


  /**
   * Removes the entry for `key` in place, if there is one.
   *
   * @return this map
   */
  override def -= (key: A): this.type = {
    internalMap.remove(key)
    this
  }

  /** Adds or replaces the entry for `key`; equivalent to `this += ((key, value))`. */
  override def update(key: A, value: B): Unit = {
    this += ((key, value))
  }

  /**
   * Returns the value stored for `key`.
   *
   * @throws NoSuchElementException if the key is not present
   */
  override def apply(key: A): B = {
    val value = internalMap.get(key)
    // Name the missing key in the message so a failed lookup can be diagnosed from the trace.
    if (value == null) throw new NoSuchElementException("key not found: " + key)
    value._1
  }

  /**
   * Returns a map of the (key, value) pairs that satisfy `p`. The result is built from a copy
   * of the pairs without their time stamps; it is not a TimeStampedHashMap.
   */
  override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
    JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
  }

  /** Returns a new, empty TimeStampedHashMap. */
  override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()

  /** Number of entries currently held, as reported by the backing ConcurrentHashMap. */
  override def size: Int = internalMap.size

  /** Applies `f` to every (key, value) pair of this map; the results of `f` are discarded. */
  override def foreach[U](f: ((A, B)) => U): Unit = {
    // Walk the Java entry set directly instead of going through the Scala `iterator` wrapper.
    val entries = internalMap.entrySet().iterator()
    while (entries.hasNext) {
      val entry = entries.next()
      f((entry.getKey, entry.getValue._1))
    }
  }

  /**
   * Removes old key-value pairs that have timestamp earlier than `threshTime`
   *
   * @param threshTime cut-off in milliseconds, on the same clock as `System.currentTimeMillis()`;
   *                   entries stamped exactly at `threshTime` are kept
   */
  def clearOldValues(threshTime: Long): Unit = {
    // NOTE(review): removal goes through the iterator (by key), after the time stamp check; an
    // entry rewritten by another thread in between could presumably be dropped as well. Confirm
    // whether callers can tolerate that.
    val entries = internalMap.entrySet().iterator()
    while (entries.hasNext) {
      val entry = entries.next()
      if (entry.getValue._2 < threshTime) {
        logDebug("Removing key " + entry.getKey)
        entries.remove()
      }
    }
  }

  /** Current wall-clock time in milliseconds, used to stamp entries on write. */
  private def currentTime: Long = System.currentTimeMillis()

}