summaryrefslogtreecommitdiff
path: root/test/files/pos/Transactions.scala
blob: 32889f8180f95bf5363620f211e80266160f6d04 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package scala.concurrent1

/** Signals that a transaction has to be abandoned.
  *
  * Thrown by `Transactional.getter` and `Transactional.setter` when they
  * detect a conflict; `Transaction.run` catches it, marks the transaction
  * aborted and yields `None`.
  */
class AbortException extends RuntimeException()

/** Companion of [[Transaction]]: id generation, status codes and the `atomic` entry point. */
object Transaction {
  /** Most recently issued id; only touched while holding this object's monitor. */
  private var lastId = 0L

  /** Hands out the next transaction id; ids start at 1 and grow by one per call. */
  def nextId: Long = this.synchronized {
    lastId = lastId + 1
    lastId
  }

  // Status codes stored in `Transaction.status`.
  /** Set by `run` when the transaction starts (also the default value of `status`). */
  val Running = 0
  /** Set by `run` after the body finished normally. */
  val Committed = 1
  /** Set by `makeAbort` while another party waits for the transaction to end. */
  val Abortable = 2
  /** Set by `run` after the body threw. */
  val Aborted = 3
  /** Not referenced elsewhere in this file. */
  val Compound = 4

  /** Executes `b` inside a brand-new transaction.
    *
    * @param b the transactional body; it receives the transaction it runs in
    * @return whatever `Transaction.run` returns for `b`
    */
  def atomic[T](b: Transaction => T): Option[T] = {
    val tx = new Transaction
    tx.run(b)
  }
}

/** A transaction.
  *
  * Instances serve two roles in this file: a real transaction (primary
  * constructor, `head == this`) and a linked-list cell (two-argument
  * constructor) that carries another transaction in `head` and the rest of
  * the list in `next`.
  */
class Transaction {
  /** Current state; one of the status constants in the `Transaction` companion. */
  var status: Int = _

  var id: Long = _  // only for real transactions

  /** The transaction this cell stands for; a real transaction refers to itself. */
  var head: Transaction = this
  /** The following list cell, or null at the end of the list. */
  var next: Transaction = null

  /** Creates a list cell.
    *
    * @param hd the transaction stored in this cell
    * @param tl the remaining cells, or null
    */
  def this(hd: Transaction, tl: Transaction) = {
    this()
    // Assign the parameters; the fields must not be assigned to themselves.
    this.head = hd
    this.next = tl
  }

  /** Asks this transaction to abort and blocks until it has ended.
    *
    * While waiting, `status` is set to `Abortable`. The call returns once the
    * status is `Aborted` or `Committed`, both of which are announced through
    * `notifyAll()` by `abort()` and `commit()`.
    */
  def makeAbort(): Unit = synchronized {
    while (status != Transaction.Aborted && status != Transaction.Committed) {
      status = Transaction.Abortable
      wait()
    }
  }

  /** Marks the transaction aborted and wakes every thread waiting in `makeAbort`. */
  private def abort(): Unit = synchronized { status = Transaction.Aborted; notifyAll() }

  /** Marks the transaction committed and wakes every thread waiting in `makeAbort`. */
  private def commit(): Unit = synchronized { status = Transaction.Committed; notifyAll() }

  /** Runs `b` as the body of this transaction.
    *
    * Sets the status to `Running` and draws a fresh id before calling `b`.
    *
    * @param b the transactional body; it receives this transaction
    * @return `Some(result)` after committing, or `None` if `b` threw an
    *         [[AbortException]]
    * @throws Throwable any other throwable raised by `b`, rethrown after the
    *         transaction has been marked aborted
    */
  def run[T](b: Transaction => T): Option[T] =
    try {
      status = Transaction.Running
      id = Transaction.nextId
      val result = Some(b(this))
      commit()
      result
    } catch {
      case ex: AbortException => abort(); None
      // Deliberately broad: the status must leave Running/Abortable and waiters
      // must be notified before the throwable propagates.
      case ex: Throwable => abort(); throw ex
    }

}

/** Mix-in for objects that are accessed under [[Transaction]]s.
  *
  * Implementers provide snapshotting (`checkPoint`/`rollBack`) and storage for
  * the reader list and the writer. `getter` registers a transaction as a
  * reader; `setter` resolves conflicts with the writer and the readers and
  * then takes a snapshot.
  */
trait Transactional {

  /** create a new snapshot */
  def checkPoint(): Unit

  /** copy back snapshot */
  def rollBack(): Unit

  /** Reader list: cells linked through `next`, each holding a reader in `head`; null when empty. */
  var readers: Transaction

  /** The writing transaction, or null.
    *
    * NOTE(review): nothing in this trait assigns a non-null value; presumably
    * implementers set it after `setter` returns — confirm.
    */
  var writer: Transaction

  /** Returns the writer if it is still running, otherwise null.
    *
    * A writer that is no longer running is forgotten; unless it committed,
    * `rollBack()` is called first.
    */
  def currentWriter(): Transaction =
    if (writer == null) null
    else if (writer.status == Transaction.Running) writer
    else {
      if (writer.status != Transaction.Committed) rollBack()
      writer = null
      null
    }

  /** Registers `thisTrans` as a reader of this object.
    *
    * Returns without further work when `thisTrans` is the writer or is already
    * in the reader list. Cells whose transaction is no longer running are
    * unlinked along the way.
    *
    * @param thisTrans the transaction that wants to read
    * @throws AbortException if `thisTrans` has status `Abortable`, or if there
    *         is a running writer whose id is not greater than `thisTrans.id`
    */
  def getter(thisTrans: Transaction): Unit = {
    if (writer == thisTrans) return
    // NOTE(review): this traversal reads and relinks the list without holding
    // the monitor taken below.
    var r = readers
    // Drop finished readers from the front of the list.
    while (r != null && r.head.status != Transaction.Running) { r = r.next; readers = r }
    while (r != null) {
      if (r.head == thisTrans) return
      val last = r
      r = r.next
      // Unlink finished readers that follow `last`.
      while (r != null && r.head.status != Transaction.Running) { r = r.next; last.next = r }
    }
    synchronized {
      if (thisTrans.status == Transaction.Abortable) throw new AbortException
      val w = currentWriter()
      if (w != null) {
        // The transaction with the smaller id wins the conflict.
        // NOTE(review): makeAbort() also returns when the writer committed;
        // rollBack() is still called in that case — confirm this is intended.
        if (thisTrans.id < w.id) { w.makeAbort(); rollBack(); writer = null }
        else throw new AbortException
      }
      // A real transaction doubles as the single cell of a one-element list.
      readers = if (readers == null) thisTrans else new Transaction(thisTrans, readers)
    }
  }

  /** Prepares this object for a write by `thisTrans` and takes a snapshot.
    *
    * Returns immediately when `thisTrans` already is the writer. Otherwise a
    * running writer and every running reader other than `thisTrans` is either
    * made to abort (when `thisTrans` has the smaller id) or causes
    * `thisTrans` to abort. Finally `checkPoint()` is called.
    *
    * @param thisTrans the transaction that wants to write
    * @throws AbortException if there is a running writer whose id is not
    *         greater than `thisTrans.id`, or a running reader with a smaller id
    */
  def setter(thisTrans: Transaction): Unit = {
    if (writer == thisTrans) return
    synchronized {
      val w = currentWriter()
      if (w != null) {
        // Clear the writer as `getter` does, so that a later currentWriter()
        // does not roll back a second time.
        // NOTE(review): makeAbort() also returns when the writer committed;
        // rollBack() is still called in that case — confirm this is intended.
        if (thisTrans.id < w.id) { w.makeAbort(); rollBack(); writer = null }
        else throw new AbortException
      }
      var r = readers
      // Drop finished readers from the front of the list.
      while (r != null && r.head.status != Transaction.Running) { r = r.next; readers = r }
      while (r != null) {
        // The reader is the cell's `head`; the cell itself has no id of its own
        // unless it is a real transaction.
        val reader = r.head
        // A transaction never conflicts with its own earlier read; calling
        // makeAbort() on itself would wait forever.
        if (reader != thisTrans) {
          if (reader.id < thisTrans.id) throw new AbortException
          else reader.makeAbort()
        }
        val last = r
        r = r.next
        // Unlink finished readers that follow `last`.
        while (r != null && r.head.status != Transaction.Running) { r = r.next; last.next = r }
      }
      checkPoint()
    }
  }
}