aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
blob: c6a21449993d45da8bb60ffd0c03cd8f000dd28f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package xyz.driver.common.concurrent

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

import xyz.driver.common.concurrent.BridgeUploadQueue.Item
import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
import xyz.driver.common.db.Transactions
import xyz.driver.common.db.repositories.BridgeUploadQueueRepository
import xyz.driver.common.domain.LongId
import xyz.driver.common.logging._

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}

/**
  * Companion of the [[BridgeUploadQueueRepositoryAdapter]] class. Holds the
  * retry [[BridgeUploadQueueRepositoryAdapter.Strategy]] and the small ADTs
  * describing its decisions.
  */
object BridgeUploadQueueRepositoryAdapter {

  /**
    * Retry policy of the adapter.
    */
  sealed trait Strategy {

    /**
      * What the adapter does with an item when it is removed from the queue.
      */
    def onComplete: Strategy.OnComplete

    /**
      * Decision for the given attempt number.
      *
      * @param attempt number of the attempt a decision is requested for
      * @return either a delay before the next attempt or a request to stop retrying
      */
    def on(attempt: Int): Strategy.OnAttempt

  }

  object Strategy {

    /**
      * Works forever, but has a limit for intervals.
      *
      * The interval for an attempt is `startInterval * intervalFactor ^ attempt`,
      * capped at `maxInterval`.
      *
      * @param startInterval  base interval that is scaled by the factor
      * @param intervalFactor base of the exponent
      * @param maxInterval    upper bound for the produced interval
      * @param onComplete     action taken when an item is removed
      */
    final case class LimitExponential(startInterval: FiniteDuration,
                                      intervalFactor: Double,
                                      maxInterval: FiniteDuration,
                                      onComplete: OnComplete) extends Strategy {

      override def on(attempt: Int): OnAttempt = {
        OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
      }

      /**
        * Computes the capped interval for `attempt`.
        *
        * The product is evaluated as a `Double` number of nanoseconds and compared
        * with the cap BEFORE a `Duration` is built. Building the duration first
        * (`startInterval * factor`) throws an `IllegalArgumentException` as soon as
        * the finite product no longer fits into a `Long` of nanoseconds, which
        * would break the strategy after a few dozen attempts.
        */
      private def intervalFor(attempt: Int): Duration = {
        val nanos = startInterval.toNanos * Math.pow(intervalFactor, attempt.toDouble)
        // `NaN < x` is false, so an undefined product falls back to the cap as well.
        if (nanos < maxInterval.toNanos) Duration.fromNanos(nanos) else maxInterval
      }
    }

    /**
      * Used only in tests.
      *
      * Never retries: every attempt is answered with `OnAttempt.Complete`.
      */
    case object Ignore extends Strategy {

      override val onComplete = OnComplete.Delete

      override def on(attempt: Int) = OnAttempt.Complete

    }

    /**
      * Used only in tests.
      *
      * Always retries after the same `interval`.
      */
    final case class Constant(interval: FiniteDuration) extends Strategy {

      override val onComplete = OnComplete.Delete

      override def on(attempt: Int) = OnAttempt.Continue(interval)

    }

    /**
      * Action applied to an item when it is removed from the queue.
      */
    sealed trait OnComplete
    object OnComplete {

      /** The item is deleted from the repository. */
      case object Delete extends OnComplete

      /** The item is kept and updated with `completed = true`. */
      case object Mark extends OnComplete

      // Takes OnComplete (not OnAttempt), so that values of this ADT can be logged.
      implicit def toPhiString(x: OnComplete): PhiString = Unsafe(x.toString)
    }

    /**
      * Decision of a strategy for a single attempt.
      */
    sealed trait OnAttempt
    object OnAttempt {

      /** Stop retrying. */
      case object Complete extends OnAttempt

      /** Retry after `interval`. */
      final case class Continue(interval: Duration) extends OnAttempt

      implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
    }
  }
}

/**
  * A [[BridgeUploadQueue]] that delegates storage to a [[BridgeUploadQueueRepository]]
  * and asks the given [[Strategy]] what to do on retries and removals.
  *
  * @param strategy     retry/removal policy
  * @param repository   storage of the queue items
  * @param transactions runner used to wrap the modifying operations
  */
class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
                                         repository: BridgeUploadQueueRepository,
                                         transactions: Transactions)
                                        (implicit executionContext: ExecutionContext)
  extends BridgeUploadQueue with PhiLogging {

  /** Adds `item` to the repository inside `transactions.run`. */
  override def add(item: Item): Future[Unit] = transactions.run(_ => repository.add(item))

  /** Asks the repository for one item of the given `kind`. */
  override def get(kind: String): Future[Option[Item]] = repository.getOne(kind)

  /**
    * Removes the item according to `strategy.onComplete`: either deletes it or
    * updates it with `completed = true`.
    *
    * A `RuntimeException` is thrown inside the transaction block when the item
    * to be marked can not be found.
    */
  override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
    strategy.onComplete match {
      case Strategy.OnComplete.Mark =>
        val task = repository.getById(item).getOrElse {
          throw new RuntimeException(s"Can not find the $item task")
        }
        repository.update(task.copy(completed = true))

      case Strategy.OnComplete.Delete =>
        repository.delete(item)
    }
  }

  /**
    * Registers one more attempt for `item` and applies the decision of the strategy.
    *
    * @return `Some` with the result of updating the rescheduled item, or `None`
    *         when the strategy decided to stop and the item was deleted
    */
  override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
    import Strategy.OnAttempt._

    logger.trace(phi"tryRetry($item)")

    val attemptNumber = item.attempts + 1
    val decision = strategy.on(attemptNumber)
    logger.debug(phi"Action for ${Unsafe(attemptNumber)}: $decision")

    decision match {
      case Complete =>
        // NOTE(review): this branch always deletes and does not consult
        // strategy.onComplete (unlike remove) - confirm that this is intended.
        repository.delete(item.id)
        None

      case Continue(delay) =>
        val retryAt = LocalDateTime.now().plus(delay.toMillis, ChronoUnit.MILLIS)
        val rescheduled = item.copy(attempts = attemptNumber, nextAttempt = retryAt)

        logger.debug(rescheduled)
        Some(repository.update(rescheduled))
    }
  }
}