diff options
author | vlad <vlad@driver.xyz> | 2018-01-25 14:12:31 -0800 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2018-01-25 14:12:31 -0800 |
commit | a0877d81ca2844d75dc361b5ce7c99afacd6e25f (patch) | |
tree | 8fe49f45cbcddbbb9a3d167099abe7aa2625e56b /src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala | |
parent | 46a22e9ab324a0068a85952cdc809800f360f445 (diff) | |
download | rest-query-a0877d81ca2844d75dc361b5ce7c99afacd6e25f.tar.gz rest-query-a0877d81ca2844d75dc361b5ce7c99afacd6e25f.tar.bz2 rest-query-a0877d81ca2844d75dc361b5ce7c99afacd6e25f.zip |
Extracting query library
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala')
-rw-r--r-- | src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala | 60 |
1 files changed, 0 insertions, 60 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala deleted file mode 100644 index 2f7fe6c..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala +++ /dev/null @@ -1,60 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency -import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag} -import xyz.driver.pdsuicommon.logging._ -import xyz.driver.pdsuicommon.serialization.Marshaller - -import scala.concurrent.{ExecutionContext, Future} - -object SafeBridgeUploadQueue { - - trait Tag extends Product with Serializable - - final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item) - - object SafeTask { - implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = { - import x._ - phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)" - } - } - - trait DependencyResolver[T <: Tag] { - def getDependency(tag: T): Option[Dependency] - } - -} - -class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)( - implicit tagMarshaller: Marshaller[T, String], - dependencyResolver: DependencyResolver[T], - executionContext: ExecutionContext) { - - type Task = SafeTask[T] - - def add(tag: T): Future[BridgeUploadQueue.Item] = - origQueue.add( - BridgeUploadQueue.Item( - kind = kind, - tag = tagMarshaller.write(tag), - dependency = dependencyResolver.getDependency(tag) - )) - - def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem)) - - def get: Future[Option[Task]] = wrap(origQueue.get(kind)) - - def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag)) - - private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover)) - - private def cover(rawTask: BridgeUploadQueue.Item): Task = { - val tag = tagMarshaller - .read(rawTask.tag) - .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")) - - SafeTask(tag, rawTask) - } - -} |