package xyz.driver.pdsuicommon.concurrent import java.util.concurrent.ThreadLocalRandom import xyz.driver.pdsuicommon.BaseSuite import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy.{OnAttempt, OnComplete} import xyz.driver.pdsuicommon.db.{FakeDbIo, MysqlQueryBuilder} import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository import scala.concurrent.Future import scala.concurrent.duration.DurationInt class BridgeUploadQueueRepositoryAdapterSuite extends BaseSuite { // IDEA have some issue here with imports private implicit val executionContext = scala.concurrent.ExecutionContext.global "Strategy" - { "LimitExponential" - { "on" - { val strategy = Strategy.LimitExponential( startInterval = 10.seconds, intervalFactor = 1.4, maxInterval = 50.seconds, onComplete = OnComplete.Delete ) "a new interval should be greater than the previous one if the limit not reached" in { val previous = strategy.on(1) val current = strategy.on(2) (previous, current) match { case (OnAttempt.Continue(a), OnAttempt.Continue(b)) => assert(a < b) case x => fail(s"Unexpected result: $x") } } "should limit intervals" in { assert(strategy.on(20) == OnAttempt.Continue(strategy.maxInterval)) } "should not fail, if there is many attempts" in { assert(strategy.on(1000) == OnAttempt.Continue(strategy.maxInterval)) } } } } "complete" - { "onComplete == mark" - { "should update the item" in { var done = false val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def delete(kind: String, tag: String): Unit = throw new IllegalStateException("Impossible call") override def update(entity: EntityT): EntityT = { assert(entity.kind == item.kind, "repository.delete, kind") assert(entity.tag == item.tag, "repository.delete, tag") done = true entity } override def getById(kind: String, tag: String): Option[EntityT] = Some(item) } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = Strategy.Stop(OnComplete.Mark), repository = repository, dbIo = FakeDbIo ) assert(adapter.complete(item.kind, item.tag).isReadyWithin(100.millis)) assert(done) } } "onComplete == delete" - { "should delete the item" in { var done = false val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def delete(kind: String, tag: String): Unit = { assert(kind == item.kind, "repository.delete, kind") assert(tag == item.tag, "repository.delete, tag") done = true } override def update(entity: EntityT): EntityT = throw new IllegalStateException("Impossible call") } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = Strategy.Stop(OnComplete.Delete), repository = repository, dbIo = FakeDbIo ) assert(adapter.complete(item.kind, item.tag).isReadyWithin(100.millis)) assert(done) } } } "tryRetry" - { "when all attempts are not out" - { val defaultStrategy = Strategy.Constant(10.seconds) "should return an updated item" in { val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(draft: EntityT): EntityT = draft override def delete(kind: String, tag: String): Unit = throw new IllegalAccessError(s"kind=$kind, tag=$tag") } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) val item = defaultItem val r = adapter.tryRetry(item).futureValue assert(r.isDefined) assert(!r.contains(item)) } "should update an item with increased attempts" in { val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(draft: EntityT): EntityT = { assert(draft.attempts === (item.attempts + 1), "repository.add") draft } override def delete(kind: String, tag: String): Unit = throw new IllegalAccessError(s"kind=$kind, tag=$tag") } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) assert(adapter.tryRetry(item).isReadyWithin(100.millis)) } "should remove an old item" in { val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(draft: EntityT): EntityT = draft override def delete(kind: String, tag: String): Unit = { assert(kind == item.kind, "repository.delete, kind") assert(tag == item.tag, "repository.delete, kind") } } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) assert(adapter.tryRetry(item).isReadyWithin(100.millis)) } "should update time of the next attempt" in { val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(draft: EntityT): EntityT = { assert(draft.nextAttempt.isAfter(item.nextAttempt), "repository.add") draft } override def delete(kind: String, tag: String): Unit = throw new IllegalAccessError(s"kind=$kind, tag=$tag") } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) assert(adapter.tryRetry(item).isReadyWithin(100.millis)) } } "when all attempts are out" - { val defaultStrategy = Strategy.Stop() "should not return an item" in { val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(entity: EntityT): EntityT = fail("update should not be used!") override def delete(kind: String, tag: String): Unit = {} } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) val r = adapter.tryRetry(defaultItem).futureValue assert(r.isEmpty) } "should complete the item" in { var taskWasCompleted = false val item = defaultItem val repository = new BridgeUploadQueueRepository { override def add(draft: EntityT): EntityT = draft override def getOne(kind: String): Option[EntityT] = fail("getOne should not be used!") override def buildQuery: MysqlQueryBuilder[EntityT] = fail("buildQuery should not be used!") override def getById(kind: String, tag: String): Option[EntityT] = fail("getById should not be used!") override def update(entity: EntityT): EntityT = fail("update should not be used!") override def delete(kind: String, tag: String): Unit = {} } val adapter = new BridgeUploadQueueRepositoryAdapter( strategy = defaultStrategy, repository = repository, dbIo = FakeDbIo ) { override def complete(kind: String, tag: String): Future[Unit] = Future { assert(kind == item.kind, "adapter.complete, kind") assert(tag == item.tag, "adapter.complete, tag") taskWasCompleted = true } } val r = adapter.tryRetry(item).futureValue assert(r.isEmpty) assert(taskWasCompleted) } } } private def defaultItem = BridgeUploadQueue.Item( "test", ThreadLocalRandom.current().nextInt().toString ) }