diff options
author | Vlad Uspensky <v.uspenskiy@icloud.com> | 2017-07-06 15:05:35 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-06 15:05:35 -0700 |
commit | c83566d7e22be806a6fc490b670eb5024a0b882d (patch) | |
tree | 33c48ae644866491a5b6b5cb9695b9c2e37cf327 | |
parent | 6fa693851222057d459d23330474ea83a3fb73f1 (diff) | |
parent | d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad (diff) | |
download | driver-core-c83566d7e22be806a6fc490b670eb5024a0b882d.tar.gz driver-core-c83566d7e22be806a6fc490b670eb5024a0b882d.tar.bz2 driver-core-c83566d7e22be806a6fc490b670eb5024a0b882d.zip |
Merge pull request #49 from drivergroup/pubsubv0.13.16
Pubsub
-rw-r--r-- | build.sbt | 3 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/core/pubsub.scala | 98 | ||||
-rw-r--r-- | src/test/scala/xyz/driver/core/FileTest.scala | 8 |
3 files changed, 104 insertions, 5 deletions
@@ -16,7 +16,8 @@ lazy val core = (project in file(".")) "org.mockito" % "mockito-core" % "1.9.5" % "test", "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1", "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", - "com.google.cloud" % "google-cloud-storage" % "0.9.4-beta", + "com.google.cloud" % "google-cloud-pubsub" % "0.17.2-alpha", + "com.google.cloud" % "google-cloud-storage" % "1.0.1", "com.typesafe.slick" %% "slick" % "3.1.1", "com.typesafe" % "config" % "1.2.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.4.0", diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala new file mode 100644 index 0000000..a4de51b --- /dev/null +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -0,0 +1,98 @@ +package xyz.driver.core + +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.stream.Materializer +import com.google.api.core.{ApiFutureCallback, ApiFutures} +import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.protobuf.ByteString +import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Try} + +object pubsub { + + trait PubsubPublisher[Message] { + + type Result + + def publish(message: Message): Future[Result] + } + + class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, Array[Byte]], + ex: ExecutionContext + ) extends PubsubPublisher[Message] { + + type Result = Id[PubsubMessage] + + private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + + override def publish(message: Message): Future[Id[PubsubMessage]] = { + + Marshal(message).to[Array[Byte]].flatMap { messageBytes => + val data = ByteString.copyFrom(messageBytes) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) + + ApiFutures.addCallback( + messageIdFuture, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + log.info(s"Published a message with topic $topicName, message id $messageId: $message") + promise.complete(Try(Id[PubsubMessage](messageId))) + } + + override def onFailure(t: Throwable): Unit = { + log.warn(s"Failed to publish a message with topic $topicName: $message", t) + promise.complete(Failure(t)) + } + } + ) + + promise.future + } + } + } + + trait PubsubSubscriber { + + def stopListening(): Unit + } + + class GooglePubsubSubscriber[Message, Result](projectId: String, + subscriptionId: String, + receiver: Message => Future[Result], + log: Logger)( + implicit messageMarshaller: Unmarshaller[Array[Byte], Message], + mat: Materializer, + ex: ExecutionContext) + extends PubsubSubscriber { + + private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) + + private val messageReceiver = new MessageReceiver() { + override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { + val messageBytes = message.getData.toByteArray + Unmarshal(messageBytes).to[Message].flatMap { messageBody => + log.info( + s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}") + receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } + } + } + + private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + + subscriber.startAsync() + + override def stopListening(): Unit = { + subscriber.stopAsync() + } + } +} diff --git a/src/test/scala/xyz/driver/core/FileTest.scala b/src/test/scala/xyz/driver/core/FileTest.scala index 246cd95..717233e 100644 --- a/src/test/scala/xyz/driver/core/FileTest.scala +++ b/src/test/scala/xyz/driver/core/FileTest.scala @@ -133,7 +133,7 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { } "Google Cloud Storage" should "upload and download files" in { - import com.google.cloud.Page + import com.google.api.gax.paging.Page import com.google.cloud.storage.{Blob, Bucket, Storage} import Bucket.BlobWriteOption import Storage.BlobListOption @@ -162,9 +162,9 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { val gcsStorage = new GcsStorage(gcsMock, testBucket, scala.concurrent.ExecutionContext.global) when(pageMock.iterateAll()).thenReturn( - Iterator[Blob]().asJava, - Iterator[Blob](blobMock).asJava, - Iterator[Blob]().asJava + Iterable[Blob]().asJava, + Iterable[Blob](blobMock).asJava, + Iterable[Blob]().asJava ) when(gcsMock.list(testBucket.value, BlobListOption.currentDirectory(), BlobListOption.prefix(s"$testDirPath/"))) .thenReturn(pageMock) |