path: root/core/src/main/scala/spark/partial/PartialResult.scala
package spark.partial

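/**
 * Holds the result of a job that may complete approximately: it starts with an initial
 * (possibly estimated) value and is later filled in with either a final value or a failure.
 * Callers can block on getFinalValue() or register onComplete/onFail handlers.
 */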
class PartialResult[R](initialVal: R, isFinal: Boolean) {
  private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
  private var failure: Option[Exception] = None
  private var completionHandler: Option[R => Unit] = None
  private var failureHandler: Option[Exception => Unit] = None

  def initialValue: R = initialVal

  def isInitialValueFinal: Boolean = isFinal

  /**
   * Blocking method to wait for and return the final value.
   */
  def getFinalValue(): R = synchronized {
    while (finalValue == None && failure == None) {
      this.wait()
    }
    if (finalValue != None) {
      return finalValue.get
    } else {
      throw failure.get
    }
  }

  /** 
   * Set a handler to be called when this PartialResult completes. Only one completion handler
   * is supported per PartialResult.
   */
  def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
    if (completionHandler != None) {
      throw new UnsupportedOperationException("onComplete cannot be called twice")
    }
    completionHandler = Some(handler)
    if (finalValue != None) {
      // We already have a final value, so let's call the handler
      handler(finalValue.get)
    }
    return this
  }

  /** 
   * Set a handler to be called if this PartialResult's job fails. Only one failure handler
   * is supported per PartialResult.
   */
  def onFail(handler: Exception => Unit): Unit = synchronized {
    if (failureHandler != None) {
      throw new UnsupportedOperationException("onFail cannot be called twice")
    }
    failureHandler = Some(handler)
    if (failure != None) {
      // We already have a failure, so let's call the handler
      handler(failure.get)
    }
  }

  private[spark] def setFinalValue(value: R): Unit = synchronized {
    if (finalValue != None) {
      throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
    }
    finalValue = Some(value)
    // Call the completion handler if it was set
    completionHandler.foreach(h => h(value))
    // Notify any threads that may be calling getFinalValue()
    this.notifyAll()
  }

  private[spark] def setFailure(exception: Exception): Unit = synchronized {
    if (failure != None) {
      throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
    }
    failure = Some(exception)
    // Call the failure handler if it was set
    failureHandler.foreach(h => h(exception))
    // Notify any threads that may be calling getFinalValue()
    this.notifyAll()
  }

  override def toString: String = synchronized {
    finalValue match {
      case Some(value) => "(final: " + value + ")"
      case None => "(partial: " + initialValue + ")"
    }
  }
}
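Below is a minimal usage sketch, not part of the file above. It assumes the caller is compiled
inside the spark.partial package (so the private[spark] setters are reachable); the object name
and the values used are purely illustrative.

package spark.partial

// Hypothetical driver object, for illustration only; not part of Spark.
object PartialResultExample {
  def main(args: Array[String]) {
    // A result whose initial value (0.0) is only an estimate.
    val result = new PartialResult[Double](0.0, false)

    // Register a completion handler; it fires once setFinalValue is called.
    result.onComplete(v => println("completed with: " + v))

    // Simulate the job finishing on another thread.
    new Thread(new Runnable {
      def run() {
        Thread.sleep(100)
        result.setFinalValue(42.0)  // wakes getFinalValue() and invokes onComplete
      }
    }).start()

    // Block until the final value (or a failure) arrives.
    println("getFinalValue returned: " + result.getFinalValue())
  }
}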