From 83e7be6269bc6efc74fc5b954d801b5907404aca Mon Sep 17 00:00:00 2001 From: Karthika Date: Tue, 5 Sep 2017 10:07:12 -0700 Subject: validation for phone number --- src/main/scala/xyz/driver/core/domain.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/scala/xyz/driver/core/domain.scala b/src/main/scala/xyz/driver/core/domain.scala index 48943a7..a72167d 100644 --- a/src/main/scala/xyz/driver/core/domain.scala +++ b/src/main/scala/xyz/driver/core/domain.scala @@ -23,6 +23,9 @@ object domain { } final case class PhoneNumber(countryCode: String = "1", number: String) { + require(countryCode.nonEmpty, "Country Code must be entered") + require(number.nonEmpty, "Phone number must be entered") + override def toString: String = s"+$countryCode $number" } -- cgit v1.2.3 From 07c3303f9c6b8d46763d270f47b730d5519eb204 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 19 Sep 2017 20:47:28 -0700 Subject: Utils from pds-common --- src/main/scala/xyz/driver/core/cache.scala | 109 ++++++++++++++++++++++++++++ src/main/scala/xyz/driver/core/future.scala | 87 ++++++++++++++++++++++ 2 files changed, 196 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/cache.scala create mode 100644 src/main/scala/xyz/driver/core/future.scala diff --git a/src/main/scala/xyz/driver/core/cache.scala b/src/main/scala/xyz/driver/core/cache.scala new file mode 100644 index 0000000..79ba2d7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/cache.scala @@ -0,0 +1,109 @@ +package xyz.driver.core + +import java.util.concurrent.{Callable, TimeUnit} + +import com.google.common.cache.{CacheBuilder, Cache => GuavaCache} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.duration.{Duration, _} +import scala.concurrent.{ExecutionContext, Future} + +object cache { + + /** + * FutureCache is used to represent an in-memory, in-process, asynchronous cache. + * + * Every cache operation is atomic. + * + * This implementation evicts failed results, + * and doesn't interrupt the underlying request that has been fired off. + */ + class AsyncCache[K, V](name: String, cache: GuavaCache[K, Future[V]])(implicit executionContext: ExecutionContext) { + + private[this] val log = Logger(s"AsyncCache.$name") + private[this] val underlying = cache.asMap() + + private[this] def evictOnFailure(key: K, f: Future[V]): Future[V] = { + f onFailure { + case ex: Throwable => + log.debug(s"Evict key $key due to exception $ex") + evict(key, f) + } + f // we return the original future to make evict(k, f) easier to work with. + } + + /** + * Equivalent to getOrElseUpdate + */ + def apply(key: K)(value: => Future[V]): Future[V] = getOrElseUpdate(key)(value) + + /** + * Gets the cached Future. + * + * @return None if a value hasn't been specified for that key yet + * Some(ksync computation) if the value has been specified. Just + * because this returns Some(..) doesn't mean that it has been + * satisfied, but if it hasn't been satisfied, it's probably + * in-flight. + */ + def get(key: K): Option[Future[V]] = Option(underlying.get(key)) + + /** + * Gets the cached Future, or if it hasn't been returned yet, computes it and + * returns that value. + */ + def getOrElseUpdate(key: K)(compute: => Future[V]): Future[V] = { + log.debug(s"Try to retrieve key $key from cache") + evictOnFailure(key, cache.get(key, new Callable[Future[V]] { + def call(): Future[V] = { + log.debug(s"Cache miss, load the key: $key") + compute + } + })) + } + + /** + * Unconditionally sets a value for a given key + */ + def set(key: K, value: Future[V]): Unit = { + cache.put(key, value) + evictOnFailure(key, value) + } + + /** + * Evicts the contents of a `key` if the old value is `value`. + * + * Since `scala.concurrent.Future` uses reference equality, you must use the + * same object reference to evict a value. + * + * @return true if the key was evicted + * false if the key was not evicted + */ + def evict(key: K, value: Future[V]): Boolean = underlying.remove(key, value) + + /** + * @return the number of results that have been computed successfully or are in flight. + */ + def size: Int = cache.size.toInt + } + + object AsyncCache { + val DEFAULT_CAPACITY: Long = 10000L + val DEFAULT_READ_EXPIRATION: Duration = 10 minutes + val DEFAULT_WRITE_EXPIRATION: Duration = 1 hour + + def apply[K <: AnyRef, V <: AnyRef](name: String, + capacity: Long = DEFAULT_CAPACITY, + readExpiration: Duration = DEFAULT_READ_EXPIRATION, + writeExpiration: Duration = DEFAULT_WRITE_EXPIRATION)( + implicit executionContext: ExecutionContext): AsyncCache[K, V] = { + val guavaCache = CacheBuilder + .newBuilder() + .maximumSize(capacity) + .expireAfterAccess(readExpiration.toSeconds, TimeUnit.SECONDS) + .expireAfterWrite(writeExpiration.toSeconds, TimeUnit.SECONDS) + .build[K, Future[V]]() + new AsyncCache(name, guavaCache) + } + } +} diff --git a/src/main/scala/xyz/driver/core/future.scala b/src/main/scala/xyz/driver/core/future.scala new file mode 100644 index 0000000..1ee3576 --- /dev/null +++ b/src/main/scala/xyz/driver/core/future.scala @@ -0,0 +1,87 @@ +package xyz.driver.core + +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} + +object future { + val log = Logger("Driver.Future") + + implicit class RichFuture[T](f: Future[T]) { + def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = { + val p = Promise[U]() + f.onComplete(r => p.complete(Try(pf(r)))) + p.future + } + + def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = { + future.failFastZip(f, that) + } + } + + def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = { + t.foldLeft(Future.successful(Nil: List[T])) { (f, i) => + failFastZip(f, i).map { case (tail, h) => h :: tail } + } + .map(_.reverse) + } + + /** + * Standard scala zip waits forever on the left side, even if the right side fails + */ + def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = { + type State = Either[(T, Promise[U]), (U, Promise[T])] + val middleState = Promise[State]() + + ft.onComplete { + case f @ Failure(err) => + if (!middleState.tryFailure(err)) { + // the right has already succeeded + middleState.future.foreach { + case Right((_, pt)) => pt.complete(f) + case Left((t1, _)) => // This should never happen + log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set") + } + } + case Success(t) => + // Create the next promise: + val pu = Promise[U]() + if (!middleState.trySuccess(Left((t, pu)))) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Right((_, pt)) => pt.success(t) + case Left((t1, _)) => // This should never happen + log.error(s"Logic error: tried to set Left($t) but Left($t1) already set") + } + } + } + fu.onComplete { + case f @ Failure(err) => + if (!middleState.tryFailure(err)) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Left((_, pu)) => pu.complete(f) + case Right((u1, _)) => // This should never happen + log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set") + } + } + case Success(u) => + // Create the next promise: + val pt = Promise[T]() + if (!middleState.trySuccess(Right((u, pt)))) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Left((_, pu)) => pu.success(u) + case Right((u1, _)) => // This should never happen + log.error(s"Logic error: tried to set Right($u) but Right($u1) already set") + } + } + } + + middleState.future.flatMap { + case Left((t, pu)) => pu.future.map((t, _)) + case Right((u, pt)) => pt.future.map((_, u)) + } + } +} -- cgit v1.2.3 From 3fc91f50ae799106c99d3aa730d62d12be80b247 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 19 Sep 2017 20:49:40 -0700 Subject: Utils from pds-common --- src/main/scala/xyz/driver/core/domain.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/scala/xyz/driver/core/domain.scala b/src/main/scala/xyz/driver/core/domain.scala index a72167d..48943a7 100644 --- a/src/main/scala/xyz/driver/core/domain.scala +++ b/src/main/scala/xyz/driver/core/domain.scala @@ -23,9 +23,6 @@ object domain { } final case class PhoneNumber(countryCode: String = "1", number: String) { - require(countryCode.nonEmpty, "Country Code must be entered") - require(number.nonEmpty, "Phone number must be entered") - override def toString: String = s"+$countryCode $number" } -- cgit v1.2.3 From e4a55e9fdcb3fb89963416b273654259a9699d03 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 19 Sep 2017 22:19:59 -0700 Subject: Utils from pds-common --- src/main/scala/xyz/driver/core/rest.scala | 109 ++++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 20 deletions(-) diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala index 2527b75..dde570a 100644 --- a/src/main/scala/xyz/driver/core/rest.scala +++ b/src/main/scala/xyz/driver/core/rest.scala @@ -10,10 +10,11 @@ import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallabl import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{HttpChallenges, RawHeader, `User-Agent`} import akka.http.scaladsl.server.AuthenticationFailedRejection.CredentialsRejected +import akka.http.scaladsl.server.Route import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} -import akka.stream.scaladsl.Flow -import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.stream._ import akka.util.ByteString import com.github.swagger.akka.model._ import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService} @@ -26,7 +27,8 @@ import xyz.driver.core.auth._ import xyz.driver.core.{Name, generators} import xyz.driver.core.time.provider.TimeProvider -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration._ import scala.util.{Failure, Success} import scalaz.Scalaz.{futureInstance, intInstance, listInstance, mapEqual, mapMonoid, stringInstance} import scalaz.syntax.equal._ @@ -365,10 +367,10 @@ trait RestService extends Service { protected def delete(baseUri: Uri, path: String) = HttpRequest(HttpMethods.DELETE, endpointUri(baseUri, path)) - protected def endpointUri(baseUri: Uri, path: String) = + protected def endpointUri(baseUri: Uri, path: String): Uri = baseUri.withPath(Uri.Path(path)) - protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]) = + protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]): Uri = baseUri.withPath(Uri.Path(path)).withQuery(Uri.Query(query: _*)) } @@ -409,17 +411,9 @@ class HttpRestServiceTransport(applicationName: Name[App], time: TimeProvider) extends ServiceTransport { - protected implicit val materializer = ActorMaterializer()(actorSystem) - protected implicit val execution = executionContext + protected implicit val execution: ExecutionContext = executionContext - private val client = Http()(actorSystem) - - private val clientConnectionSettings: ClientConnectionSettings = - ClientConnectionSettings(actorSystem).withUserAgentHeader( - Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) - - private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) - .withConnectionSettings(clientConnectionSettings) + protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem) def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = { @@ -439,7 +433,7 @@ class HttpRestServiceTransport(applicationName: Name[App], log.info(s"Sending request to ${request.method} ${request.uri}") - val response = client.singleRequest(request, settings = connectionPoolSettings)(materializer) + val response = httpClient.makeRequest(request) response.onComplete { case Success(r) => @@ -469,6 +463,81 @@ class HttpRestServiceTransport(applicationName: Name[App], } } +trait HttpClient { + def makeRequest(request: HttpRequest): Future[HttpResponse] +} + +class SingleRequestHttpClient(applicationName: Name[App], applicationVersion: String, actorSystem: ActorSystem) + extends HttpClient { + + protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + private val client = Http()(actorSystem) + + private val clientConnectionSettings: ClientConnectionSettings = + ClientConnectionSettings(actorSystem).withUserAgentHeader( + Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) + + private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) + .withConnectionSettings(clientConnectionSettings) + + def makeRequest(request: HttpRequest): Future[HttpResponse] = { + client.singleRequest(request, settings = connectionPoolSettings)(materializer) + } +} + +class PooledHttpClient( + baseUri: Uri, + applicationName: Name[App], + applicationVersion: String, + requestRateLimit: Int = 64, + requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) + extends HttpClient { + + private val host = baseUri.authority.host.toString() + private val port = baseUri.effectivePort + private val scheme = baseUri.scheme + + protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + + private val clientConnectionSettings: ClientConnectionSettings = + ClientConnectionSettings(actorSystem).withUserAgentHeader( + Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) + + private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) + .withConnectionSettings(clientConnectionSettings) + + private val pool = if (scheme.equalsIgnoreCase("https")) { + Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) + } else { + Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) + } + + private val queue = Source + .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew) + .via(pool) + .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping) + .toMat(Sink.foreach({ + case ((Success(resp), p)) => p.success(resp) + case ((Failure(e), p)) => p.failure(e) + }))(Keep.left) + .run + + def makeRequest(request: HttpRequest): Future[HttpResponse] = { + val responsePromise = Promise[HttpResponse]() + + queue.offer(request -> responsePromise).flatMap { + case QueueOfferResult.Enqueued => + responsePromise.future + case QueueOfferResult.Dropped => + Future.failed(new Exception(s"Request queue to the host $host is overflown")) + case QueueOfferResult.Failure(ex) => + Future.failed(ex) + case QueueOfferResult.QueueClosed => + Future.failed(new Exception("Queue was closed (pool shut down) while running the request")) + } + } +} + import scala.reflect.runtime.universe._ class Swagger(override val host: String, @@ -479,10 +548,10 @@ class Swagger(override val host: String, val config: Config) extends SwaggerHttpService with HasActorSystem { - val materializer = ActorMaterializer()(actorSystem) + val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) - override val basePath = config.getString("swagger.basePath") - override val apiDocsPath = config.getString("swagger.docsPath") + override val basePath: String = config.getString("swagger.basePath") + override val apiDocsPath: String = config.getString("swagger.docsPath") override val info = Info( config.getString("swagger.apiInfo.description"), @@ -503,7 +572,7 @@ class Swagger(override val host: String, vendorExtensions = Map.empty[String, AnyRef] ) - def swaggerUI = get { + def swaggerUI: Route = get { pathPrefix("") { pathEndOrSingleSlash { getFromResource("swagger-ui/index.html") -- cgit v1.2.3