aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
blob: 3bf9192032c26f979a18e593e881d0bf71cf4eca (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package xyz.driver.pdsuicommon.concurrent

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
import xyz.driver.pdsuicommon.db._
import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository
import xyz.driver.pdsuicommon.logging._

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

/**
  * Companion of the [[BridgeUploadQueueRepositoryAdapter]] class.
  *
  * Declares the [[BridgeUploadQueueRepositoryAdapter.Strategy]] family, which tells the adapter
  * what to do with a queue item on each attempt and once the item is completed.
  */
object BridgeUploadQueueRepositoryAdapter {

  /**
    * Defines how we work with the queue when a user attempts to remove/tryRetry an item.
    */
  sealed trait Strategy {

    /**
      * What to do with an item once it is completed.
      */
    def onComplete: Strategy.OnComplete

    /**
      * Decides what to do with an item on the given attempt.
      *
      * @param attempt the attempt counter the decision is made for
      * @return either continue after some interval, or complete the item
      */
    def on(attempt: Int): Strategy.OnAttempt

  }

  object Strategy {

    /**
      * Works forever, but has a limit for intervals.
      *
      * The interval for an attempt is `startInterval * intervalFactor ^ attempt`,
      * capped by `maxInterval`. This strategy never returns [[OnAttempt.Complete]].
      *
      * @param startInterval  base interval that is scaled on every attempt
      * @param intervalFactor base of the exponent applied to `startInterval`
      * @param maxInterval    upper bound for the resulting interval
      * @param onComplete     what to do with an item once it is completed
      */
    final case class LimitExponential(startInterval: FiniteDuration,
                                      intervalFactor: Double,
                                      maxInterval: FiniteDuration,
                                      onComplete: OnComplete)
        extends Strategy {

      override def on(attempt: Int): OnAttempt = {
        OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
      }

      /**
        * Uncapped interval for the given attempt.
        *
        * Any failure of the multiplication (e.g. the product can not be represented as a Duration)
        * falls back to `maxInterval`.
        */
      private def intervalFor(attempt: Int): Duration = {
        Try(startInterval * Math.pow(intervalFactor, attempt.toDouble))
          .getOrElse(maxInterval)
      }
    }

    /**
      * Used only in tests.
      *
      * Completes the item on any attempt.
      */
    final case class Stop(onComplete: OnComplete = OnComplete.Delete) extends Strategy {

      override def on(attempt: Int) = OnAttempt.Complete

    }

    /**
      * Used only in tests.
      *
      * Always continues with the same `interval`; completed items are deleted.
      */
    final case class Constant(interval: FiniteDuration) extends Strategy {

      override val onComplete = OnComplete.Delete

      override def on(attempt: Int) = OnAttempt.Continue(interval)

    }

    /**
      * What to do with a completed item.
      */
    sealed trait OnComplete
    object OnComplete {

      /** Delete the item from the repository. */
      case object Delete extends OnComplete

      /** Keep the item in the repository and flag it as completed. */
      case object Mark extends OnComplete

      // The value is rendered via toString and wrapped with Unsafe to obtain a PhiString.
      // Previously this conversion was declared for OnAttempt (a copy-paste slip), which left
      // OnComplete values without a conversion in their own companion.
      implicit def toPhiString(x: OnComplete): PhiString = Unsafe(x.toString)
    }

    /**
      * The decision made for a single attempt.
      */
    sealed trait OnAttempt
    object OnAttempt {

      /** No more attempts: the item should be completed. */
      case object Complete extends OnAttempt

      /** Try again after the given interval. */
      final case class Continue(interval: Duration) extends OnAttempt

      // The value is rendered via toString and wrapped with Unsafe to obtain a PhiString.
      implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
    }
  }
}

/**
  * A [[BridgeUploadQueue]] that delegates storage to a [[BridgeUploadQueueRepository]]
  * and consults a [[Strategy]] to decide how items are retried and completed.
  *
  * @param strategy   decides what happens on each attempt and on completion
  * @param repository storage of the queue items
  * @param dbIo       runs the repository calls asynchronously
  */
class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, dbIo: DbIo)(
        implicit executionContext: ExecutionContext)
    extends BridgeUploadQueue with PhiLogging {

  /**
    * Adds the item to the repository.
    */
  override def add(item: Item): Future[Item] = dbIo.runAsync {
    repository.add(item)
  }

  /**
    * Fetches one item of the given kind from the repository, if there is any.
    */
  override def get(kind: String): Future[Option[Item]] = dbIo.runAsync {
    repository.getOne(kind)
  }

  /**
    * Completes the task identified by `kind` and `tag` according to the strategy:
    * the task is either deleted, or updated with `completed = true`.
    *
    * When the task has to be marked but can not be found, a RuntimeException is thrown
    * inside the transactional block.
    */
  override def complete(kind: String, tag: String): Future[Unit] = {
    import Strategy.OnComplete._

    strategy.onComplete match {
      case Mark =>
        // Lookup and update are performed within a single runAsyncTx block
        dbIo.runAsyncTx {
          repository.getById(kind, tag) match {
            case None        => throw new RuntimeException(s"Can not find the task: kind=$kind, tag=$tag")
            case Some(found) => repository.update(found.copy(completed = true))
          }
        }

      case Delete =>
        dbIo.runAsync {
          repository.delete(kind, tag)
        }
    }
  }

  /**
    * Asks the strategy what to do on the next attempt of the item.
    *
    * @return the updated item when the task is rescheduled, None when the task was completed
    */
  override def tryRetry(item: Item): Future[Option[Item]] = {
    import Strategy.OnAttempt._

    logger.trace(phi"tryRetry($item)")

    val attemptNumber = item.attempts + 1
    val decision      = strategy.on(attemptNumber)
    logger.debug(phi"Action for ${Unsafe(attemptNumber)}: $decision")

    decision match {
      case Complete =>
        logger.warn(phi"All attempts are out for $item, complete the task")
        complete(item.kind, item.tag).map(_ => None)

      case Continue(delay) =>
        // The next attempt is scheduled relative to the current local time
        val scheduledAt = LocalDateTime.now().plus(delay.toMillis, ChronoUnit.MILLIS)
        val rescheduled = item.copy(attempts = attemptNumber, nextAttempt = scheduledAt)

        logger.debug(rescheduled)
        dbIo.runAsync {
          Some(repository.update(rescheduled))
        }
    }
  }
}