diff options
Diffstat (limited to 'src/main/scala/xyz/driver')
69 files changed, 0 insertions, 5733 deletions
diff --git a/src/main/scala/xyz/driver/core/Refresh.scala b/src/main/scala/xyz/driver/core/Refresh.scala deleted file mode 100644 index 6db9c26..0000000 --- a/src/main/scala/xyz/driver/core/Refresh.scala +++ /dev/null @@ -1,69 +0,0 @@ -package xyz.driver.core - -import java.time.Instant -import java.util.concurrent.atomic.AtomicReference - -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.concurrent.duration.Duration - -/** A single-value asynchronous cache with TTL. - * - * Slightly adapted from - * [[https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala - * Twitter's "util" library]] - * - * Released under the Apache License 2.0. - */ -object Refresh { - - /** Creates a function that will provide a cached value for a given time-to-live (TTL). - * - * It avoids the "thundering herd" problem if multiple requests arrive - * simultanously and the cached value has expired or is unset. - * - * Usage example: - * {{{ - * def freshToken(): Future[String] = // expensive network call to get an access token - * val getToken: () => Future[String] = Refresh.every(1.hour)(freshToken()) - * - * getToken() // new token is issued - * getToken() // subsequent calls use the cached token - * // wait 1 hour - * getToken() // new token is issued - * }}} - * - * @param ttl Time-To-Live duration to cache a computed value. - * @param compute Call-by-name operation that eventually computes a value to - * be cached. Note that if the computation (i.e. the future) fails, the value - * is not cached. - * @param ec The execution context in which valeu computations will be run. - * @return A zero-arg function that returns the cached value. - */ - def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = { - val ref = new AtomicReference[(Future[A], Instant)]( - (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN) - ) - def refresh(): Future[A] = { - val tuple = ref.get - val (cachedValue, lastRetrieved) = tuple - val now = Instant.now - if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) { - cachedValue - } else { - val p = Promise[A] - val nextTuple = (p.future, now) - if (ref.compareAndSet(tuple, nextTuple)) { - compute.onComplete { done => - if (done.isFailure) { - ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure - } - p.complete(done) - } - } - refresh() - } - } - refresh _ - } - -} diff --git a/src/main/scala/xyz/driver/core/auth.scala b/src/main/scala/xyz/driver/core/auth.scala deleted file mode 100644 index 896bd89..0000000 --- a/src/main/scala/xyz/driver/core/auth.scala +++ /dev/null @@ -1,43 +0,0 @@ -package xyz.driver.core - -import xyz.driver.core.domain.Email -import xyz.driver.core.time.Time -import scalaz.Equal - -object auth { - - trait Permission - - final case class Role(id: Id[Role], name: Name[Role]) { - - def oneOf(roles: Role*): Boolean = roles.contains(this) - - def oneOf(roles: Set[Role]): Boolean = roles.contains(this) - } - - object Role { - implicit def idEqual: Equal[Role] = Equal.equal[Role](_ == _) - } - - trait User { - def id: Id[User] - } - - final case class AuthToken(value: String) - - final case class AuthTokenUserInfo( - id: Id[User], - email: Email, - emailVerified: Boolean, - audience: String, - roles: Set[Role], - expirationTime: Time) - extends User - - final case class RefreshToken(value: String) - final case class PermissionsToken(value: String) - - final case class PasswordHash(value: String) - - final case class AuthCredentials(identifier: String, password: String) -} diff --git a/src/main/scala/xyz/driver/core/core.scala b/src/main/scala/xyz/driver/core/core.scala deleted file mode 100644 index 11c1ffe..0000000 --- a/src/main/scala/xyz/driver/core/core.scala +++ /dev/null @@ -1,174 +0,0 @@ -package xyz.driver - -import eu.timepit.refined.api.{Refined, Validate} -import eu.timepit.refined.collection.NonEmpty -import scalaz.{Equal, Monad, OptionT} -import xyz.driver.core.rest.errors.ExternalServiceException -import xyz.driver.core.tagging.Tagged - -import scala.concurrent.{ExecutionContext, Future} - -// TODO: this package seems too complex, look at all the features we need! -import scala.language.{higherKinds, implicitConversions, reflectiveCalls} - -package object core { - - def make[T](v: => T)(f: T => Unit): T = { - val value = v - f(value) - value - } - - def using[R <: { def close() }, P](r: => R)(f: R => P): P = { - val resource = r - try { - f(resource) - } finally { - resource.close() - } - } - - type @@[+V, +Tag] = V with Tagged[V, Tag] - - implicit class OptionTExtensions[H[_]: Monad, T](optionTValue: OptionT[H, T]) { - - def returnUnit: H[Unit] = optionTValue.fold[Unit](_ => (), ()) - - def continueIgnoringNone: OptionT[H, Unit] = - optionTValue.map(_ => ()).orElse(OptionT.some[H, Unit](())) - - def subflatMap[B](f: T => Option[B]): OptionT[H, B] = - OptionT.optionT[H](implicitly[Monad[H]].map(optionTValue.run)(_.flatMap(f))) - } - - implicit class MonadicExtensions[H[_]: Monad, T](monadicValue: H[T]) { - private implicit val monadT = implicitly[Monad[H]] - - def returnUnit: H[Unit] = monadT(monadicValue)(_ => ()) - - def toOptionT: OptionT[H, T] = - OptionT.optionT[H](monadT(monadicValue)(value => Option(value))) - - def toUnitOptionT: OptionT[H, Unit] = - OptionT.optionT[H](monadT(monadicValue)(_ => Option(()))) - } - - implicit class FutureExtensions[T](future: Future[T]) { - def passThroughExternalServiceException(implicit executionContext: ExecutionContext): Future[T] = - future.transform(identity, { - case ExternalServiceException(_, _, Some(e)) => e - case t: Throwable => t - }) - } -} - -package core { - - import java.util.UUID - - sealed trait GenericId[+Tag, IdType] extends Any { - def value: IdType - def length: Int - def toString: String - } - - final case class Id[+Tag](value: String) extends AnyVal with GenericId[Tag, String] { - @inline def length: Int = value.length - override def toString: String = value - } - - @SuppressWarnings(Array("org.wartremover.warts.ImplicitConversion")) - object Id { - implicit def idEqual[T]: Equal[Id[T]] = Equal.equal[Id[T]](_ == _) - implicit def idOrdering[T]: Ordering[Id[T]] = Ordering.by[Id[T], String](_.value) - - sealed class Mapper[E, R] { - def apply[T >: E](id: Id[R]): Id[T] = Id[E](id.value) - def apply[T >: R](id: Id[E])(implicit dummy: DummyImplicit): Id[T] = Id[R](id.value) - } - object Mapper { - def apply[E, R] = new Mapper[E, R] - } - implicit def convertRE[R, E](id: Id[R])(implicit mapper: Mapper[E, R]): Id[E] = mapper[E](id) - implicit def convertER[E, R](id: Id[E])(implicit mapper: Mapper[E, R]): Id[R] = mapper[R](id) - } - - final case class UuidId[+Tag](value: UUID) extends AnyVal with GenericId[Tag, UUID] { - @inline def length: Int = value.toString.length - override def toString: String = value.toString - } - - @SuppressWarnings(Array("org.wartremover.warts.ImplicitConversion")) - object UuidId { - implicit def idEqual[T]: Equal[UuidId[T]] = Equal.equal[UuidId[T]](_ == _) - implicit def idOrdering[T]: Ordering[UuidId[T]] = Ordering.by[UuidId[T], UUID](_.value) - - sealed class Mapper[E, R] { - def apply[T >: E](id: UuidId[R]): UuidId[T] = UuidId[E](id.value) - def apply[T >: R](id: UuidId[E])(implicit dummy: DummyImplicit): UuidId[T] = UuidId[R](id.value) - } - object Mapper { - def apply[E, R] = new Mapper[E, R] - } - implicit def convertRE[R, E](id: UuidId[R])(implicit mapper: Mapper[E, R]): UuidId[E] = mapper[E](id) - implicit def convertER[E, R](id: UuidId[E])(implicit mapper: Mapper[E, R]): UuidId[R] = mapper[R](id) - } - - final case class NumericId[+Tag](value: Long) extends AnyVal with GenericId[Tag, Long] { - @inline def length: Int = value.toString.length - override def toString: String = value.toString - } - - @SuppressWarnings(Array("org.wartremover.warts.ImplicitConversion")) - object NumericId { - implicit def idEqual[T]: Equal[NumericId[T]] = Equal.equal[NumericId[T]](_ == _) - implicit def idOrdering[T]: Ordering[NumericId[T]] = Ordering.by[NumericId[T], Long](_.value) - - sealed class Mapper[E, R] { - def apply[T >: E](id: NumericId[R]): NumericId[T] = NumericId[E](id.value) - def apply[T >: R](id: NumericId[E])(implicit dummy: DummyImplicit): NumericId[T] = NumericId[R](id.value) - } - object Mapper { - def apply[E, R] = new Mapper[E, R] - } - implicit def convertRE[R, E](id: NumericId[R])(implicit mapper: Mapper[E, R]): NumericId[E] = mapper[E](id) - implicit def convertER[E, R](id: NumericId[E])(implicit mapper: Mapper[E, R]): NumericId[R] = mapper[R](id) - } - - final case class Name[+Tag](value: String) extends AnyVal { - @inline def length: Int = value.length - override def toString: String = value - } - - object Name { - implicit def nameEqual[T]: Equal[Name[T]] = Equal.equal[Name[T]](_ == _) - implicit def nameOrdering[T]: Ordering[Name[T]] = Ordering.by(_.value) - - implicit def nameValidator[T, P](implicit stringValidate: Validate[String, P]): Validate[Name[T], P] = { - Validate.instance[Name[T], P, stringValidate.R]( - name => stringValidate.validate(name.value), - name => stringValidate.showExpr(name.value)) - } - } - - final case class NonEmptyName[+Tag](value: String Refined NonEmpty) { - @inline def length: Int = value.value.length - override def toString: String = value.value - } - - object NonEmptyName { - implicit def nonEmptyNameEqual[T]: Equal[NonEmptyName[T]] = - Equal.equal[NonEmptyName[T]](_.value.value == _.value.value) - - implicit def nonEmptyNameOrdering[T]: Ordering[NonEmptyName[T]] = Ordering.by(_.value.value) - } - - final case class Revision[T](id: String) - - object Revision { - implicit def revisionEqual[T]: Equal[Revision[T]] = Equal.equal[Revision[T]](_.id == _.id) - } - - final case class Base64(value: String) - -} diff --git a/src/main/scala/xyz/driver/core/database/Converters.scala b/src/main/scala/xyz/driver/core/database/Converters.scala deleted file mode 100644 index ad79abf..0000000 --- a/src/main/scala/xyz/driver/core/database/Converters.scala +++ /dev/null @@ -1,26 +0,0 @@ -package xyz.driver.core.database - -import xyz.driver.core.rest.errors.DatabaseException - -import scala.reflect.ClassTag - -/** - * Helper methods for converting between table rows and Scala objects - */ -trait Converters { - def fromStringOrThrow[ADT](entityStr: String, mapper: (String => Option[ADT]), entityName: String): ADT = - mapper(entityStr).getOrElse(throw DatabaseException(s"Invalid $entityName in database: $entityStr")) - - def expectValid[ADT](mapper: String => Option[ADT], query: String)(implicit ct: ClassTag[ADT]): ADT = - fromStringOrThrow[ADT](query, mapper, ct.toString()) - - def expectExistsAndValid[ADT](mapper: String => Option[ADT], query: Option[String], contextMsg: String = "")( - implicit ct: ClassTag[ADT]): ADT = { - expectValid[ADT](mapper, query.getOrElse(throw DatabaseException(contextMsg))) - } - - def expectValidOrEmpty[ADT](mapper: String => Option[ADT], query: Option[String], contextMsg: String = "")( - implicit ct: ClassTag[ADT]): Option[ADT] = { - query.map(expectValid[ADT](mapper, _)) - } -} diff --git a/src/main/scala/xyz/driver/core/database/MdcAsyncExecutor.scala b/src/main/scala/xyz/driver/core/database/MdcAsyncExecutor.scala deleted file mode 100644 index 5939efb..0000000 --- a/src/main/scala/xyz/driver/core/database/MdcAsyncExecutor.scala +++ /dev/null @@ -1,53 +0,0 @@ -/** Code ported from "de.geekonaut" %% "slickmdc" % "1.0.0" - * License: @see https://github.com/AVGP/slickmdc/blob/master/LICENSE - * Blog post: @see http://50linesofco.de/post/2016-07-01-slick-and-slf4j-mdc-logging-in-scala.html - */ -package xyz.driver.core -package database - -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicInteger - -import scala.concurrent._ -import com.typesafe.scalalogging.StrictLogging -import slick.util.AsyncExecutor - -import logging.MdcExecutionContext - -/** Taken from the original Slick AsyncExecutor and simplified - * @see https://github.com/slick/slick/blob/3.1/slick/src/main/scala/slick/util/AsyncExecutor.scala - */ -object MdcAsyncExecutor extends StrictLogging { - - /** Create an AsyncExecutor with a fixed-size thread pool. - * - * @param name The name for the thread pool. - * @param numThreads The number of threads in the pool. - */ - def apply(name: String, numThreads: Int): AsyncExecutor = { - new AsyncExecutor { - val tf = new DaemonThreadFactory(name + "-") - - lazy val executionContext = { - new MdcExecutionContext(ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads, tf))) - } - - def close(): Unit = {} - } - } - - def default(name: String = "AsyncExecutor.default"): AsyncExecutor = apply(name, 20) - - private class DaemonThreadFactory(namePrefix: String) extends ThreadFactory { - private[this] val group = - Option(System.getSecurityManager).fold(Thread.currentThread.getThreadGroup)(_.getThreadGroup) - private[this] val threadNumber = new AtomicInteger(1) - - def newThread(r: Runnable): Thread = { - val t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement, 0) - if (!t.isDaemon) t.setDaemon(true) - if (t.getPriority != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY) - t - } - } -} diff --git a/src/main/scala/xyz/driver/core/database/PatchedHsqldbProfile.scala b/src/main/scala/xyz/driver/core/database/PatchedHsqldbProfile.scala deleted file mode 100644 index e2efd32..0000000 --- a/src/main/scala/xyz/driver/core/database/PatchedHsqldbProfile.scala +++ /dev/null @@ -1,16 +0,0 @@ -package xyz.driver.core.database - -import slick.jdbc.{HsqldbProfile, JdbcType} -import slick.ast.FieldSymbol -import slick.relational.RelationalProfile - -trait PatchedHsqldbProfile extends HsqldbProfile { - override def defaultSqlTypeName(tmd: JdbcType[_], sym: Option[FieldSymbol]): String = tmd.sqlType match { - case java.sql.Types.VARCHAR => - val size = sym.flatMap(_.findColumnOption[RelationalProfile.ColumnOption.Length]) - size.fold("LONGVARCHAR")(l => if (l.varying) s"VARCHAR(${l.length})" else s"CHAR(${l.length})") - case _ => super.defaultSqlTypeName(tmd, sym) - } -} - -object PatchedHsqldbProfile extends PatchedHsqldbProfile diff --git a/src/main/scala/xyz/driver/core/database/Repository.scala b/src/main/scala/xyz/driver/core/database/Repository.scala deleted file mode 100644 index 5d7f787..0000000 --- a/src/main/scala/xyz/driver/core/database/Repository.scala +++ /dev/null @@ -1,74 +0,0 @@ -package xyz.driver.core.database - -import scalaz.std.scalaFuture._ -import scalaz.{ListT, Monad, OptionT} -import slick.lifted.{AbstractTable, CanBeQueryCondition, RunnableCompiled} -import slick.{lifted => sl} - -import scala.concurrent.{ExecutionContext, Future} -import scala.language.higherKinds - -trait Repository { - type T[D] - implicit def monadT: Monad[T] - - def execute[D](operations: T[D]): Future[D] - def noAction[V](v: V): T[V] - def customAction[R](action: => Future[R]): T[R] - - def customAction[R](action: => OptionT[Future, R]): OptionT[T, R] = - OptionT[T, R](customAction(action.run)) -} - -class FutureRepository(executionContext: ExecutionContext) extends Repository { - implicit val exec: ExecutionContext = executionContext - override type T[D] = Future[D] - implicit val monadT: Monad[Future] = implicitly[Monad[Future]] - - def execute[D](operations: T[D]): Future[D] = operations - def noAction[V](v: V): T[V] = Future.successful(v) - def customAction[R](action: => Future[R]): T[R] = action -} - -class SlickRepository(database: Database, executionContext: ExecutionContext) extends Repository { - import database.profile.api._ - implicit val exec: ExecutionContext = executionContext - - override type T[D] = slick.dbio.DBIO[D] - - implicit protected class QueryOps[+E, U](query: Query[E, U, Seq]) { - def resultT: ListT[T, U] = ListT[T, U](query.result.map(_.toList)) - - def maybeFilter[V, R: CanBeQueryCondition](data: Option[V])(f: V => E => R): sl.Query[E, U, Seq] = - data.map(v => query.withFilter(f(v))).getOrElse(query) - } - - implicit protected class CompiledQueryOps[U](compiledQuery: RunnableCompiled[_, Seq[U]]) { - def resultT: ListT[T, U] = ListT.listT[T](compiledQuery.result.map(_.toList)) - } - - private val dbioMonad = new Monad[T] { - override def point[A](a: => A): T[A] = DBIO.successful(a) - - override def bind[A, B](fa: T[A])(f: A => T[B]): T[B] = fa.flatMap(f) - } - - override implicit def monadT: Monad[T] = dbioMonad - - override def execute[D](readOperations: T[D]): Future[D] = { - database.database.run(readOperations.transactionally) - } - - override def noAction[V](v: V): T[V] = DBIO.successful(v) - - override def customAction[R](action: => Future[R]): T[R] = DBIO.from(action) - - def affectsRows(updatesCount: Int): Option[Unit] = { - if (updatesCount > 0) Some(()) else None - } - - def insertReturning[AT <: AbstractTable[_], V](table: TableQuery[AT])( - row: AT#TableElementType): slick.dbio.DBIO[AT#TableElementType] = { - table.returning(table) += row - } -} diff --git a/src/main/scala/xyz/driver/core/database/SlickGetResultSupport.scala b/src/main/scala/xyz/driver/core/database/SlickGetResultSupport.scala deleted file mode 100644 index 8293371..0000000 --- a/src/main/scala/xyz/driver/core/database/SlickGetResultSupport.scala +++ /dev/null @@ -1,30 +0,0 @@ -package xyz.driver.core.database - -import slick.jdbc.GetResult -import xyz.driver.core.date.Date -import xyz.driver.core.time.Time -import xyz.driver.core.{Id, Name} - -trait SlickGetResultSupport { - implicit def GetId[U]: GetResult[Id[U]] = - GetResult(r => Id[U](r.nextString())) - implicit def GetIdOption[U]: GetResult[Option[Id[U]]] = - GetResult(_.nextStringOption().map(Id.apply[U])) - - implicit def GetName[U]: GetResult[Name[U]] = - GetResult(r => Name[U](r.nextString())) - implicit def GetNameOption[U]: GetResult[Option[Name[U]]] = - GetResult(_.nextStringOption().map(Name.apply[U])) - - implicit val GetTime: GetResult[Time] = - GetResult(r => Time(r.nextTimestamp.getTime)) - implicit val GetTimeOption: GetResult[Option[Time]] = - GetResult(_.nextTimestampOption().map(t => Time(t.getTime))) - - implicit val GetDate: GetResult[Date] = - GetResult(r => sqlDateToDate(r.nextDate())) - implicit val GetDateOption: GetResult[Option[Date]] = - GetResult(_.nextDateOption().map(sqlDateToDate)) -} - -object SlickGetResultSupport extends SlickGetResultSupport diff --git a/src/main/scala/xyz/driver/core/database/database.scala b/src/main/scala/xyz/driver/core/database/database.scala deleted file mode 100644 index f3630ff..0000000 --- a/src/main/scala/xyz/driver/core/database/database.scala +++ /dev/null @@ -1,215 +0,0 @@ -package xyz.driver.core - -import slick.basic.DatabaseConfig -import slick.jdbc.JdbcProfile -import xyz.driver.core.date.Date -import xyz.driver.core.time.Time - -import scala.concurrent.Future -import com.typesafe.config.Config - -package database { - - import java.sql.SQLDataException - import java.time.{Instant, LocalDate} - - import eu.timepit.refined.api.{Refined, Validate} - import eu.timepit.refined.refineV - - trait Database { - val profile: JdbcProfile - val database: JdbcProfile#Backend#Database - } - - object Database { - def fromConfig(config: Config, databaseName: String): Database = { - val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig(databaseName, config) - - new Database { - val profile: JdbcProfile = dbConfig.profile - val database: JdbcProfile#Backend#Database = dbConfig.db - } - } - - def fromConfig(databaseName: String): Database = { - fromConfig(com.typesafe.config.ConfigFactory.load(), databaseName) - } - } - - trait ColumnTypes { - val profile: JdbcProfile - } - - trait NameColumnTypes extends ColumnTypes { - import profile.api._ - implicit def `xyz.driver.core.Name.columnType`[T]: BaseColumnType[Name[T]] - } - - object NameColumnTypes { - trait StringName extends NameColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.Name.columnType`[T]: BaseColumnType[Name[T]] = - MappedColumnType.base[Name[T], String](_.value, Name[T]) - } - } - - trait DateColumnTypes extends ColumnTypes { - import profile.api._ - implicit def `xyz.driver.core.time.Date.columnType`: BaseColumnType[Date] - implicit def `java.time.LocalDate.columnType`: BaseColumnType[LocalDate] - } - - object DateColumnTypes { - trait SqlDate extends DateColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.time.Date.columnType`: BaseColumnType[Date] = - MappedColumnType.base[Date, java.sql.Date](dateToSqlDate, sqlDateToDate) - - override implicit def `java.time.LocalDate.columnType`: BaseColumnType[LocalDate] = - MappedColumnType.base[LocalDate, java.sql.Date](java.sql.Date.valueOf, _.toLocalDate) - } - } - - trait RefinedColumnTypes[T, Predicate] extends ColumnTypes { - import profile.api._ - implicit def `eu.timepit.refined.api.Refined`( - implicit columnType: BaseColumnType[T], - validate: Validate[T, Predicate]): BaseColumnType[T Refined Predicate] - } - - object RefinedColumnTypes { - trait RefinedValue[T, Predicate] extends RefinedColumnTypes[T, Predicate] { - import profile.api._ - override implicit def `eu.timepit.refined.api.Refined`( - implicit columnType: BaseColumnType[T], - validate: Validate[T, Predicate]): BaseColumnType[T Refined Predicate] = - MappedColumnType.base[T Refined Predicate, T]( - _.value, { dbValue => - refineV[Predicate](dbValue) match { - case Left(refinementError) => - throw new SQLDataException( - s"Value in the database doesn't match the refinement constraints: $refinementError") - case Right(refinedValue) => - refinedValue - } - } - ) - } - } - - trait IdColumnTypes extends ColumnTypes { - import profile.api._ - implicit def `xyz.driver.core.Id.columnType`[T]: BaseColumnType[Id[T]] - } - - object IdColumnTypes { - trait UUID extends IdColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.Id.columnType`[T] = - MappedColumnType - .base[Id[T], java.util.UUID](id => java.util.UUID.fromString(id.value), uuid => Id[T](uuid.toString)) - } - trait SerialId extends IdColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.Id.columnType`[T] = - MappedColumnType.base[Id[T], Long](_.value.toLong, serialId => Id[T](serialId.toString)) - } - trait NaturalId extends IdColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.Id.columnType`[T] = - MappedColumnType.base[Id[T], String](_.value, Id[T]) - } - } - - trait GenericIdColumnTypes[IdType] extends ColumnTypes { - import profile.api._ - implicit def `xyz.driver.core.GenericId.columnType`[T]: BaseColumnType[GenericId[T, IdType]] - } - - object GenericIdColumnTypes { - trait UUID extends GenericIdColumnTypes[java.util.UUID] { - import profile.api._ - - override implicit def `xyz.driver.core.GenericId.columnType`[T]: BaseColumnType[GenericId[T, java.util.UUID]] = - MappedColumnType - .base[GenericId[T, java.util.UUID], java.util.UUID](id => id.value, uuid => UuidId[T](uuid)) - } - trait SerialId extends GenericIdColumnTypes[Long] { - import profile.api._ - - override implicit def `xyz.driver.core.GenericId.columnType`[T]: BaseColumnType[GenericId[T, Long]] = - MappedColumnType.base[GenericId[T, Long], Long](_.value, serialId => NumericId[T](serialId)) - } - trait NaturalId extends GenericIdColumnTypes[String] { - import profile.api._ - - override implicit def `xyz.driver.core.GenericId.columnType`[T]: BaseColumnType[GenericId[T, String]] = - MappedColumnType.base[GenericId[T, String], String](_.value, Id[T]) - } - } - - trait TimestampColumnTypes extends ColumnTypes { - import profile.api._ - implicit def `xyz.driver.core.time.Time.columnType`: BaseColumnType[Time] - implicit def `java.time.Instant.columnType`: BaseColumnType[Instant] - } - - object TimestampColumnTypes { - trait SqlTimestamp extends TimestampColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.time.Time.columnType`: BaseColumnType[Time] = - MappedColumnType.base[Time, java.sql.Timestamp]( - time => new java.sql.Timestamp(time.millis), - timestamp => Time(timestamp.getTime)) - - override implicit def `java.time.Instant.columnType`: BaseColumnType[Instant] = - MappedColumnType.base[Instant, java.sql.Timestamp](java.sql.Timestamp.from, _.toInstant) - } - - trait PrimitiveTimestamp extends TimestampColumnTypes { - import profile.api._ - - override implicit def `xyz.driver.core.time.Time.columnType`: BaseColumnType[Time] = - MappedColumnType.base[Time, Long](_.millis, Time.apply) - - override implicit def `java.time.Instant.columnType`: BaseColumnType[Instant] = - MappedColumnType.base[Instant, Long](_.toEpochMilli, Instant.ofEpochMilli) - } - } - - trait KeyMappers extends ColumnTypes { - import profile.api._ - - def uuidKeyMapper[T] = - MappedColumnType - .base[Id[T], java.util.UUID](id => java.util.UUID.fromString(id.value), uuid => Id[T](uuid.toString)) - def serialKeyMapper[T] = MappedColumnType.base[Id[T], Long](_.value.toLong, serialId => Id[T](serialId.toString)) - def naturalKeyMapper[T] = MappedColumnType.base[Id[T], String](_.value, Id[T]) - } - - trait GenericKeyMappers extends ColumnTypes { - import profile.api._ - - def uuidKeyMapper[T] = - MappedColumnType - .base[UuidId[T], java.util.UUID](id => id.value, uuid => UuidId[T](uuid)) - def serialKeyMapper[T] = MappedColumnType.base[NumericId[T], Long](_.value, serialId => NumericId[T](serialId)) - def naturalKeyMapper[T] = MappedColumnType.base[Id[T], String](_.value, Id[T]) - } - - trait DatabaseObject extends ColumnTypes { - def createTables(): Future[Unit] - def disconnect(): Unit - } - - abstract class DatabaseObjectAdapter extends DatabaseObject { - def createTables(): Future[Unit] = Future.successful(()) - def disconnect(): Unit = {} - } -} diff --git a/src/main/scala/xyz/driver/core/database/package.scala b/src/main/scala/xyz/driver/core/database/package.scala deleted file mode 100644 index aee14c6..0000000 --- a/src/main/scala/xyz/driver/core/database/package.scala +++ /dev/null @@ -1,61 +0,0 @@ -package xyz.driver.core - -import java.sql.{Date => SqlDate} -import java.util.Calendar - -import date.{Date, Month} -import slick.dbio._ -import slick.jdbc.JdbcProfile -import slick.relational.RelationalProfile - -package object database { - - type Schema = { - def create: DBIOAction[Unit, NoStream, Effect.Schema] - def drop: DBIOAction[Unit, NoStream, Effect.Schema] - } - - @deprecated( - "sbt-slick-codegen 0.11.0+ no longer needs to generate these methods. Please use the new `CodegenTables` trait when upgrading.", - "driver-core 1.8.12") - type GeneratedTables = { - // structure of Slick data model traits generated by sbt-slick-codegen - val profile: JdbcProfile - def schema: profile.SchemaDescription - - def createNamespaceSchema: StreamingDBIO[Vector[Unit], Unit] - def dropNamespaceSchema: StreamingDBIO[Vector[Unit], Unit] - } - - /** A structural type for schema traits generated by sbt-slick-codegen. - * This will compile with codegen versions before 0.11.0, but note - * that methods in [[GeneratedTables]] are no longer generated. - */ - type CodegenTables[Profile <: RelationalProfile] = { - val profile: Profile - def schema: profile.SchemaDescription - } - - private[database] def sqlDateToDate(sqlDate: SqlDate): Date = { - // NOTE: SQL date does not have a time component, so this date - // should only be interpreted in the running JVMs timezone. - val cal = Calendar.getInstance() - cal.setTime(sqlDate) - Date(cal.get(Calendar.YEAR), Month(cal.get(Calendar.MONTH)), cal.get(Calendar.DAY_OF_MONTH)) - } - - private[database] def dateToSqlDate(date: Date): SqlDate = { - val cal = Calendar.getInstance() - cal.set(date.year, date.month, date.day, 0, 0, 0) - new SqlDate(cal.getTime.getTime) - } - - @deprecated("Dal is deprecated. Please use Repository trait instead!", "1.8.26") - type Dal = Repository - - @deprecated("SlickDal is deprecated. Please use SlickRepository class instead!", "1.8.26") - type SlickDal = SlickRepository - - @deprecated("FutureDal is deprecated. Please use FutureRepository class instead!", "1.8.26") - type FutureDal = FutureRepository -} diff --git a/src/main/scala/xyz/driver/core/date.scala b/src/main/scala/xyz/driver/core/date.scala deleted file mode 100644 index 5454093..0000000 --- a/src/main/scala/xyz/driver/core/date.scala +++ /dev/null @@ -1,109 +0,0 @@ -package xyz.driver.core - -import java.util.Calendar - -import enumeratum._ -import scalaz.std.anyVal._ -import scalaz.syntax.equal._ - -import scala.collection.immutable.IndexedSeq -import scala.util.Try - -/** - * Driver Date type and related validators/extractors. - * Day, Month, and Year extractors are from ISO 8601 strings => driver...Date integers. - * TODO: Decouple extractors from ISO 8601, as we might want to parse other formats. - */ -object date { - - sealed trait DayOfWeek extends EnumEntry - object DayOfWeek extends Enum[DayOfWeek] { - case object Monday extends DayOfWeek - case object Tuesday extends DayOfWeek - case object Wednesday extends DayOfWeek - case object Thursday extends DayOfWeek - case object Friday extends DayOfWeek - case object Saturday extends DayOfWeek - case object Sunday extends DayOfWeek - - val values: IndexedSeq[DayOfWeek] = findValues - - val All: Set[DayOfWeek] = values.toSet - - def fromString(day: String): Option[DayOfWeek] = withNameInsensitiveOption(day) - } - - type Day = Int @@ Day.type - - object Day { - def apply(value: Int): Day = { - require(1 to 31 contains value, "Day must be in range 1 <= value <= 31") - value.asInstanceOf[Day] - } - - def unapply(dayString: String): Option[Int] = { - require(dayString.length === 2, s"ISO 8601 day string, DD, must have length 2: $dayString") - Try(dayString.toInt).toOption.map(apply) - } - } - - type Month = Int @@ Month.type - - object Month { - def apply(value: Int): Month = { - require(0 to 11 contains value, "Month is zero-indexed: 0 <= value <= 11") - value.asInstanceOf[Month] - } - val JANUARY = Month(Calendar.JANUARY) - val FEBRUARY = Month(Calendar.FEBRUARY) - val MARCH = Month(Calendar.MARCH) - val APRIL = Month(Calendar.APRIL) - val MAY = Month(Calendar.MAY) - val JUNE = Month(Calendar.JUNE) - val JULY = Month(Calendar.JULY) - val AUGUST = Month(Calendar.AUGUST) - val SEPTEMBER = Month(Calendar.SEPTEMBER) - val OCTOBER = Month(Calendar.OCTOBER) - val NOVEMBER = Month(Calendar.NOVEMBER) - val DECEMBER = Month(Calendar.DECEMBER) - - def unapply(monthString: String): Option[Month] = { - require(monthString.length === 2, s"ISO 8601 month string, MM, must have length 2: $monthString") - Try(monthString.toInt).toOption.map(isoM => apply(isoM - 1)) - } - } - - type Year = Int @@ Year.type - - object Year { - def apply(value: Int): Year = value.asInstanceOf[Year] - - def unapply(yearString: String): Option[Int] = { - require(yearString.length === 4, s"ISO 8601 year string, YYYY, must have length 4: $yearString") - Try(yearString.toInt).toOption.map(apply) - } - } - - final case class Date(year: Int, month: Month, day: Int) { - override def toString = f"$year%04d-${month + 1}%02d-$day%02d" - } - - object Date { - implicit def dateOrdering: Ordering[Date] = Ordering.fromLessThan { (date1, date2) => - if (date1.year != date2.year) { - date1.year < date2.year - } else if (date1.month != date2.month) { - date1.month < date2.month - } else { - date1.day < date2.day - } - } - - def fromString(dateString: String): Option[Date] = { - dateString.split('-') match { - case Array(Year(year), Month(month), Day(day)) => Some(Date(year, month, day)) - case _ => None - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/domain.scala b/src/main/scala/xyz/driver/core/domain.scala deleted file mode 100644 index 59bed54..0000000 --- a/src/main/scala/xyz/driver/core/domain.scala +++ /dev/null @@ -1,40 +0,0 @@ -package xyz.driver.core - -import com.google.i18n.phonenumbers.PhoneNumberUtil -import scalaz.Equal -import scalaz.std.string._ -import scalaz.syntax.equal._ - -object domain { - - final case class Email(username: String, domain: String) { - override def toString: String = username + "@" + domain - } - - object Email { - implicit val emailEqual: Equal[Email] = Equal.equal { - case (left, right) => left.toString.toLowerCase === right.toString.toLowerCase - } - - def parse(emailString: String): Option[Email] = { - Some(emailString.split("@")) collect { - case Array(username, domain) => Email(username, domain) - } - } - } - - final case class PhoneNumber(countryCode: String = "1", number: String) { - override def toString: String = s"+$countryCode $number" - } - - object PhoneNumber { - - private val phoneUtil = PhoneNumberUtil.getInstance() - - def parse(phoneNumber: String): Option[PhoneNumber] = { - val validated = - util.Try(phoneUtil.parseAndKeepRawInput(phoneNumber, "US")).toOption.filter(phoneUtil.isValidNumber) - validated.map(pn => PhoneNumber(pn.getCountryCode.toString, pn.getNationalNumber.toString)) - } - } -} diff --git a/src/main/scala/xyz/driver/core/generators.scala b/src/main/scala/xyz/driver/core/generators.scala deleted file mode 100644 index 0a4a7ab..0000000 --- a/src/main/scala/xyz/driver/core/generators.scala +++ /dev/null @@ -1,145 +0,0 @@ -package xyz.driver.core - -import enumeratum._ -import java.math.MathContext -import java.time.{Instant, LocalDate, ZoneOffset} -import java.util.UUID - -import xyz.driver.core.time.{Time, TimeOfDay, TimeRange} -import xyz.driver.core.date.{Date, DayOfWeek} - -import scala.reflect.ClassTag -import scala.util.Random -import eu.timepit.refined.refineV -import eu.timepit.refined.api.Refined -import eu.timepit.refined.collection._ - -object generators { - - private val random = new Random - import random._ - private val secureRandom = new java.security.SecureRandom() - - private val DefaultMaxLength = 10 - private val StringLetters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ ".toSet - private val NonAmbigiousCharacters = "abcdefghijkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789" - private val Numbers = "0123456789" - - private def nextTokenString(length: Int, chars: IndexedSeq[Char]): String = { - val builder = new StringBuilder - for (_ <- 0 until length) { - builder += chars(secureRandom.nextInt(chars.length)) - } - builder.result() - } - - /** Creates a random invitation token. - * - * This token is meant fo human input and avoids using ambiguous characters such as 'O' and '0'. It - * therefore contains less entropy and is not meant to be used as a cryptographic secret. */ - @deprecated( - "The term 'token' is too generic and security and readability conventions are not well defined. " + - "Services should implement their own version that suits their security requirements.", - "1.11.0" - ) - def nextToken(length: Int): String = nextTokenString(length, NonAmbigiousCharacters) - - @deprecated( - "The term 'token' is too generic and security and readability conventions are not well defined. " + - "Services should implement their own version that suits their security requirements.", - "1.11.0" - ) - def nextNumericToken(length: Int): String = nextTokenString(length, Numbers) - - def nextInt(maxValue: Int, minValue: Int = 0): Int = random.nextInt(maxValue - minValue) + minValue - - def nextBoolean(): Boolean = random.nextBoolean() - - def nextDouble(): Double = random.nextDouble() - - def nextId[T](): Id[T] = Id[T](nextUuid().toString) - - def nextId[T](maxLength: Int): Id[T] = Id[T](nextString(maxLength)) - - def nextNumericId[T](): Id[T] = Id[T](nextLong.abs.toString) - - def nextNumericId[T](maxValue: Int): Id[T] = Id[T](nextInt(maxValue).toString) - - def nextUuidId[T](): UuidId[T] = UuidId[T](nextUuid()) - - def nextName[T](maxLength: Int = DefaultMaxLength): Name[T] = Name[T](nextString(maxLength)) - - def nextNonEmptyName[T](maxLength: Int = DefaultMaxLength): NonEmptyName[T] = - NonEmptyName[T](nextNonEmptyString(maxLength)) - - def nextUuid(): UUID = java.util.UUID.randomUUID - - def nextRevision[T](): Revision[T] = Revision[T](nextUuid().toString) - - def nextString(maxLength: Int = DefaultMaxLength): String = - (oneOf[Char](StringLetters) +: arrayOf(oneOf[Char](StringLetters), maxLength - 1)).mkString - - def nextNonEmptyString(maxLength: Int = DefaultMaxLength): String Refined NonEmpty = { - refineV[NonEmpty]( - (oneOf[Char](StringLetters) +: arrayOf(oneOf[Char](StringLetters), maxLength - 1)).mkString - ).right.get - } - - def nextOption[T](value: => T): Option[T] = if (nextBoolean()) Option(value) else None - - def nextPair[L, R](left: => L, right: => R): (L, R) = (left, right) - - def nextTriad[F, S, T](first: => F, second: => S, third: => T): (F, S, T) = (first, second, third) - - def nextInstant(): Instant = Instant.ofEpochMilli(math.abs(nextLong() % System.currentTimeMillis)) - - def nextTime(): Time = nextInstant() - - def nextTimeOfDay: TimeOfDay = TimeOfDay(java.time.LocalTime.MIN.plusSeconds(nextLong), java.util.TimeZone.getDefault) - - def nextTimeRange(): TimeRange = { - val oneTime = nextTime() - val anotherTime = nextTime() - - TimeRange( - Time(scala.math.min(oneTime.millis, anotherTime.millis)), - Time(scala.math.max(oneTime.millis, anotherTime.millis))) - } - - def nextDate(): Date = nextTime().toDate(java.util.TimeZone.getTimeZone("UTC")) - - def nextLocalDate(): LocalDate = nextInstant().atZone(ZoneOffset.UTC).toLocalDate - - def nextDayOfWeek(): DayOfWeek = oneOf(DayOfWeek.All) - - def nextBigDecimal(multiplier: Double = 1000000.00, precision: Int = 2): BigDecimal = - BigDecimal(multiplier * nextDouble, new MathContext(precision)) - - def oneOf[T](items: T*): T = oneOf(items.toSet) - - def oneOf[T](items: Set[T]): T = items.toSeq(nextInt(items.size)) - - def oneOf[T <: EnumEntry](enum: Enum[T]): T = oneOf(enum.values: _*) - - def arrayOf[T: ClassTag](generator: => T, maxLength: Int = DefaultMaxLength, minLength: Int = 0): Array[T] = - Array.fill(nextInt(maxLength, minLength))(generator) - - def seqOf[T](generator: => T, maxLength: Int = DefaultMaxLength, minLength: Int = 0): Seq[T] = - Seq.fill(nextInt(maxLength, minLength))(generator) - - def vectorOf[T](generator: => T, maxLength: Int = DefaultMaxLength, minLength: Int = 0): Vector[T] = - Vector.fill(nextInt(maxLength, minLength))(generator) - - def listOf[T](generator: => T, maxLength: Int = DefaultMaxLength, minLength: Int = 0): List[T] = - List.fill(nextInt(maxLength, minLength))(generator) - - def setOf[T](generator: => T, maxLength: Int = DefaultMaxLength, minLength: Int = 0): Set[T] = - seqOf(generator, maxLength, minLength).toSet - - def mapOf[K, V]( - keyGenerator: => K, - valueGenerator: => V, - maxLength: Int = DefaultMaxLength, - minLength: Int = 0): Map[K, V] = - seqOf(nextPair(keyGenerator, valueGenerator), maxLength, minLength).toMap -} diff --git a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala deleted file mode 100644 index df6611e..0000000 --- a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala +++ /dev/null @@ -1,190 +0,0 @@ -package xyz.driver.core -package init - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.server.{RequestContext, Route} -import akka.stream.scaladsl.Source -import akka.stream.{ActorMaterializer, Materializer} -import akka.util.ByteString -import com.softwaremill.sttp.SttpBackend -import com.softwaremill.sttp.akkahttp.AkkaHttpBackend -import com.typesafe.config.Config -import kamon.Kamon -import kamon.statsd.StatsDReporter -import kamon.system.SystemMetrics -import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext} -import xyz.driver.core.rest.HttpRestServiceTransport - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} - -/** Provides standard scaffolding for applications that use Akka HTTP. - * - * Among the features provided are: - * - * - execution contexts of various kinds - * - basic JVM metrics collection via Kamon - * - startup and shutdown hooks - * - * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see - * Known Subclasses) in this package. - * - * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable - * application. - * I.e. - * {{{ - * object Main extends AkkaBootable // this is a runnable application - * }}} - * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should - * be called explicitly, in order to initialize and start this application. - * I.e. - * {{{ - * object Main { - * val bootable = new AkkaBootable {} - * def main(args: Array[String]): Unit = { - * bootable.main(args) - * } - * } - * }}} - * - * @groupname config Configuration - * @groupname contexts Contexts - * @groupname utilities Utilities - * @groupname hooks Overrideable Hooks - */ -trait AkkaBootable { - - /** The application's name. This value is extracted from the build configuration. - * @group config - */ - def name: String = BuildInfoReflection.name - - /** The application's version (or git sha). This value is extracted from the build configuration. - * @group config - */ - def version: Option[String] = BuildInfoReflection.version - - /** TCP port that this application will listen on. - * @group config - */ - def port: Int = 8080 - - // contexts - /** General-purpose actor system for this application. - * @group contexts - */ - implicit lazy val system: ActorSystem = ActorSystem(name) - - /** General-purpose stream materializer for this application. - * @group contexts - */ - implicit lazy val materializer: Materializer = ActorMaterializer() - - /** General-purpose execution context for this application. - * - * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking, - * a custom execution context should be defined and used. See - * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]] - * on how to configure custom execution contexts in Akka. - * - * @group contexts - */ - implicit lazy val executionContext: ExecutionContext = system.dispatcher - - /** Default HTTP client, backed by this application's actor system. - * @group contexts - */ - implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system) - - /** Client RPC transport abstraction. - * @group contexts - */ - implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport( - applicationName = Name(name), - applicationVersion = version.getOrElse("<unknown>"), - actorSystem = system, - executionContext = executionContext, - reporter = reporter - ) - - // utilities - /** Default reporter instance. - * - * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a - * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards - * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will - * not include tracing information in any messages! - * - * @group utilities - */ - def reporter: Reporter with ScalaLoggingCompat = - new Reporter with NoTraceReporter with ScalaLoggingCompat { - val logger = ScalaLoggingCompat.defaultScalaLogger(json = false) - } - - /** Top-level application configuration. - * - * TODO: should we expose some config wrapper rather than the typesafe config library? - * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala - * [[https://github.com/jvican/stoml]].) - * - * @group utilities - */ - def config: Config = system.settings.config - - /** Overridable startup hook. - * - * Invoked by [[main]] during application startup. - * - * @group hooks - */ - def startup(): Unit = () - - /** Overridable shutdown hook. - * - * Invoked on an arbitrary thread when a shutdown signal is caught. - * - * @group hooks - */ - def shutdown(): Unit = () - - /** Overridable HTTP route. - * - * Any services that present an HTTP interface should implement this method. - * - * @group hooks - * @see [[HttpApi]] - */ - def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound) - - private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message) - - /** This application's entry point. */ - def main(args: Array[String]): Unit = { - implicit val ctx = SpanContext.fresh() - syslog("initializing metrics collection") - Kamon.addReporter(new StatsDReporter()) - SystemMetrics.startCollecting() - - system.registerOnTermination { - syslog("running shutdown hooks") - shutdown() - syslog("bye!") - } - - syslog("running startup hooks") - startup() - - syslog("binding to network interface") - val binding = Await.result( - Http().bindAndHandle(route, "::", port), - 2.seconds - ) - syslog(s"listening to ${binding.localAddress}") - - syslog("startup complete") - } - -} diff --git a/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala b/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala deleted file mode 100644 index 0e53085..0000000 --- a/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala +++ /dev/null @@ -1,37 +0,0 @@ -package xyz.driver.core -package init - -import scala.reflect.runtime -import scala.util.Try -import scala.util.control.NonFatal - -/** Utility object to retrieve fields from static build configuration objects. */ -private[init] object BuildInfoReflection { - - final val BuildInfoName = "xyz.driver.BuildInfo" - - lazy val name: String = get[String]("name") - lazy val version: Option[String] = find[String]("version") - - /** Lookup a given field in the build configuration. This field is required to exist. */ - private def get[A](fieldName: String): A = - try { - val mirror = runtime.currentMirror - val module = mirror.staticModule(BuildInfoName) - val instance = mirror.reflectModule(module).instance - val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod - mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A] - } catch { - case NonFatal(err) => - throw new RuntimeException( - s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " + - s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " + - s"caller with a static value.", - err - ) - } - - /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */ - private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption - -} diff --git a/src/main/scala/xyz/driver/core/init/CloudServices.scala b/src/main/scala/xyz/driver/core/init/CloudServices.scala deleted file mode 100644 index 857dd4c..0000000 --- a/src/main/scala/xyz/driver/core/init/CloudServices.scala +++ /dev/null @@ -1,89 +0,0 @@ -package xyz.driver.core -package init - -import java.nio.file.Paths - -import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus} -import xyz.driver.core.reporting._ -import xyz.driver.core.rest.DnsDiscovery -import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage} - -import scala.collection.JavaConverters._ - -/** Mixin trait that provides essential cloud utilities. */ -trait CloudServices extends AkkaBootable { self => - - /** The platform that this application is running on. - * @group config - */ - def platform: Platform = Platform.current - - /** Service discovery for the current platform. - * @group utilities - */ - lazy val discovery: DnsDiscovery = { - def getOverrides(): Map[String, String] = { - val block = config.getObject("services.dev-overrides").unwrapped().asScala - for ((key, value) <- block) yield { - require(value.isInstanceOf[String], s"Service URL override for '$key' must be a string. Found '$value'.") - key -> value.toString - } - }.toMap - val overrides = platform match { - case Platform.Dev => getOverrides() - // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to - // provide a way to override deployed services as well. - case _ => Map.empty[String, String] - } - new DnsDiscovery(clientTransport, overrides) - } - - /* TODO: this reporter uses the platform to determine if JSON logging should be enabled. - * Since the default logger uses slf4j, its settings must be specified before a logger - * is first accessed. This in turn leads to somewhat convoluted code, - * since we can't log when the platform being is determined. - * A potential fix would be to make the log format independent of the platform, and always log - * as JSON for example. - */ - override lazy val reporter: Reporter with ScalaLoggingCompat = { - Console.println("determining platform") // scalastyle:ignore - val r = platform match { - case p @ Platform.GoogleCloud(_, _) => - new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger { - val logger = ScalaLoggingCompat.defaultScalaLogger(true) - } - case Platform.Dev => - new NoTraceReporter with ScalaLoggingCompat { - val logger = ScalaLoggingCompat.defaultScalaLogger(false) - } - } - r.info(s"application started on platform '${platform}'")(SpanContext.fresh()) - r - } - - /** Object storage. - * - * When running on a cloud platform, prepends `$project-` to bucket names, where `$project` - * is the project ID (for example 'driverinc-production` or `driverinc-sandbox`). - * - * @group utilities - */ - def storage(bucketName: String): BlobStorage = - platform match { - case p @ Platform.GoogleCloud(keyfile, _) => - GcsBlobStorage.fromKeyfile(keyfile, s"${p.project}-$bucketName") - case Platform.Dev => - new FileSystemBlobStorage(Paths.get(s".data-$bucketName")) - } - - /** Message bus. - * @group utilities - */ - def messageBus: StreamBus = platform match { - case p @ Platform.GoogleCloud(_, namespace) => - new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand - case Platform.Dev => - new QueueBus()(self.system) with StreamBus - } - -} diff --git a/src/main/scala/xyz/driver/core/init/HttpApi.scala b/src/main/scala/xyz/driver/core/init/HttpApi.scala deleted file mode 100644 index 81428bf..0000000 --- a/src/main/scala/xyz/driver/core/init/HttpApi.scala +++ /dev/null @@ -1,100 +0,0 @@ -package xyz.driver.core -package init - -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation} -import spray.json.DefaultJsonProtocol._ -import spray.json._ -import xyz.driver.core.rest.Swagger -import xyz.driver.core.rest.directives.Directives -import akka.http.scaladsl.model.headers._ -import xyz.driver.core.reporting.Reporter.CausalRelation -import xyz.driver.core.rest.headers.Traceparent - -import scala.collection.JavaConverters._ - -/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding, - * and exposes an application-specific route that must be implemented by services. - * @see ProtobufApi - */ -trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self => - - /** Route that handles the application's business logic. - * @group hooks - */ - def applicationRoute: Route - - /** Classes with Swagger annotations. - * @group hooks - */ - def swaggerRouteClasses: Set[Class[_]] - - private val healthRoute = path("health") { - complete(Map("status" -> "good").toJson) - } - - private val versionRoute = path("version") { - complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson) - } - - private lazy val swaggerRoute = { - val generator = new Swagger( - "", - "https" :: "http" :: Nil, - self.version.getOrElse("<unknown>"), - swaggerRouteClasses, - config, - reporter - ) - generator.routes ~ generator.swaggerUINew - } - - private def cors(inner: Route): Route = - cors( - config.getStringList("application.cors.allowedOrigins").asScala.toSet, - xyz.driver.core.rest.AllowedHeaders - )(inner) - - private def traced(inner: Route): Route = (ctx: RequestContext) => { - val tags = Map( - "service.version" -> version.getOrElse("<unknown>"), - // open tracing semantic tags - "span.kind" -> "server", - "service" -> name, - "http.url" -> ctx.request.uri.toString, - "http.method" -> ctx.request.method.value, - "peer.hostname" -> ctx.request.uri.authority.host.toString, - // google's tracing console provides extra search features if we define these tags - "/http/path" -> ctx.request.uri.path.toString, - "/http/method" -> ctx.request.method.value.toString, - "/http/url" -> ctx.request.uri.toString, - "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("<unknown>") - ) - val parent = ctx.request.header[Traceparent].map { header => - header.spanContext -> CausalRelation.Child - } - reporter - .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext => - val header = Traceparent(spanContext) - val withHeader = ctx.withRequest( - ctx.request - .removeHeader(header.name) - .addHeader(header) - ) - inner(withHeader) - } - } - - /** Extended route. */ - override lazy val route: Route = traced( - cors( - RouteConcatenation.concat( - healthRoute, - versionRoute, - swaggerRoute, - applicationRoute - ) - ) - ) - -} diff --git a/src/main/scala/xyz/driver/core/init/Platform.scala b/src/main/scala/xyz/driver/core/init/Platform.scala deleted file mode 100644 index 2daa2c8..0000000 --- a/src/main/scala/xyz/driver/core/init/Platform.scala +++ /dev/null @@ -1,31 +0,0 @@ -package xyz.driver.core -package init - -import java.nio.file.{Files, Path, Paths} - -import com.google.auth.oauth2.ServiceAccountCredentials - -sealed trait Platform -object Platform { - case class GoogleCloud(keyfile: Path, namespace: String) extends Platform { - def credentials: ServiceAccountCredentials = ServiceAccountCredentials.fromStream( - Files.newInputStream(keyfile) - ) - def project: String = credentials.getProjectId - } - // case object AliCloud extends Platform - case object Dev extends Platform - - lazy val fromEnv: Platform = { - def isGoogle = sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").map { value => - val keyfile = Paths.get(value) - require(Files.isReadable(keyfile), s"Google credentials file $value is not readable.") - val namespace = sys.env.getOrElse("SERVICE_NAMESPACE", sys.error("Namespace not set")) - GoogleCloud(keyfile, namespace) - } - isGoogle.getOrElse(Dev) - } - - def current: Platform = fromEnv - -} diff --git a/src/main/scala/xyz/driver/core/init/ProtobufApi.scala b/src/main/scala/xyz/driver/core/init/ProtobufApi.scala deleted file mode 100644 index 284ac67..0000000 --- a/src/main/scala/xyz/driver/core/init/ProtobufApi.scala +++ /dev/null @@ -1,7 +0,0 @@ -package xyz.driver.core -package init - -/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC. - * TODO: implement - */ -trait ProtobufApi extends AkkaBootable diff --git a/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala b/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala deleted file mode 100644 index 61ca363..0000000 --- a/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala +++ /dev/null @@ -1,4 +0,0 @@ -package xyz.driver.core -package init - -trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices diff --git a/src/main/scala/xyz/driver/core/json.scala b/src/main/scala/xyz/driver/core/json.scala deleted file mode 100644 index 48011e4..0000000 --- a/src/main/scala/xyz/driver/core/json.scala +++ /dev/null @@ -1,408 +0,0 @@ -package xyz.driver.core - -import java.net.InetAddress -import java.time.format.DateTimeFormatter -import java.time.{Instant, LocalDate} -import java.util.{TimeZone, UUID} - -import akka.http.scaladsl.unmarshalling.Unmarshaller -import com.neovisionaries.i18n.{CountryCode, CurrencyCode} -import enumeratum._ -import eu.timepit.refined.api.{Refined, Validate} -import eu.timepit.refined.collection.NonEmpty -import eu.timepit.refined.refineV -import spray.json._ -import xyz.driver.core.auth.AuthCredentials -import xyz.driver.core.date.{Date, DayOfWeek, Month} -import xyz.driver.core.domain.{Email, PhoneNumber} -import xyz.driver.core.rest.directives.{PathMatchers, Unmarshallers} -import xyz.driver.core.rest.errors._ -import xyz.driver.core.time.{Time, TimeOfDay} - -import scala.reflect.runtime.universe._ -import scala.reflect.{ClassTag, classTag} -import scala.util.Try -import scala.util.control.NonFatal - -object json extends PathMatchers with Unmarshallers { - import DefaultJsonProtocol._ - - implicit def idFormat[T]: RootJsonFormat[Id[T]] = new RootJsonFormat[Id[T]] { - def write(id: Id[T]) = JsString(id.value) - - def read(value: JsValue): Id[T] = value match { - case JsString(id) if Try(UUID.fromString(id)).isSuccess => Id[T](id.toLowerCase) - case JsString(id) => Id[T](id) - case _ => throw DeserializationException("Id expects string") - } - } - - implicit def uuidIdFormat[T]: RootJsonFormat[UuidId[T]] = new RootJsonFormat[UuidId[T]] { - def write(id: UuidId[T]) = JsString(id.toString) - - def read(value: JsValue): UuidId[T] = value match { - case JsString(id) if Try(UUID.fromString(id)).isSuccess => UuidId[T](UUID.fromString(id)) - case _ => throw DeserializationException("Id expects UUID") - } - } - - implicit def numericIdFormat[T]: RootJsonFormat[NumericId[T]] = new RootJsonFormat[NumericId[T]] { - def write(id: NumericId[T]) = JsString(id.toString) - - def read(value: JsValue): NumericId[T] = value match { - case JsString(id) if Try(id.toLong).isSuccess => NumericId[T](id.toLong) - case _ => throw DeserializationException("Id expects number") - } - } - - implicit def taggedFormat[F, T](implicit underlying: JsonFormat[F], convert: F => F @@ T = null): JsonFormat[F @@ T] = - new JsonFormat[F @@ T] { - import tagging._ - - private val transformReadValue = Option(convert).getOrElse((_: F).tagged[T]) - - override def write(obj: F @@ T): JsValue = underlying.write(obj) - - override def read(json: JsValue): F @@ T = transformReadValue(underlying.read(json)) - } - - implicit def nameFormat[T] = new RootJsonFormat[Name[T]] { - def write(name: Name[T]) = JsString(name.value) - - def read(value: JsValue): Name[T] = value match { - case JsString(name) => Name[T](name) - case _ => throw DeserializationException("Name expects string") - } - } - - implicit val timeFormat: RootJsonFormat[Time] = new RootJsonFormat[Time] { - def write(time: Time) = JsObject("timestamp" -> JsNumber(time.millis)) - - def read(value: JsValue): Time = Time(instantFormat.read(value)) - } - - implicit val instantFormat: JsonFormat[Instant] = new JsonFormat[Instant] { - def write(instant: Instant): JsValue = JsString(instant.toString) - - def read(value: JsValue): Instant = value match { - case JsObject(fields) => - fields - .get("timestamp") - .flatMap { - case JsNumber(millis) => Some(Instant.ofEpochMilli(millis.longValue())) - case _ => None - } - .getOrElse(deserializationError(s"Instant expects ISO timestamp but got ${value.compactPrint}")) - case JsNumber(millis) => Instant.ofEpochMilli(millis.longValue()) - case JsString(str) => - try Instant.parse(str) - catch { case NonFatal(_) => deserializationError(s"Instant expects ISO timestamp but got $str") } - case _ => deserializationError(s"Instant expects ISO timestamp but got ${value.compactPrint}") - } - } - - implicit object localTimeFormat extends JsonFormat[java.time.LocalTime] { - private val formatter = TimeOfDay.getFormatter - def read(json: JsValue): java.time.LocalTime = json match { - case JsString(chars) => - java.time.LocalTime.parse(chars) - case _ => deserializationError(s"Expected time string got ${json.toString}") - } - - def write(obj: java.time.LocalTime): JsValue = { - JsString(obj.format(formatter)) - } - } - - implicit object timeZoneFormat extends JsonFormat[java.util.TimeZone] { - override def write(obj: TimeZone): JsValue = { - JsString(obj.getID()) - } - - override def read(json: JsValue): TimeZone = json match { - case JsString(chars) => - java.util.TimeZone.getTimeZone(chars) - case _ => deserializationError(s"Expected time zone string got ${json.toString}") - } - } - - implicit val timeOfDayFormat: RootJsonFormat[TimeOfDay] = jsonFormat2(TimeOfDay.apply) - - implicit val dayOfWeekFormat: JsonFormat[DayOfWeek] = new enumeratum.EnumJsonFormat(DayOfWeek) - - implicit val dateFormat = new RootJsonFormat[Date] { - def write(date: Date) = JsString(date.toString) - def read(value: JsValue): Date = value match { - case JsString(dateString) => - Date - .fromString(dateString) - .getOrElse( - throw DeserializationException(s"Misformated ISO 8601 Date. Expected YYYY-MM-DD, but got $dateString.")) - case _ => throw DeserializationException(s"Date expects a string, but got $value.") - } - } - - implicit val localDateFormat = new RootJsonFormat[LocalDate] { - val format = DateTimeFormatter.ISO_LOCAL_DATE - - def write(date: LocalDate): JsValue = JsString(date.format(format)) - def read(value: JsValue): LocalDate = value match { - case JsString(dateString) => - try LocalDate.parse(dateString, format) - catch { - case NonFatal(_) => - throw deserializationError(s"Malformed ISO 8601 Date. Expected YYYY-MM-DD, but got $dateString.") - } - case _ => - throw deserializationError(s"Malformed ISO 8601 Date. Expected YYYY-MM-DD, but got ${value.compactPrint}.") - } - } - - implicit val monthFormat = new RootJsonFormat[Month] { - def write(month: Month) = JsNumber(month) - def read(value: JsValue): Month = value match { - case JsNumber(month) if 0 <= month && month <= 11 => Month(month.toInt) - case _ => throw DeserializationException("Expected a number from 0 to 11") - } - } - - implicit def revisionFormat[T]: RootJsonFormat[Revision[T]] = new RootJsonFormat[Revision[T]] { - def write(revision: Revision[T]) = JsString(revision.id.toString) - - def read(value: JsValue): Revision[T] = value match { - case JsString(revision) => Revision[T](revision) - case _ => throw DeserializationException("Revision expects uuid string") - } - } - - implicit val base64Format = new RootJsonFormat[Base64] { - def write(base64Value: Base64) = JsString(base64Value.value) - - def read(value: JsValue): Base64 = value match { - case JsString(base64Value) => Base64(base64Value) - case _ => throw DeserializationException("Base64 format expects string") - } - } - - implicit val emailFormat = new RootJsonFormat[Email] { - def write(email: Email) = JsString(email.username + "@" + email.domain) - def read(json: JsValue): Email = json match { - - case JsString(value) => - Email.parse(value).getOrElse { - deserializationError("Expected '@' symbol in email string as Email, but got " + json.toString) - } - - case _ => - deserializationError("Expected string as Email, but got " + json.toString) - } - } - - implicit object phoneNumberFormat extends RootJsonFormat[PhoneNumber] { - private val basicFormat = jsonFormat2(PhoneNumber.apply) - override def write(obj: PhoneNumber): JsValue = basicFormat.write(obj) - override def read(json: JsValue): PhoneNumber = { - PhoneNumber.parse(basicFormat.read(json).toString).getOrElse(deserializationError("Invalid phone number")) - } - } - - implicit val authCredentialsFormat = new RootJsonFormat[AuthCredentials] { - override def read(json: JsValue): AuthCredentials = { - json match { - case JsObject(fields) => - val emailField = fields.get("email") - val identifierField = fields.get("identifier") - val passwordField = fields.get("password") - - (emailField, identifierField, passwordField) match { - case (_, _, None) => - deserializationError("password field must be set") - case (Some(JsString(em)), _, Some(JsString(pw))) => - val email = Email.parse(em).getOrElse(throw deserializationError(s"failed to parse email $em")) - AuthCredentials(email.toString, pw) - case (_, Some(JsString(id)), Some(JsString(pw))) => AuthCredentials(id.toString, pw.toString) - case (None, None, _) => deserializationError("identifier must be provided") - case _ => deserializationError(s"failed to deserialize ${json.prettyPrint}") - } - case _ => deserializationError(s"failed to deserialize ${json.prettyPrint}") - } - } - - override def write(obj: AuthCredentials): JsValue = JsObject( - "identifier" -> JsString(obj.identifier), - "password" -> JsString(obj.password) - ) - } - - implicit object inetAddressFormat extends JsonFormat[InetAddress] { - override def read(json: JsValue): InetAddress = json match { - case JsString(ipString) => - Try(InetAddress.getByName(ipString)) - .getOrElse(deserializationError(s"Invalid IP Address: $ipString")) - case _ => deserializationError(s"Expected string for IP Address, got $json") - } - - override def write(obj: InetAddress): JsValue = - JsString(obj.getHostAddress) - } - - implicit val countryCodeFormat: JsonFormat[CountryCode] = javaEnumFormat[CountryCode] - - implicit val currencyCodeFormat: JsonFormat[CurrencyCode] = javaEnumFormat[CurrencyCode] - - object enumeratum { - - def enumUnmarshaller[T <: EnumEntry](enum: Enum[T]): Unmarshaller[String, T] = - Unmarshaller.strict { value => - enum.withNameOption(value).getOrElse(unrecognizedValue(value, enum.values)) - } - - trait HasJsonFormat[T <: EnumEntry] { enum: Enum[T] => - - implicit val format: JsonFormat[T] = new EnumJsonFormat(enum) - - implicit val unmarshaller: Unmarshaller[String, T] = - Unmarshaller.strict { value => - enum.withNameOption(value).getOrElse(unrecognizedValue(value, enum.values)) - } - } - - class EnumJsonFormat[T <: EnumEntry](enum: Enum[T]) extends JsonFormat[T] { - override def read(json: JsValue): T = json match { - case JsString(name) => enum.withNameOption(name).getOrElse(unrecognizedValue(name, enum.values)) - case _ => deserializationError("Expected string as enumeration value, but got " + json.toString) - } - - override def write(obj: T): JsValue = JsString(obj.entryName) - } - - private def unrecognizedValue(value: String, possibleValues: Seq[Any]): Nothing = - deserializationError(s"Unexpected value $value. Expected one of: ${possibleValues.mkString("[", ", ", "]")}") - } - - class EnumJsonFormat[T](mapping: (String, T)*) extends RootJsonFormat[T] { - private val map = mapping.toMap - - override def write(value: T): JsValue = { - map.find(_._2 == value).map(_._1) match { - case Some(name) => JsString(name) - case _ => serializationError(s"Value $value is not found in the mapping $map") - } - } - - override def read(json: JsValue): T = json match { - case JsString(name) => - map.getOrElse(name, throw DeserializationException(s"Value $name is not found in the mapping $map")) - case _ => deserializationError("Expected string as enumeration value, but got " + json.toString) - } - } - - def javaEnumFormat[T <: java.lang.Enum[_]: ClassTag]: JsonFormat[T] = { - val values = classTag[T].runtimeClass.asInstanceOf[Class[T]].getEnumConstants - new EnumJsonFormat[T](values.map(v => v.name() -> v): _*) - } - - class ValueClassFormat[T: TypeTag](writeValue: T => BigDecimal, create: BigDecimal => T) extends JsonFormat[T] { - def write(valueClass: T) = JsNumber(writeValue(valueClass)) - def read(json: JsValue): T = json match { - case JsNumber(value) => create(value) - case _ => deserializationError(s"Expected number as ${typeOf[T].getClass.getName}, but got " + json.toString) - } - } - - class GadtJsonFormat[T: TypeTag]( - typeField: String, - typeValue: PartialFunction[T, String], - jsonFormat: PartialFunction[String, JsonFormat[_ <: T]]) - extends RootJsonFormat[T] { - - def write(value: T): JsValue = { - - val valueType = typeValue.applyOrElse(value, { v: T => - deserializationError(s"No Value type for this type of ${typeOf[T].getClass.getName}: " + v.toString) - }) - - val valueFormat = - jsonFormat.applyOrElse(valueType, { f: String => - deserializationError(s"No Json format for this type of $valueType") - }) - - valueFormat.asInstanceOf[JsonFormat[T]].write(value) match { - case JsObject(fields) => JsObject(fields ++ Map(typeField -> JsString(valueType))) - case _ => serializationError(s"${typeOf[T].getClass.getName} serialized not to a JSON object") - } - } - - def read(json: JsValue): T = json match { - case JsObject(fields) => - val valueJson = JsObject(fields.filterNot(_._1 == typeField)) - fields(typeField) match { - case JsString(valueType) => - val valueFormat = jsonFormat.applyOrElse(valueType, { t: String => - deserializationError(s"Unknown ${typeOf[T].getClass.getName} type ${fields(typeField)}") - }) - valueFormat.read(valueJson) - case _ => - deserializationError(s"Unknown ${typeOf[T].getClass.getName} type ${fields(typeField)}") - } - case _ => - deserializationError(s"Expected Json Object as ${typeOf[T].getClass.getName}, but got " + json.toString) - } - } - - object GadtJsonFormat { - - def create[T: TypeTag](typeField: String)(typeValue: PartialFunction[T, String])( - jsonFormat: PartialFunction[String, JsonFormat[_ <: T]]) = { - - new GadtJsonFormat[T](typeField, typeValue, jsonFormat) - } - } - - /** - * Provides the JsonFormat for the Refined types provided by the Refined library. - * - * @see https://github.com/fthomas/refined - */ - implicit def refinedJsonFormat[T, Predicate]( - implicit valueFormat: JsonFormat[T], - validate: Validate[T, Predicate]): JsonFormat[Refined[T, Predicate]] = - new JsonFormat[Refined[T, Predicate]] { - def write(x: T Refined Predicate): JsValue = valueFormat.write(x.value) - def read(value: JsValue): T Refined Predicate = { - refineV[Predicate](valueFormat.read(value))(validate) match { - case Right(refinedValue) => refinedValue - case Left(refinementError) => deserializationError(refinementError) - } - } - } - - implicit def nonEmptyNameFormat[T](implicit nonEmptyStringFormat: JsonFormat[Refined[String, NonEmpty]]) = - new RootJsonFormat[NonEmptyName[T]] { - def write(name: NonEmptyName[T]) = JsString(name.value.value) - - def read(value: JsValue): NonEmptyName[T] = - NonEmptyName[T](nonEmptyStringFormat.read(value)) - } - - implicit val serviceExceptionFormat: RootJsonFormat[ServiceException] = - GadtJsonFormat.create[ServiceException]("type") { - case _: InvalidInputException => "InvalidInputException" - case _: InvalidActionException => "InvalidActionException" - case _: UnauthorizedException => "UnauthorizedException" - case _: ResourceNotFoundException => "ResourceNotFoundException" - case _: ExternalServiceException => "ExternalServiceException" - case _: ExternalServiceTimeoutException => "ExternalServiceTimeoutException" - case _: DatabaseException => "DatabaseException" - } { - case "InvalidInputException" => jsonFormat(InvalidInputException, "message") - case "InvalidActionException" => jsonFormat(InvalidActionException, "message") - case "UnauthorizedException" => jsonFormat(UnauthorizedException, "message") - case "ResourceNotFoundException" => jsonFormat(ResourceNotFoundException, "message") - case "ExternalServiceException" => - jsonFormat(ExternalServiceException, "serviceName", "serviceMessage", "serviceException") - case "ExternalServiceTimeoutException" => jsonFormat(ExternalServiceTimeoutException, "message") - case "DatabaseException" => jsonFormat(DatabaseException, "message") - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala deleted file mode 100644 index c23ea0f..0000000 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ /dev/null @@ -1,153 +0,0 @@ -package xyz.driver.core.messaging -import java.nio.ByteBuffer -import java.util - -import com.aliyun.mns.client.{AsyncCallback, CloudAccount} -import com.aliyun.mns.common.ServiceException -import com.aliyun.mns.model -import com.aliyun.mns.model._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} - -class AliyunBus( - accountId: String, - accessId: String, - accessSecret: String, - region: String, - namespace: String, - pullTimeout: Int -)(implicit val executionContext: ExecutionContext) - extends Bus { - private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" - private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) - private val client = cloudAccount.getMNSClient - - // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the - // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError - // occasionally. We mask both of these errors and return an empty list of messages - private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") - - override val defaultMaxMessages: Int = 10 - - case class MessageId(queueName: String, messageHandle: String) - - case class Message[A](id: MessageId, data: A) extends BasicMessage[A] - - case class SubscriptionConfig( - subscriptionPrefix: String = accessId, - ackTimeout: FiniteDuration = 10.seconds - ) - - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - private def rawTopicName(topic: Topic[_]) = - s"$namespace-${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"$namespace-${config.subscriptionPrefix}-${topic.name}" - - override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - import collection.JavaConverters._ - val subscriptionName = rawSubscriptionName(config, topic) - val queueRef = client.getQueueRef(subscriptionName) - - val promise = Promise[Seq[model.Message]] - queueRef.asyncBatchPopMessage( - maxMessages, - pullTimeout, - new AsyncCallback[util.List[model.Message]] { - override def onSuccess(result: util.List[model.Message]): Unit = { - promise.success(result.asScala) - } - override def onFail(ex: Exception): Unit = ex match { - case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => - promise.success(Nil) - case _ => - promise.failure(ex) - } - } - ) - - promise.future.map(_.map { message => - import scala.xml.XML - val messageId = MessageId(subscriptionName, message.getReceiptHandle) - val messageXML = XML.loadString(message.getMessageBodyAsRawString) - val messageNode = messageXML \ "Message" - val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) - - val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) - Message(messageId, deserializedMessage) - }) - } - - override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { - import collection.JavaConverters._ - require(messages.nonEmpty, "Acknowledged message list must be non-empty") - - val queueRef = client.getQueueRef(messages.head.queueName) - - val promise = Promise[Unit] - queueRef.asyncBatchDeleteMessage( - messages.map(_.messageHandle).asJava, - new AsyncCallback[Void] { - override def onSuccess(result: Void): Unit = promise.success(()) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { - val topicRef = client.getTopicRef(rawTopicName(topic)) - - val publishMessages = messages.map { message => - val promise = Promise[TopicMessage] - - val topicMessage = new Base64TopicMessage - topicMessage.setMessageBody(topic.serialize(message).array()) - - topicRef.asyncPublishMessage( - topicMessage, - new AsyncCallback[TopicMessage] { - override def onSuccess(result: TopicMessage): Unit = promise.success(result) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - Future.sequence(publishMessages).map(_ => ()) - } - - def createTopic(topic: Topic[_]): Future[Unit] = Future { - val topicName = rawTopicName(topic) - val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) - if (!topicExists) { - val topicMeta = new TopicMeta - topicMeta.setTopicName(topicName) - client.createTopic(topicMeta) - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { - val subscriptionName = rawSubscriptionName(config, topic) - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala deleted file mode 100644 index 75954f4..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Bus.scala +++ /dev/null @@ -1,74 +0,0 @@ -package xyz.driver.core -package messaging - -import scala.concurrent._ -import scala.language.higherKinds - -/** Base trait for representing message buses. - * - * Message buses are expected to provide "at least once" delivery semantics and are - * expected to retry delivery when a message remains unacknowledged for a reasonable - * amount of time. */ -trait Bus { - - /** Type of unique message identifiers. Usually a string or UUID. */ - type MessageId - - /** Most general kind of message. Any implementation of a message bus must provide - * the fields and methods specified in this trait. */ - trait BasicMessage[A] { self: Message[A] => - - /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ - def id: MessageId - - /** Actual message content. */ - def data: A - - } - - /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage - * (as that defines the minimal required fields of a messages), but may be refined to - * provide bus-specific additional data. */ - type Message[A] <: BasicMessage[A] - - /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ - type SubscriptionConfig - - /** Default value for a subscription configuration. It is such that any service will have a unique subscription - * for every topic, shared among all its instances. */ - val defaultSubscriptionConfig: SubscriptionConfig - - /** Maximum amount of messages handled in a single retrieval call. */ - val defaultMaxMessages = 64 - - /** Execution context that is used to query and dispatch messages from this bus. */ - implicit val executionContext: ExecutionContext - - /** Retrieve any new messages in the mailbox of a subscription. - * - * Any retrieved messages become "outstanding" and should not be returned by this function - * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. - * In that case, they will again be considered new and will be returned by this function. - * - * Note that although outstanding and acknowledged messages will eventually be removed from - * mailboxes, no guarantee can be made that a message will be delivered only once. */ - def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] - - /** Acknowledge that a given series of messages has been handled and should - * not be delivered again. - * - * Note that messages become eventually acknowledged and hence may be delivered more than once. - * @see fetchMessages() - */ - def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] - - /** Send a series of messages to a topic. - * - * The returned future will complete once messages have been accepted to the underlying bus. - * No guarantee can be made of their delivery to subscribers. */ - def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] - -} diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala deleted file mode 100644 index 1af5308..0000000 --- a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala +++ /dev/null @@ -1,49 +0,0 @@ -package xyz.driver.core -package messaging - -import java.util.concurrent.ConcurrentHashMap - -import scala.async.Async.{async, await} -import scala.concurrent.Future - -/** Utility mixin that will ensure that topics and subscriptions exist before - * attempting to read or write from or to them. - */ -trait CreateOnDemand extends Bus { - - /** Create the given topic. This operation is idempotent and is expected to succeed if the topic - * already exists. - */ - def createTopic(topic: Topic[_]): Future[Unit] - - /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription - * already exists. - */ - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] - - private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] - private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] - - private def ensureTopic(topic: Topic[_]) = - createdTopics.computeIfAbsent(topic, t => createTopic(t)) - - private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = - createdSubscriptions.computeIfAbsent(topic -> config, { - case (t, c) => createSubscription(t, c) - }) - - abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - await(ensureTopic(topic)) - await(super.publishMessages(topic, messages)) - } - - abstract override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = async { - await(ensureTopic(topic)) - await(ensureSubscription(topic, config)) - await(super.fetchMessages(topic, config, maxMessages)) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala deleted file mode 100644 index b296c50..0000000 --- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala +++ /dev/null @@ -1,267 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer -import java.nio.file.{Files, Path, Paths} -import java.security.Signature -import java.time.Instant -import java.util - -import com.google.auth.oauth2.ServiceAccountCredentials -import com.softwaremill.sttp._ -import spray.json.DefaultJsonProtocol._ -import spray.json._ - -import scala.async.Async.{async, await} -import scala.concurrent._ -import scala.concurrent.duration._ - -/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] - * - * == Overview == - * - * The Pub/Sub message system is focused around a few concepts: 'topics', - * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may - * have multiple ''subscriptions'' associated to them. Every subscription to a - * topic will receive all messages sent to the topic. Messages are enqueued in - * a subscription until they are acknowledged by a ''subscriber''. Multiple - * subscribers may be associated to a subscription, in which case messages will - * get delivered arbitrarily among them. - * - * Topics and subscriptions are named resources which can be specified in - * Pub/Sub's configuration and may be queried. Subscribers on the other hand, - * are ephemeral processes that query a subscription on a regular basis, handle any - * messages and acknowledge them. - * - * == Delivery semantics == - * - * - at least once - * - no ordering - * - * == Retention == - * - * - configurable retry delay for unacknowledged messages, defaults to 10s - * - undeliverable messages are kept for 7 days - * - * @param credentials Google cloud credentials, usually the same as used by a - * service. Must have admin access to topics and - * descriptions. - * @param namespace The namespace in which this bus is running. Will be used to - * determine the exact name of topics and subscriptions. - * @param pullTimeout Delay after which a call to fetchMessages() will return an - * empty list, assuming that no messages have been received. - * @param executionContext Execution context to run any blocking commands. - * @param backend sttp backend used to query Pub/Sub's HTTP API - */ -class GoogleBus( - credentials: ServiceAccountCredentials, - namespace: String, - pullTimeout: Duration = 90.seconds -)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) - extends Bus { - import GoogleBus.Protocol - - case class MessageId(subscription: String, ackId: String) - - case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] - type Message[A] = PubsubMessage[A] - - /** Subscription-specific configuration - * - * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. - * All messages sent to a subscription will be dispatched arbitrarily - * among any subscribers. Defaults to the email of the credentials used by this - * bus instance, thereby giving every service a unique subscription to every topic. - * To give every service instance a unique subscription, this must be changed to a - * unique value. - * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. - */ - case class SubscriptionConfig( - subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), - ackTimeout: FiniteDuration = 10.seconds - ) - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - /** Obtain an authentication token valid for the given duration - * https://developers.google.com/identity/protocols/OAuth2ServiceAccount - */ - private def freshAuthToken(duration: FiniteDuration): Future[String] = { - def jwt = { - val now = Instant.now().getEpochSecond - val base64 = util.Base64.getEncoder - val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) - val body = base64.encodeToString( - s"""|{ - | "iss": "${credentials.getClientEmail}", - | "scope": "https://www.googleapis.com/auth/pubsub", - | "aud": "https://www.googleapis.com/oauth2/v4/token", - | "exp": ${now + duration.toSeconds}, - | "iat": $now - |}""".stripMargin.getBytes("utf-8") - ) - val signer = Signature.getInstance("SHA256withRSA") - signer.initSign(credentials.getPrivateKey) - signer.update(s"$header.$body".getBytes("utf-8")) - val signature = base64.encodeToString(signer.sign()) - s"$header.$body.$signature" - } - sttp - .post(uri"https://www.googleapis.com/oauth2/v4/token") - .body( - "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion" -> jwt - ) - .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) - .send() - .map(_.unsafeBody) - } - - // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time - private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) - - private val baseUri = uri"https://pubsub.googleapis.com/" - - private def rawTopicName(topic: Topic[_]) = - s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" - - def createTopic(topic: Topic[_]): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawTopicName(topic)}")) - .auth - .bearer(await(getToken())) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it - throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") - case _ => () - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) - .auth - .bearer(await(getToken())) - .body( - JsObject( - "topic" -> rawTopicName(topic).toJson, - "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson - ).compactPrint - ) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it - throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") - case _ => () - } - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - import Protocol.bufferFormat - val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) - val request = sttp - .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) - .auth - .bearer(await(getToken())) - .body( - JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint - ) - await(request.send()).unsafeBody - () - } - - override def fetchMessages[A]( - topic: Topic[A], - subscriptionConfig: SubscriptionConfig, - maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { - val subscription = rawSubscriptionName(subscriptionConfig, topic) - val request = sttp - .post(baseUri.path(s"v1/$subscription:pull")) - .auth - .bearer(await(getToken().map(x => x))) - .body( - JsObject( - "returnImmediately" -> JsFalse, - "maxMessages" -> JsNumber(maxMessages) - ).compactPrint - ) - .readTimeout(pullTimeout) - .mapResponse(_.parseJson) - - val messages = await(request.send()).unsafeBody match { - case JsObject(fields) if fields.isEmpty => Seq() - case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages - } - - messages.map { msg => - PubsubMessage[A]( - MessageId(subscription, msg.ackId), - topic.deserialize(msg.message.data), - msg.message.publishTime - ) - } - } - - override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { - val request = sttp - .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) - .auth - .bearer(await(getToken())) - .body( - JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint - ) - await(request.send()).unsafeBody - () - } - -} - -object GoogleBus { - - private object Protocol extends DefaultJsonProtocol { - case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) - case class ReceivedMessage(ackId: String, message: PubsubMessage) - case class PubsubMessage(data: ByteBuffer, publishTime: Instant) - - implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { - override def write(obj: Instant): JsValue = JsString(obj.toString) - override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) - } - implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { - override def write(obj: ByteBuffer): JsValue = - JsString(util.Base64.getEncoder.encodeToString(obj.array())) - - override def read(json: JsValue): ByteBuffer = { - val encodedBytes = json.convertTo[String].getBytes("utf-8") - val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) - ByteBuffer.wrap(decodedBytes) - } - } - - implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) - implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) - implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) - } - - def fromKeyfile(keyfile: Path, namespace: String)( - implicit executionContext: ExecutionContext, - backend: SttpBackend[Future, _]): GoogleBus = { - val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) - new GoogleBus(creds, namespace) - } - - @deprecated( - "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.", - "driver-core 1.12.2") - def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { - def env(key: String) = { - require(sys.env.contains(key), s"Environment variable $key is not set.") - sys.env(key) - } - val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) - fromKeyfile(keyfile, env("SERVICE_NAMESPACE")) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala deleted file mode 100644 index 45c9ed5..0000000 --- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala +++ /dev/null @@ -1,126 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -import akka.actor.{Actor, ActorSystem, Props} - -import scala.collection.mutable -import scala.collection.mutable.Map -import scala.concurrent.duration._ -import scala.concurrent.{Future, Promise} - -/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system - * and is intended for testing purposes only. */ -class QueueBus(implicit system: ActorSystem) extends Bus { - import system.dispatcher - - override type SubscriptionConfig = Long - override val defaultSubscriptionConfig: Long = 0 - override val executionContext = system.dispatcher - - override type MessageId = (String, SubscriptionConfig, Long) - type Message[A] = BasicMessage[A] - - private object ActorMessages { - case class Send[A](topic: Topic[A], messages: Seq[A]) - case class Ack(messages: Seq[MessageId]) - case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) - case object Unack - } - - class Subscription { - val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - } - - private class BusMaster extends Actor { - var _id = 0L - def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { - _id += 1; (topic, cfg, _id) - } - - val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty - - def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { - topics.get(topic) match { - case Some(t) => - t.getOrElseUpdate(cfg, new Subscription) - case None => - topics += topic -> mutable.Map.empty - ensureSubscription(topic, cfg) - } - } - - override def preStart(): Unit = { - context.system.scheduler.schedule(1.seconds, 1.seconds) { - self ! ActorMessages.Unack - } - } - - override def receive: Receive = { - case ActorMessages.Send(topic, messages) => - val buffers = messages.map(topic.serialize) - val subscriptions = topics.getOrElse(topic.name, Map.empty) - for ((cfg, subscription) <- subscriptions) { - for (buffer <- buffers) { - subscription.mailbox += nextId(topic.name, cfg) -> buffer - } - } - - case ActorMessages.Fetch(topic, cfg, max, promise) => - ensureSubscription(topic.name, cfg) - val subscription = topics(topic.name)(cfg) - val messages = subscription.mailbox.take(max) - subscription.unacked ++= messages - subscription.mailbox --= messages.map(_._1) - promise.success(messages.toSeq.map { - case (ackId, buffer) => - new Message[Any] { - val id = ackId - val data = topic.deserialize(buffer) - } - }) - - case ActorMessages.Ack(messageIds) => - for (id @ (topic, cfg, _) <- messageIds) { - ensureSubscription(topic, cfg) - val subscription = topics(topic)(cfg) - subscription.unacked -= id - } - - case ActorMessages.Unack => - for ((_, subscriptions) <- topics) { - for ((_, subscription) <- subscriptions) { - subscription.mailbox ++= subscription.unacked - subscription.unacked.clear() - } - } - } - - } - - val actor = system.actorOf(Props(new BusMaster)) - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { - actor ! ActorMessages.Send(topic, messages) - } - - override def fetchMessages[A]( - topic: messaging.Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - val result = Promise[Seq[Message[A]]] - actor ! ActorMessages.Fetch(topic, config, maxMessages, result) - result.future - } - - override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { - actor ! ActorMessages.Ack(ids) - } - -} - -object QueueBus { - def apply(implicit system: ActorSystem) = new QueueBus -} diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala deleted file mode 100644 index a9ba3a7..0000000 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ /dev/null @@ -1,102 +0,0 @@ -package xyz.driver.core -package messaging - -import akka.NotUsed -import akka.stream.Materializer -import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} - -import scala.collection.mutable.ListBuffer -import scala.concurrent.Future -import scala.concurrent.duration._ - -/** An extension to message buses that offers an Akka-Streams API. - * - * Example usage of a stream that subscribes to one topic, prints any messages - * it receives and finally acknowledges them - * their receipt. - * {{{ - * val bus: StreamBus = ??? - * val topic = Topic.string("topic") - * bus.subscribe(topic1) - * .map{ msg => - * print(msg.data) - * msg - * } - * .to(bus.acknowledge) - * .run() - * }}} */ -trait StreamBus extends Bus { - - /** Flow that publishes any messages to a given topic. - * Emits messages once they have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { - Flow[A] - .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) - .mapAsync(1) { a => - publishMessages(topic, a).map(_ => a) - } - .mapConcat(list => list.toList) - } - - /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, NotUsed] = { - Flow[MessageId] - .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) - .mapAsync(1)(acknowledgeMessages(_)) - .to(Sink.ignore) - } - - /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { - Source - .unfoldAsync((topic, config))( - topicAndConfig => - fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => - Some(topicAndConfig -> msgs)) - ) - .filter(_.nonEmpty) - .mapConcat(messages => messages.toList) - } - - def runWithRestart[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { - RestartSource - .withBackoff[MessageId]( - minBackoff, - maxBackoff, - randomFactor, - maxRestarts - ) { () => - subscribe(topic, config) - .via(processMessage.recover({ case _ => Nil })) - .log(topic.name) - .mapConcat(identity) - } - .to(acknowledge) - .run() - } - - def handleMessage[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - parallelism: Int = 1, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { - runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { - Flow[Message[A]].mapAsync(parallelism) { message => - processMessage(message.data).map(_ => message.id :: Nil) - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala deleted file mode 100644 index 32fd764..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Topic.scala +++ /dev/null @@ -1,43 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -/** A topic is a named group of messages that all share a common schema. - * @tparam Message type of messages sent over this topic */ -trait Topic[Message] { - - /** Name of this topic (must be unique). */ - def name: String - - /** Convert a message to its wire format that will be sent over a bus. */ - def serialize(message: Message): ByteBuffer - - /** Convert a message from its wire format. */ - def deserialize(message: ByteBuffer): Message - -} - -object Topic { - - /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ - def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { - def name = name0 - override def serialize(message: ByteBuffer): ByteBuffer = message - override def deserialize(message: ByteBuffer): ByteBuffer = message - } - - /** Create a topic that represents data as UTF-8 encoded strings. */ - def string(name0: String): Topic[String] = new Topic[String] { - def name = name0 - override def serialize(message: String): ByteBuffer = { - ByteBuffer.wrap(message.getBytes("utf-8")) - } - override def deserialize(message: ByteBuffer): String = { - val bytes = new Array[Byte](message.remaining()) - message.get(bytes) - new String(bytes, "utf-8") - } - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala deleted file mode 100644 index f5c41cf..0000000 --- a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala +++ /dev/null @@ -1,14 +0,0 @@ -package xyz.driver.core -package reporting - -import org.slf4j.MDC - -trait GoogleMdcLogger extends Reporter { self: GoogleReporter => - - abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = { - MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") - super.log(severity, message, reason) - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala deleted file mode 100644 index 14c4954..0000000 --- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala +++ /dev/null @@ -1,217 +0,0 @@ -package xyz.driver.core -package reporting - -import java.security.Signature -import java.time.Instant -import java.util - -import akka.NotUsed -import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete} -import akka.stream.{Materializer, OverflowStrategy} -import com.google.auth.oauth2.ServiceAccountCredentials -import com.softwaremill.sttp._ -import spray.json.DerivedJsonProtocol._ -import spray.json._ -import xyz.driver.core.reporting.Reporter.CausalRelation - -import scala.async.Async._ -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal -import scala.util.{Failure, Random, Success, Try} - -/** A reporter that collects traces and submits them to - * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]]. - */ -class GoogleReporter( - val credentials: ServiceAccountCredentials, - namespace: String, - buffer: Int = GoogleReporter.DefaultBufferSize, - interval: FiniteDuration = GoogleReporter.DefaultInterval)( - implicit client: SttpBackend[Future, _], - mat: Materializer, - ec: ExecutionContext -) extends Reporter { - import GoogleReporter._ - - private val getToken: () => Future[String] = Refresh.every(55.minutes) { - def jwt = { - val now = Instant.now().getEpochSecond - val base64 = util.Base64.getEncoder - val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) - val body = base64.encodeToString( - s"""|{ - | "iss": "${credentials.getClientEmail}", - | "scope": "https://www.googleapis.com/auth/trace.append", - | "aud": "https://www.googleapis.com/oauth2/v4/token", - | "exp": ${now + 60.minutes.toSeconds}, - | "iat": $now - |}""".stripMargin.getBytes("utf-8") - ) - val signer = Signature.getInstance("SHA256withRSA") - signer.initSign(credentials.getPrivateKey) - signer.update(s"$header.$body".getBytes("utf-8")) - val signature = base64.encodeToString(signer.sign()) - s"$header.$body.$signature" - } - sttp - .post(uri"https://www.googleapis.com/oauth2/v4/token") - .body( - "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion" -> jwt - ) - .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) - .send() - .map(_.unsafeBody) - } - - private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff( - minBackoff = 3.seconds, - maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly - ) { () => - Flow[Span] - .groupedWithin(buffer, interval) - .mapAsync(1) { spans => - async { - val token = await(getToken()) - val res = await( - sttp - .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite") - .auth - .bearer(token) - .body( - Spans(spans).toJson.compactPrint - ) - .send() - .map(_.unsafeBody)) - res - } - } - .recover { - case NonFatal(e) => - System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore - throw e - } - .to(Sink.ignore) - } - private val queue: SourceQueueWithComplete[Span] = Source - .queue[Span](buffer, OverflowStrategy.dropHead) - .to(sendToGoogle) - .run() - - private def submit(span: Span): Unit = queue.offer(span).failed.map { e => - System.err.println(s"Error adding span to submission queue: $e") - } - - private def startSpan( - traceId: String, - spanId: String, - parentSpanId: Option[String], - displayName: String, - attributes: Map[String, String]) = Span( - s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId", - spanId, - parentSpanId, - TruncatableString(displayName), - Instant.now(), - Instant.now(), - Attributes(attributes ++ Map("service.namespace" -> namespace)), - Failure(new IllegalStateException("span status not set")) - ) - - def traceWithOptionalParent[A]( - operationName: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = { - val child = parent match { - case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") - case None => SpanContext.fresh() - } - val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) - val result = operation(child) - span.endTime = Instant.now() - submit(span) - result - } - - def traceWithOptionalParentAsync[A]( - operationName: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = { - val child = parent match { - case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") - case None => SpanContext.fresh() - } - val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) - val result = operation(child) - result.onComplete { result => - span.endTime = Instant.now() - span.status = result - submit(span) - } - result - } - - override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = - sys.error("Submitting logs directly to GCP is " + - "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly - -} - -object GoogleReporter { - - val DefaultBufferSize: Int = 10000 - val DefaultInterval: FiniteDuration = 5.seconds - - private case class Attributes(attributeMap: Map[String, String]) - private case class TruncatableString(value: String) - private case class Span( - name: String, - spanId: String, - parentSpanId: Option[String], - displayName: TruncatableString, - startTime: Instant, - var endTime: Instant, - var attributes: Attributes, - var status: Try[_] - ) - - private case class Spans(spans: Seq[Span]) - - private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { - override def write(obj: Instant): JsValue = obj.toString.toJson - override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) - } - - private implicit val mapFormat = new RootJsonFormat[Map[String, String]] { - override def read(json: JsValue): Map[String, String] = sys.error("unimplemented") - override def write(obj: Map[String, String]): JsValue = { - val withValueObjects = obj.mapValues(value => JsObject("stringValue" -> JsObject("value" -> value.toJson))) - JsObject(withValueObjects) - } - } - - private implicit val statusFormat = new RootJsonFormat[Try[_]] { - override def read(json: JsValue): Try[_] = sys.error("unimplemented") - override def write(obj: Try[_]) = { - // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto - val (code, message) = obj match { - case Success(_) => (0, "success") - case Failure(_) => (2, "failure") - } - JsObject( - "code" -> code.toJson, - "message" -> message.toJson, - "details" -> JsArray() - ) - } - } - - private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes) - private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString) - private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat8(Span) - private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans) - -} diff --git a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala deleted file mode 100644 index c1c81f4..0000000 --- a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala +++ /dev/null @@ -1,8 +0,0 @@ -package xyz.driver.core -package reporting - -trait NoReporter extends NoTraceReporter { - override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = () -} -object NoReporter extends NoReporter diff --git a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala deleted file mode 100644 index b49cfda..0000000 --- a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala +++ /dev/null @@ -1,20 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.concurrent.Future - -/** A reporter mixin that does not emit traces. */ -trait NoTraceReporter extends Reporter { - - override def traceWithOptionalParent[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh()) - - override def traceWithOptionalParentAsync[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] = - op(SpanContext.fresh()) - -} diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala deleted file mode 100644 index 469084c..0000000 --- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala +++ /dev/null @@ -1,183 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.concurrent.Future - -/** Context-aware diagnostic utility for distributed systems, combining logging and tracing. - * - * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an - * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events - * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an - * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on - * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however, - * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across - * service boundaries. - * - * This reporter provides tracing and logging utilities that explicitly require references to execution contexts - * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's - * implicit-parameter-as-a-context paradigm. - * - * Tracing is intended to be compatible with the - * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTrace specification]], and hence its - * guidelines on naming and tagging apply to methods provided by this Reporter as well. - * - * Usage example: - * {{{ - * val reporter: Reporter = ??? - * object Repo { - * def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx => - * reporter.debug("Running query") - * // run query - * } - * } - * object Service { - * def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx => - * reporter.debug("Getting user") - * Repo.getUserQuery(userId) - * } - * } - * reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx => - * Service.getUser("john") - * } - * }}} - * - * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms - * of memory and processing). It should be used in interesting and actionable code paths.''' - * - * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as - * implicit parameters. This method should only be used for top-level traces when no parent - * traces are available. - */ -trait Reporter { - import Reporter._ - - def traceWithOptionalParent[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A - def traceWithOptionalParentAsync[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A] - - /** Trace the execution of an operation, if no parent trace is available. - * - * $rootWarning - */ - final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A = - traceWithOptionalParent( - name, - tags, - None - )(op) - - /** Trace the execution of an asynchronous operation, if no parent trace is available. - * - * $rootWarning - * - * @see traceRoot - */ - final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)( - op: SpanContext => Future[A]): Future[A] = - traceWithOptionalParentAsync( - name, - tags, - None - )(op) - - /** Trace the execution of an operation, in relation to a parent context. - * - * @param name The name of the operation. Note that this name should not be too specific. According to the - * OpenTrace RFC: "An operation name, a human-readable string which concisely represents the work done - * by the Span (for example, an RPC method name, a function name, or the name of a subtask or stage - * within a larger computation). The operation name should be the most general string that identifies a - * (statistically) interesting class of Span instances. That is, `"get_user"` is better than - * `"get_user/314159"`". - * @param tags Attributes associated with the traced event. Following the above example, if `"get_user"` is an - * operation name, a good tag would be `("account_id" -> 314159)`. - * @param relation Relation of the operation to its parent context. - * @param op The operation to be traced. The trace will complete once the operation returns. - * @param ctx Context of the parent trace. - * @tparam A Return type of the operation. - * @return The value of the child operation. - */ - final def trace[A]( - name: String, - tags: Map[String, String] = Map.empty, - relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)( - implicit ctx: SpanContext): A = - traceWithOptionalParent( - name, - tags, - Some(ctx -> relation) - )(op) - - /** Trace the operation of an asynchronous operation. - * - * Contrary to the synchronous version of this method, a trace is completed once the child operation completes - * (rather than returns). - * - * @see trace - */ - final def traceAsync[A]( - name: String, - tags: Map[String, String] = Map.empty, - relation: CausalRelation = CausalRelation.Child)( - op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] = - traceWithOptionalParentAsync( - name, - tags, - Some(ctx -> relation) - )(op) - - /** Log a message. */ - def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit - - /** Log a debug message. */ - final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) - final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Debug, message, Some(reason)) - - /** Log an informational message. */ - final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) - final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Informational, message, Some(reason)) - - /** Log a warning message. */ - final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) - final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Warning, message, Some(reason)) - - /** Log an error message. */ - final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) - final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Error, message, Some(reason)) - -} - -object Reporter { - - /** A relation in cause. - * - * Corresponds to - * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTrace references between spans]] - */ - sealed trait CausalRelation - object CausalRelation { - - /** One event is the child of another. The parent completes once the child is complete. */ - case object Child extends CausalRelation - - /** One event follows from another, not necessarily being the parent. */ - case object Follows extends CausalRelation - } - - sealed trait Severity - object Severity { - case object Debug extends Severity - case object Informational extends Severity - case object Warning extends Severity - case object Error extends Severity - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala deleted file mode 100644 index 0ff5574..0000000 --- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala +++ /dev/null @@ -1,36 +0,0 @@ -package xyz.driver.core -package reporting - -import com.typesafe.scalalogging.{Logger => ScalaLogger} - -/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */ -trait ScalaLoggingCompat extends Reporter { - import Reporter.Severity - - def logger: ScalaLogger - - override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit = - severity match { - case Severity.Debug => logger.debug(message, reason.orNull) - case Severity.Informational => logger.info(message, reason.orNull) - case Severity.Warning => logger.warn(message, reason.orNull) - case Severity.Error => logger.error(message, reason.orNull) - } - -} - -object ScalaLoggingCompat { - import scala.language.implicitConversions - - def defaultScalaLogger(json: Boolean = false): ScalaLogger = { - if (json) { - System.setProperty("logback.configurationFile", "deployed-logback.xml") - } else { - System.setProperty("logback.configurationFile", "logback.xml") - } - ScalaLogger.apply("application") - } - - implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger - -} diff --git a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala deleted file mode 100644 index 04a822d..0000000 --- a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala +++ /dev/null @@ -1,13 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.util.Random - -case class SpanContext private[core] (traceId: String, spanId: String) - -object SpanContext { - def fresh(): SpanContext = SpanContext( - f"${Random.nextLong()}%016x${Random.nextLong()}%016x", - f"${Random.nextLong()}%016x" - ) -} diff --git a/src/main/scala/xyz/driver/core/rest/DnsDiscovery.scala b/src/main/scala/xyz/driver/core/rest/DnsDiscovery.scala deleted file mode 100644 index 87946e4..0000000 --- a/src/main/scala/xyz/driver/core/rest/DnsDiscovery.scala +++ /dev/null @@ -1,11 +0,0 @@ -package xyz.driver.core -package rest - -class DnsDiscovery(transport: HttpRestServiceTransport, overrides: Map[String, String]) { - - def discover[A](implicit descriptor: ServiceDescriptor[A]): A = { - val url = overrides.getOrElse(descriptor.name, s"https://${descriptor.name}") - descriptor.connect(transport, url) - } - -} diff --git a/src/main/scala/xyz/driver/core/rest/DriverRoute.scala b/src/main/scala/xyz/driver/core/rest/DriverRoute.scala deleted file mode 100644 index 911e306..0000000 --- a/src/main/scala/xyz/driver/core/rest/DriverRoute.scala +++ /dev/null @@ -1,122 +0,0 @@ -package xyz.driver.core.rest - -import java.sql.SQLException - -import akka.http.scaladsl.model.headers.CacheDirectives.`no-cache` -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.model.{StatusCodes, _} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import com.typesafe.scalalogging.Logger -import org.slf4j.MDC -import xyz.driver.core.rest -import xyz.driver.core.rest.errors._ - -import scala.compat.Platform.ConcurrentModificationException - -trait DriverRoute { - def log: Logger - - def route: Route - - def routeWithDefaults: Route = { - (defaultResponseHeaders & handleExceptions(ExceptionHandler(exceptionHandler))) { - route - } - } - - protected def defaultResponseHeaders: Directive0 = { - extractRequest flatMap { request => - // Needs to happen before any request processing, so all the log messages - // associated with processing of this request are having this `trackingId` - val trackingId = rest.extractTrackingId(request) - val tracingHeader = RawHeader(ContextHeaders.TrackingIdHeader, trackingId) - MDC.put("trackingId", trackingId) - - respondWithHeaders(tracingHeader +: DriverRoute.DefaultHeaders: _*) - } - } - - /** - * Override me for custom exception handling - * - * @return Exception handling route for exception type - */ - protected def exceptionHandler: PartialFunction[Throwable, Route] = { - case serviceException: ServiceException => - serviceExceptionHandler(serviceException) - - case is: IllegalStateException => - ctx => - log.warn(s"Request is not allowed to ${ctx.request.method} ${ctx.request.uri}", is) - errorResponse(StatusCodes.BadRequest, message = is.getMessage, is)(ctx) - - case cm: ConcurrentModificationException => - ctx => - log.warn(s"Concurrent modification of the resource ${ctx.request.method} ${ctx.request.uri}", cm) - errorResponse(StatusCodes.Conflict, "Resource was changed concurrently, try requesting a newer version", cm)( - ctx) - - case se: SQLException => - ctx => - log.warn(s"Database exception for the resource ${ctx.request.method} ${ctx.request.uri}", se) - errorResponse(StatusCodes.InternalServerError, "Data access error", se)(ctx) - - case t: Exception => - ctx => - log.warn(s"Request to ${ctx.request.method} ${ctx.request.uri} could not be handled normally", t) - errorResponse(StatusCodes.InternalServerError, t.getMessage, t)(ctx) - } - - protected def serviceExceptionHandler(serviceException: ServiceException): Route = { - val statusCode = serviceException match { - case e: InvalidInputException => - log.info("Invalid client input error", e) - StatusCodes.BadRequest - case e: InvalidActionException => - log.info("Invalid client action error", e) - StatusCodes.Forbidden - case e: UnauthorizedException => - log.info("Unauthorized user error", e) - StatusCodes.Unauthorized - case e: ResourceNotFoundException => - log.info("Resource not found error", e) - StatusCodes.NotFound - case e: ExternalServiceException => - log.error("Error while calling another service", e) - StatusCodes.InternalServerError - case e: ExternalServiceTimeoutException => - log.error("Service timeout error", e) - StatusCodes.GatewayTimeout - case e: DatabaseException => - log.error("Database error", e) - StatusCodes.InternalServerError - } - - { (ctx: RequestContext) => - import xyz.driver.core.json.serviceExceptionFormat - val entity = - HttpEntity(ContentTypes.`application/json`, serviceExceptionFormat.write(serviceException).toString()) - errorResponse(statusCode, entity, serviceException)(ctx) - } - } - - protected def errorResponse[T <: Exception](statusCode: StatusCode, message: String, exception: T): Route = - errorResponse(statusCode, HttpEntity(message), exception) - - protected def errorResponse[T <: Exception](statusCode: StatusCode, entity: ResponseEntity, exception: T): Route = { - complete(HttpResponse(statusCode, entity = entity)) - } - -} - -object DriverRoute { - val DefaultHeaders: List[HttpHeader] = List( - // This header will eliminate the risk of envoy trying to reuse a connection - // that already timed out on the server side by completely rejecting keep-alive - Connection("close"), - // These 2 headers are the simplest way to prevent IE from caching GET requests - RawHeader("Pragma", "no-cache"), - `Cache-Control`(List(`no-cache`(Nil))) - ) -} diff --git a/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala b/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala deleted file mode 100644 index e31635b..0000000 --- a/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala +++ /dev/null @@ -1,103 +0,0 @@ -package xyz.driver.core.rest - -import akka.actor.ActorSystem -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer -import akka.stream.scaladsl.TcpIdleTimeoutException -import org.slf4j.MDC -import xyz.driver.core.Name -import xyz.driver.core.reporting.Reporter -import xyz.driver.core.rest.errors.{ExternalServiceException, ExternalServiceTimeoutException} - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -class HttpRestServiceTransport( - applicationName: Name[App], - applicationVersion: String, - val actorSystem: ActorSystem, - val executionContext: ExecutionContext, - reporter: Reporter) - extends ServiceTransport { - - protected implicit val execution: ExecutionContext = executionContext - - protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem) - - def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = { - val tags = Map( - // open tracing semantic tags - "span.kind" -> "client", - "service" -> applicationName.value, - "http.url" -> requestStub.uri.toString, - "http.method" -> requestStub.method.value, - "peer.hostname" -> requestStub.uri.authority.host.toString, - // google's tracing console provides extra search features if we define these tags - "/http/path" -> requestStub.uri.path.toString, - "/http/method" -> requestStub.method.value.toString, - "/http/url" -> requestStub.uri.toString - ) - reporter.traceAsync(s"http_call_rpc", tags) { implicit span => - val requestTime = System.currentTimeMillis() - - val request = requestStub - .withHeaders(context.contextHeaders.toSeq.map { - case (ContextHeaders.TrackingIdHeader, _) => - RawHeader(ContextHeaders.TrackingIdHeader, context.trackingId) - case (ContextHeaders.StacktraceHeader, _) => - RawHeader( - ContextHeaders.StacktraceHeader, - Option(MDC.get("stack")) - .orElse(context.contextHeaders.get(ContextHeaders.StacktraceHeader)) - .getOrElse("")) - case (header, headerValue) => RawHeader(header, headerValue) - }: _*) - - reporter.debug(s"Sending request to ${request.method} ${request.uri}") - - val response = httpClient.makeRequest(request) - - response.onComplete { - case Success(r) => - val responseLatency = System.currentTimeMillis() - requestTime - reporter.debug( - s"Response from ${request.uri} to request $requestStub is successful in $responseLatency ms: $r") - - case Failure(t: Throwable) => - val responseLatency = System.currentTimeMillis() - requestTime - reporter.warn( - s"Failed to receive response from ${request.method.value} ${request.uri} in $responseLatency ms", - t) - }(executionContext) - - response.recoverWith { - case _: TcpIdleTimeoutException => - val serviceCalled = s"${requestStub.method.value} ${requestStub.uri}" - Future.failed(ExternalServiceTimeoutException(serviceCalled)) - case t: Throwable => Future.failed(t) - } - }(context.spanContext) - } - - def sendRequest(context: ServiceRequestContext)(requestStub: HttpRequest)( - implicit mat: Materializer): Future[Unmarshal[ResponseEntity]] = { - - sendRequestGetResponse(context)(requestStub) flatMap { response => - if (response.status == StatusCodes.NotFound) { - Future.successful(Unmarshal(HttpEntity.Empty: ResponseEntity)) - } else if (response.status.isFailure()) { - val serviceCalled = s"${requestStub.method} ${requestStub.uri}" - Unmarshal(response.entity).to[String] flatMap { errorString => - import spray.json._ - import xyz.driver.core.json._ - val serviceException = util.Try(serviceExceptionFormat.read(errorString.parseJson)).toOption - Future.failed(ExternalServiceException(serviceCalled, errorString, serviceException)) - } - } else { - Future.successful(Unmarshal(response.entity)) - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/rest/PatchDirectives.scala b/src/main/scala/xyz/driver/core/rest/PatchDirectives.scala deleted file mode 100644 index f33bf9d..0000000 --- a/src/main/scala/xyz/driver/core/rest/PatchDirectives.scala +++ /dev/null @@ -1,104 +0,0 @@ -package xyz.driver.core.rest - -import akka.http.javadsl.server.Rejections -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import akka.http.scaladsl.model.{ContentTypeRange, HttpCharsets, MediaType} -import akka.http.scaladsl.server._ -import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} -import spray.json._ - -import scala.concurrent.Future -import scala.util.{Failure, Success, Try} - -trait PatchDirectives extends Directives with SprayJsonSupport { - - /** Media type for patches to JSON values, as specified in [[https://tools.ietf.org/html/rfc7396 RFC 7396]]. */ - val `application/merge-patch+json`: MediaType.WithFixedCharset = - MediaType.applicationWithFixedCharset("merge-patch+json", HttpCharsets.`UTF-8`) - - /** Wraps a JSON value that represents a patch. - * The patch must given in the format specified in [[https://tools.ietf.org/html/rfc7396 RFC 7396]]. */ - case class PatchValue(value: JsValue) { - - /** Applies this patch to a given original JSON value. In other words, merges the original with this "diff". */ - def applyTo(original: JsValue): JsValue = mergeJsValues(original, value) - } - - /** Witness that the given patch may be applied to an original domain value. - * @tparam A type of the domain value - * @param patch the patch that may be applied to a domain value - * @param format a JSON format that enables serialization and deserialization of a domain value */ - case class Patchable[A](patch: PatchValue, format: RootJsonFormat[A]) { - - /** Applies the patch to a given domain object. The result will be a combination - * of the original value, updates with the fields specified in this witness' patch. */ - def applyTo(original: A): A = { - val serialized = format.write(original) - val merged = patch.applyTo(serialized) - val deserialized = format.read(merged) - deserialized - } - } - - implicit def patchValueUnmarshaller: FromEntityUnmarshaller[PatchValue] = - Unmarshaller.byteStringUnmarshaller - .andThen(sprayJsValueByteStringUnmarshaller) - .forContentTypes(ContentTypeRange(`application/merge-patch+json`)) - .map(js => PatchValue(js)) - - implicit def patchableUnmarshaller[A]( - implicit patchUnmarshaller: FromEntityUnmarshaller[PatchValue], - format: RootJsonFormat[A]): FromEntityUnmarshaller[Patchable[A]] = { - patchUnmarshaller.map(patch => Patchable[A](patch, format)) - } - - protected def mergeObjects(oldObj: JsObject, newObj: JsObject, maxLevels: Option[Int] = None): JsObject = { - JsObject((oldObj.fields.keys ++ newObj.fields.keys).map({ key => - val oldValue = oldObj.fields.getOrElse(key, JsNull) - val newValue = newObj.fields.get(key).fold(oldValue)(mergeJsValues(oldValue, _, maxLevels.map(_ - 1))) - key -> newValue - })(collection.breakOut): _*) - } - - protected def mergeJsValues(oldValue: JsValue, newValue: JsValue, maxLevels: Option[Int] = None): JsValue = { - def mergeError(typ: String): Nothing = - deserializationError(s"Expected $typ value, got $newValue") - - if (maxLevels.exists(_ < 0)) oldValue - else { - (oldValue, newValue) match { - case (_: JsString, newString @ (JsString(_) | JsNull)) => newString - case (_: JsString, _) => mergeError("string") - case (_: JsNumber, newNumber @ (JsNumber(_) | JsNull)) => newNumber - case (_: JsNumber, _) => mergeError("number") - case (_: JsBoolean, newBool @ (JsBoolean(_) | JsNull)) => newBool - case (_: JsBoolean, _) => mergeError("boolean") - case (_: JsArray, newArr @ (JsArray(_) | JsNull)) => newArr - case (_: JsArray, _) => mergeError("array") - case (oldObj: JsObject, newObj: JsObject) => mergeObjects(oldObj, newObj) - case (_: JsObject, JsNull) => JsNull - case (_: JsObject, _) => mergeError("object") - case (JsNull, _) => newValue - } - } - } - - def mergePatch[T](patchable: Patchable[T], retrieve: => Future[Option[T]]): Directive1[T] = - Directive { inner => requestCtx => - onSuccess(retrieve)({ - case Some(oldT) => - Try(patchable.applyTo(oldT)) - .transform[Route]( - mergedT => scala.util.Success(inner(Tuple1(mergedT))), { - case jsonException: DeserializationException => - Success(reject(Rejections.malformedRequestContent(jsonException.getMessage, jsonException))) - case t => Failure(t) - } - ) - .get // intentionally re-throw all other errors - case None => reject() - })(requestCtx) - } -} - -object PatchDirectives extends PatchDirectives diff --git a/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala b/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala deleted file mode 100644 index 2854257..0000000 --- a/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala +++ /dev/null @@ -1,67 +0,0 @@ -package xyz.driver.core.rest - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.headers.`User-Agent` -import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} -import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode} -import xyz.driver.core.Name - -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -class PooledHttpClient( - baseUri: Uri, - applicationName: Name[App], - applicationVersion: String, - requestRateLimit: Int = 64, - requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) - extends HttpClient { - - private val host = baseUri.authority.host.toString() - private val port = baseUri.effectivePort - private val scheme = baseUri.scheme - - protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) - - private val clientConnectionSettings: ClientConnectionSettings = - ClientConnectionSettings(actorSystem).withUserAgentHeader( - Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) - - private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) - .withConnectionSettings(clientConnectionSettings) - - private val pool = if (scheme.equalsIgnoreCase("https")) { - Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) - } else { - Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) - } - - private val queue = Source - .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew) - .via(pool) - .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping) - .toMat(Sink.foreach({ - case ((Success(resp), p)) => p.success(resp) - case ((Failure(e), p)) => p.failure(e) - }))(Keep.left) - .run - - def makeRequest(request: HttpRequest): Future[HttpResponse] = { - val responsePromise = Promise[HttpResponse]() - - queue.offer(request -> responsePromise).flatMap { - case QueueOfferResult.Enqueued => - responsePromise.future - case QueueOfferResult.Dropped => - Future.failed(new Exception(s"Request queue to the host $host is overflown")) - case QueueOfferResult.Failure(ex) => - Future.failed(ex) - case QueueOfferResult.QueueClosed => - Future.failed(new Exception("Queue was closed (pool shut down) while running the request")) - } - } -} diff --git a/src/main/scala/xyz/driver/core/rest/ProxyRoute.scala b/src/main/scala/xyz/driver/core/rest/ProxyRoute.scala deleted file mode 100644 index c0e9f99..0000000 --- a/src/main/scala/xyz/driver/core/rest/ProxyRoute.scala +++ /dev/null @@ -1,26 +0,0 @@ -package xyz.driver.core.rest - -import akka.http.scaladsl.server.{RequestContext, Route, RouteResult} -import com.typesafe.config.Config -import xyz.driver.core.Name - -import scala.concurrent.ExecutionContext - -trait ProxyRoute extends DriverRoute { - implicit val executionContext: ExecutionContext - val config: Config - val httpClient: HttpClient - - protected def proxyToService(serviceName: Name[Service]): Route = { ctx: RequestContext => - val httpScheme = config.getString(s"services.${serviceName.value}.httpScheme") - val baseUrl = config.getString(s"services.${serviceName.value}.baseUrl") - - val originalUri = ctx.request.uri - val originalRequest = ctx.request - - val newUri = originalUri.withScheme(httpScheme).withHost(baseUrl) - val newRequest = originalRequest.withUri(newUri) - - httpClient.makeRequest(newRequest).map(RouteResult.Complete) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/RestService.scala b/src/main/scala/xyz/driver/core/rest/RestService.scala deleted file mode 100644 index 91b3550..0000000 --- a/src/main/scala/xyz/driver/core/rest/RestService.scala +++ /dev/null @@ -1,88 +0,0 @@ -package xyz.driver.core.rest - -import akka.http.scaladsl.model._ -import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} -import akka.stream.Materializer - -import scala.concurrent.{ExecutionContext, Future} -import scalaz.{ListT, OptionT} -import scalaz.syntax.equal._ -import scalaz.Scalaz.stringInstance - -trait RestService extends Service { - - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ - import spray.json._ - - protected implicit val exec: ExecutionContext - protected implicit val materializer: Materializer - - implicit class ResponseEntityFoldable(entity: Unmarshal[ResponseEntity]) { - def fold[T](default: => T)(implicit um: Unmarshaller[ResponseEntity, T]): Future[T] = - if (entity.value.isKnownEmpty()) Future.successful[T](default) else entity.to[T] - } - - protected def unitResponse(request: Future[Unmarshal[ResponseEntity]]): OptionT[Future, Unit] = - OptionT[Future, Unit](request.flatMap(_.to[String]).map(_ => Option(()))) - - protected def optionalResponse[T](request: Future[Unmarshal[ResponseEntity]])( - implicit um: Unmarshaller[ResponseEntity, Option[T]]): OptionT[Future, T] = - OptionT[Future, T](request.flatMap(_.fold(Option.empty[T]))) - - protected def listResponse[T](request: Future[Unmarshal[ResponseEntity]])( - implicit um: Unmarshaller[ResponseEntity, List[T]]): ListT[Future, T] = - ListT[Future, T](request.flatMap(_.fold(List.empty[T]))) - - protected def jsonEntity(json: JsValue): RequestEntity = - HttpEntity(ContentTypes.`application/json`, json.compactPrint) - - protected def mergePatchJsonEntity(json: JsValue): RequestEntity = - HttpEntity(PatchDirectives.`application/merge-patch+json`, json.compactPrint) - - protected def get(baseUri: Uri, path: String, query: Seq[(String, String)] = Seq.empty) = - HttpRequest(HttpMethods.GET, endpointUri(baseUri, path, query)) - - protected def post(baseUri: Uri, path: String, httpEntity: RequestEntity) = - HttpRequest(HttpMethods.POST, endpointUri(baseUri, path), entity = httpEntity) - - protected def postJson(baseUri: Uri, path: String, json: JsValue) = - HttpRequest(HttpMethods.POST, endpointUri(baseUri, path), entity = jsonEntity(json)) - - protected def put(baseUri: Uri, path: String, httpEntity: RequestEntity) = - HttpRequest(HttpMethods.PUT, endpointUri(baseUri, path), entity = httpEntity) - - protected def putJson(baseUri: Uri, path: String, json: JsValue) = - HttpRequest(HttpMethods.PUT, endpointUri(baseUri, path), entity = jsonEntity(json)) - - protected def patch(baseUri: Uri, path: String, httpEntity: RequestEntity) = - HttpRequest(HttpMethods.PATCH, endpointUri(baseUri, path), entity = httpEntity) - - protected def patchJson(baseUri: Uri, path: String, json: JsValue) = - HttpRequest(HttpMethods.PATCH, endpointUri(baseUri, path), entity = jsonEntity(json)) - - protected def mergePatchJson(baseUri: Uri, path: String, json: JsValue) = - HttpRequest(HttpMethods.PATCH, endpointUri(baseUri, path), entity = mergePatchJsonEntity(json)) - - protected def delete(baseUri: Uri, path: String, query: Seq[(String, String)] = Seq.empty) = - HttpRequest(HttpMethods.DELETE, endpointUri(baseUri, path, query)) - - protected def endpointUri(baseUri: Uri, path: String): Uri = - baseUri.withPath(Uri.Path(path)) - - protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]): Uri = - baseUri.withPath(Uri.Path(path)).withQuery(Uri.Query(query: _*)) - - protected def responseToListResponse[T: JsonFormat](pagination: Option[Pagination])( - response: HttpResponse): Future[ListResponse[T]] = { - import DefaultJsonProtocol._ - val resourceCount = response.headers - .find(_.name() === ContextHeaders.ResourceCount) - .map(_.value().toInt) - .getOrElse(0) - val meta = ListResponse.Meta(resourceCount, pagination.getOrElse(Pagination(resourceCount, 1))) - Unmarshal(response.entity).to[List[T]].map(ListResponse(_, meta)) - } - - protected def responseToListResponse[T: JsonFormat](pagination: Pagination)( - response: HttpResponse): Future[ListResponse[T]] = responseToListResponse(Some(pagination))(response) -} diff --git a/src/main/scala/xyz/driver/core/rest/ServiceDescriptor.scala b/src/main/scala/xyz/driver/core/rest/ServiceDescriptor.scala deleted file mode 100644 index 646fae8..0000000 --- a/src/main/scala/xyz/driver/core/rest/ServiceDescriptor.scala +++ /dev/null @@ -1,16 +0,0 @@ -package xyz.driver.core -package rest -import scala.annotation.implicitNotFound - -@implicitNotFound( - "Don't know how to communicate with service ${S}. Make sure an implicit ServiceDescriptor is" + - "available. A good place to put one is in the service's companion object.") -trait ServiceDescriptor[S] { - - /** The service's name. Must be unique among all services. */ - def name: String - - /** Get an instance of the service. */ - def connect(transport: HttpRestServiceTransport, url: String): S - -} diff --git a/src/main/scala/xyz/driver/core/rest/SingleRequestHttpClient.scala b/src/main/scala/xyz/driver/core/rest/SingleRequestHttpClient.scala deleted file mode 100644 index 964a5a2..0000000 --- a/src/main/scala/xyz/driver/core/rest/SingleRequestHttpClient.scala +++ /dev/null @@ -1,29 +0,0 @@ -package xyz.driver.core.rest - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.headers.`User-Agent` -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} -import akka.stream.ActorMaterializer -import xyz.driver.core.Name - -import scala.concurrent.Future - -class SingleRequestHttpClient(applicationName: Name[App], applicationVersion: String, actorSystem: ActorSystem) - extends HttpClient { - - protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) - private val client = Http()(actorSystem) - - private val clientConnectionSettings: ClientConnectionSettings = - ClientConnectionSettings(actorSystem).withUserAgentHeader( - Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) - - private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) - .withConnectionSettings(clientConnectionSettings) - - def makeRequest(request: HttpRequest): Future[HttpResponse] = { - client.singleRequest(request, settings = connectionPoolSettings) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/Swagger.scala b/src/main/scala/xyz/driver/core/rest/Swagger.scala deleted file mode 100644 index 5ceac54..0000000 --- a/src/main/scala/xyz/driver/core/rest/Swagger.scala +++ /dev/null @@ -1,144 +0,0 @@ -package xyz.driver.core.rest - -import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpEntity} -import akka.http.scaladsl.server.Route -import akka.http.scaladsl.server.directives.FileAndResourceDirectives.ResourceFile -import akka.stream.ActorAttributes -import akka.stream.scaladsl.{Framing, StreamConverters} -import akka.util.ByteString -import com.github.swagger.akka.SwaggerHttpService -import com.github.swagger.akka.model._ -import com.typesafe.config.Config -import com.typesafe.scalalogging.Logger -import io.swagger.models.Scheme -import io.swagger.models.auth.{ApiKeyAuthDefinition, In} -import io.swagger.util.Json - -import scala.util.control.NonFatal - -class Swagger( - override val host: String, - accessSchemes: List[String], - version: String, - override val apiClasses: Set[Class[_]], - val config: Config, - val logger: Logger) - extends SwaggerHttpService { - - override val schemes = accessSchemes.map { s => - Scheme.forValue(s) - } - - // Note that the reason for overriding this is a subtle chain of causality: - // - // 1. Some of our endpoints require a single trailing slash and will not - // function if it is omitted - // 2. Swagger omits trailing slashes in its generated api doc - // 3. To work around that, a space is added after the trailing slash in the - // swagger Path annotations - // 4. This space is removed manually in the code below - // - // TODO: Ideally we'd like to drop this custom override and fix the issue in - // 1, by dropping the slash requirement and accepting api endpoints with and - // without trailing slashes. This will require inspecting and potentially - // fixing all service endpoints. - override def generateSwaggerJson: String = { - import io.swagger.models.{Swagger => JSwagger} - - import scala.collection.JavaConverters._ - try { - val swagger: JSwagger = reader.read(apiClasses.asJava) - - val paths = if (swagger.getPaths == null) { - Map.empty - } else { - swagger.getPaths.asScala - } - - // Removing trailing spaces - val fixedPaths = paths.map { - case (key, path) => - key.trim -> path - } - - swagger.setPaths(fixedPaths.asJava) - - Json.pretty().writeValueAsString(swagger) - } catch { - case NonFatal(t) => - logger.error("Issue with creating swagger.json", t) - throw t - } - } - - override val securitySchemeDefinitions = Map( - "token" -> { - val definition = new ApiKeyAuthDefinition("Authorization", In.HEADER) - definition.setDescription("Authentication token") - definition - } - ) - - override val basePath: String = config.getString("swagger.basePath") - override val apiDocsPath: String = config.getString("swagger.docsPath") - - override val info = Info( - config.getString("swagger.apiInfo.description"), - version, - config.getString("swagger.apiInfo.title"), - config.getString("swagger.apiInfo.termsOfServiceUrl"), - contact = Some( - Contact( - config.getString("swagger.apiInfo.contact.name"), - config.getString("swagger.apiInfo.contact.url"), - config.getString("swagger.apiInfo.contact.email") - )), - license = Some( - License( - config.getString("swagger.apiInfo.license"), - config.getString("swagger.apiInfo.licenseUrl") - )), - vendorExtensions = Map.empty[String, AnyRef] - ) - - /** A very simple templating extractor. Gets a resource from the classpath and subsitutes any `{{key}}` with a value. */ - private def getTemplatedResource( - resourceName: String, - contentType: ContentType, - substitution: (String, String)): Route = get { - Option(this.getClass.getClassLoader.getResource(resourceName)) flatMap ResourceFile.apply match { - case Some(ResourceFile(url, length @ _, _)) => - extractSettings { settings => - val stream = StreamConverters - .fromInputStream(() => url.openStream()) - .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher)) - .via(Framing.delimiter(ByteString("\n"), 4096, true).map(_.utf8String)) - .map { line => - line.replaceAll(s"\\{\\{${substitution._1}\\}\\}", substitution._2) - } - .map(line => ByteString(line + "\n")) - complete( - HttpEntity(contentType, stream) - ) - } - case None => reject - } - } - - def swaggerUI: Route = - pathEndOrSingleSlash { - getTemplatedResource( - "swagger-ui/index.html", - ContentTypes.`text/html(UTF-8)`, - "title" -> config.getString("swagger.apiInfo.title")) - } ~ getFromResourceDirectory("swagger-ui") - - def swaggerUINew: Route = - pathEndOrSingleSlash { - getTemplatedResource( - "swagger-ui-dist/index.html", - ContentTypes.`text/html(UTF-8)`, - "title" -> config.getString("swagger.apiInfo.title")) - } ~ getFromResourceDirectory("swagger-ui-dist") - -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/AlwaysAllowAuthorization.scala b/src/main/scala/xyz/driver/core/rest/auth/AlwaysAllowAuthorization.scala deleted file mode 100644 index 5007774..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/AlwaysAllowAuthorization.scala +++ /dev/null @@ -1,14 +0,0 @@ -package xyz.driver.core.rest.auth - -import xyz.driver.core.auth.{Permission, User} -import xyz.driver.core.rest.ServiceRequestContext - -import scala.concurrent.Future - -class AlwaysAllowAuthorization[U <: User] extends Authorization[U] { - override def userHasPermissions(user: U, permissions: Seq[Permission])( - implicit ctx: ServiceRequestContext): Future[AuthorizationResult] = { - val permissionsMap = permissions.map(_ -> true).toMap - Future.successful(AuthorizationResult(authorized = permissionsMap, ctx.permissionsToken)) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/AuthProvider.scala b/src/main/scala/xyz/driver/core/rest/auth/AuthProvider.scala deleted file mode 100644 index e1a94e1..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/AuthProvider.scala +++ /dev/null @@ -1,75 +0,0 @@ -package xyz.driver.core.rest.auth - -import akka.http.scaladsl.server.directives.Credentials -import com.typesafe.scalalogging.Logger -import scalaz.OptionT -import xyz.driver.core.auth.{AuthToken, Permission, User} -import xyz.driver.core.rest.errors.{ExternalServiceException, UnauthorizedException} -import xyz.driver.core.rest.{AuthorizedServiceRequestContext, ContextHeaders, ServiceRequestContext, serviceContext} - -import scala.concurrent.{ExecutionContext, Future} - -abstract class AuthProvider[U <: User]( - val authorization: Authorization[U], - log: Logger, - val realm: String -)(implicit execution: ExecutionContext) { - - import akka.http.scaladsl.server._ - import Directives.{authorize => akkaAuthorize, _} - - def this(authorization: Authorization[U], log: Logger)(implicit executionContext: ExecutionContext) = - this(authorization, log, "driver.xyz") - - /** - * Specific implementation on how to extract user from request context, - * can either need to do a network call to auth server or extract everything from self-contained token - * - * @param ctx set of request values which can be relevant to authenticate user - * @return authenticated user - */ - def authenticatedUser(implicit ctx: ServiceRequestContext): OptionT[Future, U] - - protected def authenticator(context: ServiceRequestContext): AsyncAuthenticator[U] = { - case Credentials.Missing => - log.info(s"Request (${context.trackingId}) missing authentication credentials") - Future.successful(None) - case Credentials.Provided(authToken) => - authenticatedUser(context.withAuthToken(AuthToken(authToken))).run.recover({ - case ExternalServiceException(_, _, Some(UnauthorizedException(_))) => None - }) - } - - /** - * Verifies that a user agent is properly authenticated, and (optionally) authorized with the specified permissions - */ - def authorize( - context: ServiceRequestContext, - permissions: Permission*): Directive1[AuthorizedServiceRequestContext[U]] = { - authenticateOAuth2Async[U](realm, authenticator(context)) flatMap { authenticatedUser => - val authCtx = context.withAuthenticatedUser(context.authToken.get, authenticatedUser) - onSuccess(authorization.userHasPermissions(authenticatedUser, permissions)(authCtx)) flatMap { - case AuthorizationResult(authorized, token) => - val allAuthorized = permissions.forall(authorized.getOrElse(_, false)) - akkaAuthorize(allAuthorized) tflatMap { _ => - val cachedPermissionsCtx = token.fold(authCtx)(authCtx.withPermissionsToken) - provide(cachedPermissionsCtx) - } - } - } - } - - /** - * Verifies if request is authenticated and authorized to have `permissions` - */ - def authorize(permissions: Permission*): Directive1[AuthorizedServiceRequestContext[U]] = { - serviceContext flatMap (authorize(_, permissions: _*)) - } -} - -object AuthProvider { - val AuthenticationTokenHeader: String = ContextHeaders.AuthenticationTokenHeader - val PermissionsTokenHeader: String = ContextHeaders.PermissionsTokenHeader - val SetAuthenticationTokenHeader: String = "set-authorization" - val SetPermissionsTokenHeader: String = "set-permissions" -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/Authorization.scala b/src/main/scala/xyz/driver/core/rest/auth/Authorization.scala deleted file mode 100644 index 1a5e9be..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/Authorization.scala +++ /dev/null @@ -1,11 +0,0 @@ -package xyz.driver.core.rest.auth - -import xyz.driver.core.auth.{Permission, User} -import xyz.driver.core.rest.ServiceRequestContext - -import scala.concurrent.Future - -trait Authorization[U <: User] { - def userHasPermissions(user: U, permissions: Seq[Permission])( - implicit ctx: ServiceRequestContext): Future[AuthorizationResult] -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/AuthorizationResult.scala b/src/main/scala/xyz/driver/core/rest/auth/AuthorizationResult.scala deleted file mode 100644 index efe28c9..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/AuthorizationResult.scala +++ /dev/null @@ -1,22 +0,0 @@ -package xyz.driver.core.rest.auth - -import xyz.driver.core.auth.{Permission, PermissionsToken} - -import scalaz.Scalaz.mapMonoid -import scalaz.Semigroup -import scalaz.syntax.semigroup._ - -final case class AuthorizationResult(authorized: Map[Permission, Boolean], token: Option[PermissionsToken]) -object AuthorizationResult { - val unauthorized: AuthorizationResult = AuthorizationResult(authorized = Map.empty, None) - - implicit val authorizationSemigroup: Semigroup[AuthorizationResult] = new Semigroup[AuthorizationResult] { - private implicit val authorizedBooleanSemigroup = Semigroup.instance[Boolean](_ || _) - private implicit val permissionsTokenSemigroup = - Semigroup.instance[Option[PermissionsToken]]((a, b) => b.orElse(a)) - - override def append(a: AuthorizationResult, b: => AuthorizationResult): AuthorizationResult = { - AuthorizationResult(a.authorized |+| b.authorized, a.token |+| b.token) - } - } -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/CachedTokenAuthorization.scala b/src/main/scala/xyz/driver/core/rest/auth/CachedTokenAuthorization.scala deleted file mode 100644 index 66de4ef..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/CachedTokenAuthorization.scala +++ /dev/null @@ -1,55 +0,0 @@ -package xyz.driver.core.rest.auth - -import java.nio.file.{Files, Path} -import java.security.{KeyFactory, PublicKey} -import java.security.spec.X509EncodedKeySpec - -import pdi.jwt.{Jwt, JwtAlgorithm} -import xyz.driver.core.auth.{Permission, User} -import xyz.driver.core.rest.ServiceRequestContext - -import scala.concurrent.Future -import scalaz.syntax.std.boolean._ - -class CachedTokenAuthorization[U <: User](publicKey: => PublicKey, issuer: String) extends Authorization[U] { - override def userHasPermissions(user: U, permissions: Seq[Permission])( - implicit ctx: ServiceRequestContext): Future[AuthorizationResult] = { - import spray.json._ - - def extractPermissionsFromTokenJSON(tokenObject: JsObject): Option[Map[String, Boolean]] = - tokenObject.fields.get("permissions").collect { - case JsObject(fields) => - fields.collect { - case (key, JsBoolean(value)) => key -> value - } - } - - val result = for { - token <- ctx.permissionsToken - jwt <- Jwt.decode(token.value, publicKey, Seq(JwtAlgorithm.RS256)).toOption - jwtJson = jwt.parseJson.asJsObject - - // Ensure jwt is for the currently authenticated user and the correct issuer, otherwise return None - _ <- jwtJson.fields.get("sub").contains(JsString(user.id.value)).option(()) - _ <- jwtJson.fields.get("iss").contains(JsString(issuer)).option(()) - - permissionsMap <- extractPermissionsFromTokenJSON(jwtJson) - - authorized = permissions.map(p => p -> permissionsMap.getOrElse(p.toString, false)).toMap - } yield AuthorizationResult(authorized, Some(token)) - - Future.successful(result.getOrElse(AuthorizationResult.unauthorized)) - } -} - -object CachedTokenAuthorization { - def apply[U <: User](publicKeyFile: Path, issuer: String): CachedTokenAuthorization[U] = { - lazy val publicKey: PublicKey = { - val publicKeyBase64Encoded = new String(Files.readAllBytes(publicKeyFile)).trim - val publicKeyBase64Decoded = java.util.Base64.getDecoder.decode(publicKeyBase64Encoded) - val spec = new X509EncodedKeySpec(publicKeyBase64Decoded) - KeyFactory.getInstance("RSA").generatePublic(spec) - } - new CachedTokenAuthorization[U](publicKey, issuer) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/auth/ChainedAuthorization.scala b/src/main/scala/xyz/driver/core/rest/auth/ChainedAuthorization.scala deleted file mode 100644 index 131e7fc..0000000 --- a/src/main/scala/xyz/driver/core/rest/auth/ChainedAuthorization.scala +++ /dev/null @@ -1,27 +0,0 @@ -package xyz.driver.core.rest.auth - -import xyz.driver.core.auth.{Permission, User} -import xyz.driver.core.rest.ServiceRequestContext - -import scala.concurrent.{ExecutionContext, Future} -import scalaz.Scalaz.{futureInstance, listInstance} -import scalaz.syntax.semigroup._ -import scalaz.syntax.traverse._ - -class ChainedAuthorization[U <: User](authorizations: Authorization[U]*)(implicit execution: ExecutionContext) - extends Authorization[U] { - - override def userHasPermissions(user: U, permissions: Seq[Permission])( - implicit ctx: ServiceRequestContext): Future[AuthorizationResult] = { - def allAuthorized(permissionsMap: Map[Permission, Boolean]): Boolean = - permissions.forall(permissionsMap.getOrElse(_, false)) - - authorizations.toList.foldLeftM[Future, AuthorizationResult](AuthorizationResult.unauthorized) { - (authResult, authorization) => - if (allAuthorized(authResult.authorized)) Future.successful(authResult) - else { - authorization.userHasPermissions(user, permissions).map(authResult |+| _) - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/rest/directives/AuthDirectives.scala b/src/main/scala/xyz/driver/core/rest/directives/AuthDirectives.scala deleted file mode 100644 index ff3424d..0000000 --- a/src/main/scala/xyz/driver/core/rest/directives/AuthDirectives.scala +++ /dev/null @@ -1,19 +0,0 @@ -package xyz.driver.core -package rest -package directives - -import akka.http.scaladsl.server.{Directive1, Directives => AkkaDirectives} -import xyz.driver.core.auth.{Permission, User} -import xyz.driver.core.rest.auth.AuthProvider - -/** Authentication and authorization directives. */ -trait AuthDirectives extends AkkaDirectives { - - /** Authenticate a user based on service request headers and check if they have all given permissions. */ - def authenticateAndAuthorize[U <: User]( - authProvider: AuthProvider[U], - permissions: Permission*): Directive1[AuthorizedServiceRequestContext[U]] = { - authProvider.authorize(permissions: _*) - } - -} diff --git a/src/main/scala/xyz/driver/core/rest/directives/CorsDirectives.scala b/src/main/scala/xyz/driver/core/rest/directives/CorsDirectives.scala deleted file mode 100644 index 5a6bbfd..0000000 --- a/src/main/scala/xyz/driver/core/rest/directives/CorsDirectives.scala +++ /dev/null @@ -1,72 +0,0 @@ -package xyz.driver.core -package rest -package directives - -import akka.http.scaladsl.model.HttpMethods._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.model.{HttpResponse, StatusCodes} -import akka.http.scaladsl.server.{Route, Directives => AkkaDirectives} - -/** Directives to handle Cross-Origin Resource Sharing (CORS). */ -trait CorsDirectives extends AkkaDirectives { - - /** Route handler that injects Cross-Origin Resource Sharing (CORS) headers depending on the request - * origin. - * - * In a microservice environment, it can be difficult to know in advance the exact origin - * from which requests may be issued [1]. For example, the request may come from a web page served from - * any of the services, on any namespace or from other documentation sites. In general, only a set - * of domain suffixes can be assumed to be known in advance. Unfortunately however, browsers that - * implement CORS require exact specification of allowed origins, including full host name and scheme, - * in order to send credentials and headers with requests to other origins. - * - * This route wrapper provides a simple way alleviate CORS' exact allowed-origin requirement by - * dynamically echoing the origin as an allowed origin if and only if its domain is whitelisted. - * - * Note that the simplicity of this implementation comes with two notable drawbacks: - * - * - All OPTION requests are "hijacked" and will not be passed to the inner route of this wrapper. - * - * - Allowed methods and headers can not be customized on a per-request basis. All standard - * HTTP methods are allowed, and allowed headers are specified for all inner routes. - * - * This handler is not suited for cases where more fine-grained control of responses is required. - * - * [1] Assuming browsers communicate directly with the services and that requests aren't proxied through - * a common gateway. - * - * @param allowedSuffixes The set of domain suffixes (e.g. internal.example.org, example.org) of allowed - * origins. - * @param allowedHeaders Header names that will be set in `Access-Control-Allow-Headers`. - * @param inner Route into which CORS headers will be injected. - */ - def cors(allowedSuffixes: Set[String], allowedHeaders: Seq[String])(inner: Route): Route = { - optionalHeaderValueByType[Origin](()) { maybeOrigin => - val allowedOrigins: HttpOriginRange = maybeOrigin match { - // Note that this is not a security issue: the client will never send credentials if the allowed - // origin is set to *. This case allows us to deal with clients that do not send an origin header. - case None => HttpOriginRange.* - case Some(requestOrigin) => - val allowedOrigin = requestOrigin.origins.find(origin => - allowedSuffixes.exists(allowed => origin.host.host.address endsWith allowed)) - allowedOrigin.map(HttpOriginRange(_)).getOrElse(HttpOriginRange.*) - } - - respondWithHeaders( - `Access-Control-Allow-Origin`.forRange(allowedOrigins), - `Access-Control-Allow-Credentials`(true), - `Access-Control-Allow-Headers`(allowedHeaders: _*), - `Access-Control-Expose-Headers`(allowedHeaders: _*) - ) { - options { // options is used during preflight check - complete( - HttpResponse(StatusCodes.OK) - .withHeaders(`Access-Control-Allow-Methods`(OPTIONS, POST, PUT, GET, DELETE, PATCH, TRACE))) - } ~ inner // in case of non-preflight check we don't do anything special - } - } - } - -} - -object CorsDirectives extends CorsDirectives diff --git a/src/main/scala/xyz/driver/core/rest/directives/Directives.scala b/src/main/scala/xyz/driver/core/rest/directives/Directives.scala deleted file mode 100644 index 0cd4ef1..0000000 --- a/src/main/scala/xyz/driver/core/rest/directives/Directives.scala +++ /dev/null @@ -1,6 +0,0 @@ -package xyz.driver.core -package rest -package directives - -trait Directives extends AuthDirectives with CorsDirectives with PathMatchers with Unmarshallers -object Directives extends Directives diff --git a/src/main/scala/xyz/driver/core/rest/directives/PathMatchers.scala b/src/main/scala/xyz/driver/core/rest/directives/PathMatchers.scala deleted file mode 100644 index 8ba184f..0000000 --- a/src/main/scala/xyz/driver/core/rest/directives/PathMatchers.scala +++ /dev/null @@ -1,79 +0,0 @@ -package xyz.driver.core -package rest -package directives - -import java.time.Instant -import java.util.UUID - -import akka.http.scaladsl.model.Uri.Path -import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} -import akka.http.scaladsl.server.{PathMatcher, PathMatcher1, PathMatchers => AkkaPathMatchers} -import eu.timepit.refined.collection.NonEmpty -import eu.timepit.refined.refineV -import xyz.driver.core.time.Time - -import scala.util.control.NonFatal - -/** Akka-HTTP path matchers for custom core types. */ -trait PathMatchers { - - private def UuidInPath[T]: PathMatcher1[Id[T]] = - AkkaPathMatchers.JavaUUID.map((id: UUID) => Id[T](id.toString.toLowerCase)) - - def IdInPath[T]: PathMatcher1[Id[T]] = UuidInPath[T] | new PathMatcher1[Id[T]] { - def apply(path: Path) = path match { - case Path.Segment(segment, tail) => Matched(tail, Tuple1(Id[T](segment))) - case _ => Unmatched - } - } - - def UuidIdInPath[T]: PathMatcher1[UuidId[T]] = - AkkaPathMatchers.JavaUUID.map((id: UUID) => UuidId[T](id)) - - def NumericIdInPath[T]: PathMatcher1[NumericId[T]] = - AkkaPathMatchers.LongNumber.map((id: Long) => NumericId[T](id)) - - def NameInPath[T]: PathMatcher1[Name[T]] = new PathMatcher1[Name[T]] { - def apply(path: Path) = path match { - case Path.Segment(segment, tail) => Matched(tail, Tuple1(Name[T](segment))) - case _ => Unmatched - } - } - - private def timestampInPath: PathMatcher1[Long] = - PathMatcher("""[+-]?\d*""".r) flatMap { string => - try Some(string.toLong) - catch { case _: IllegalArgumentException => None } - } - - def InstantInPath: PathMatcher1[Instant] = - new PathMatcher1[Instant] { - def apply(path: Path): PathMatcher.Matching[Tuple1[Instant]] = path match { - case Path.Segment(head, tail) => - try Matched(tail, Tuple1(Instant.parse(head))) - catch { - case NonFatal(_) => Unmatched - } - case _ => Unmatched - } - } | timestampInPath.map(Instant.ofEpochMilli) - - def TimeInPath: PathMatcher1[Time] = InstantInPath.map(instant => Time(instant.toEpochMilli)) - - def NonEmptyNameInPath[T]: PathMatcher1[NonEmptyName[T]] = new PathMatcher1[NonEmptyName[T]] { - def apply(path: Path) = path match { - case Path.Segment(segment, tail) => - refineV[NonEmpty](segment) match { - case Left(_) => Unmatched - case Right(nonEmptyString) => Matched(tail, Tuple1(NonEmptyName[T](nonEmptyString))) - } - case _ => Unmatched - } - } - - def RevisionInPath[T]: PathMatcher1[Revision[T]] = - PathMatcher("""[\da-fA-F]{8}-[\da-fA-F]{4}-[\da-fA-F]{4}-[\da-fA-F]{4}-[\da-fA-F]{12}""".r) flatMap { string => - Some(Revision[T](string)) - } - -} diff --git a/src/main/scala/xyz/driver/core/rest/directives/Unmarshallers.scala b/src/main/scala/xyz/driver/core/rest/directives/Unmarshallers.scala deleted file mode 100644 index 93a9a52..0000000 --- a/src/main/scala/xyz/driver/core/rest/directives/Unmarshallers.scala +++ /dev/null @@ -1,50 +0,0 @@ -package xyz.driver.core -package rest -package directives - -import java.util.UUID - -import akka.http.scaladsl.marshalling.{Marshaller, Marshalling} -import akka.http.scaladsl.unmarshalling.Unmarshaller -import spray.json.{JsString, JsValue, JsonParser, JsonReader, JsonWriter} - -/** Akka-HTTP unmarshallers for custom core types. */ -trait Unmarshallers { - - implicit def idUnmarshaller[A]: Unmarshaller[String, Id[A]] = - Unmarshaller.strict[String, Id[A]] { str => - Id[A](UUID.fromString(str).toString) - } - - implicit def uuidIdUnmarshaller[A]: Unmarshaller[String, UuidId[A]] = - Unmarshaller.strict[String, UuidId[A]] { str => - UuidId[A](UUID.fromString(str)) - } - - implicit def numericIdUnmarshaller[A]: Unmarshaller[Long, NumericId[A]] = - Unmarshaller.strict[Long, NumericId[A]] { x => - NumericId[A](x) - } - - implicit def paramUnmarshaller[T](implicit reader: JsonReader[T]): Unmarshaller[String, T] = - Unmarshaller.firstOf( - Unmarshaller.strict((JsString(_: String)) andThen reader.read), - stringToValueUnmarshaller[T] - ) - - implicit def revisionFromStringUnmarshaller[T]: Unmarshaller[String, Revision[T]] = - Unmarshaller.strict[String, Revision[T]](Revision[T]) - - val jsValueToStringMarshaller: Marshaller[JsValue, String] = - Marshaller.strict[JsValue, String](value => Marshalling.Opaque[String](() => value.compactPrint)) - - def valueToStringMarshaller[T](implicit jsonFormat: JsonWriter[T]): Marshaller[T, String] = - jsValueToStringMarshaller.compose[T](jsonFormat.write) - - val stringToJsValueUnmarshaller: Unmarshaller[String, JsValue] = - Unmarshaller.strict[String, JsValue](value => JsonParser(value)) - - def stringToValueUnmarshaller[T](implicit jsonFormat: JsonReader[T]): Unmarshaller[String, T] = - stringToJsValueUnmarshaller.map[T](jsonFormat.read) - -} diff --git a/src/main/scala/xyz/driver/core/rest/errors/serviceException.scala b/src/main/scala/xyz/driver/core/rest/errors/serviceException.scala deleted file mode 100644 index f2962c9..0000000 --- a/src/main/scala/xyz/driver/core/rest/errors/serviceException.scala +++ /dev/null @@ -1,27 +0,0 @@ -package xyz.driver.core.rest.errors - -sealed abstract class ServiceException(val message: String) extends Exception(message) - -final case class InvalidInputException(override val message: String = "Invalid input") extends ServiceException(message) - -final case class InvalidActionException(override val message: String = "This action is not allowed") - extends ServiceException(message) - -final case class UnauthorizedException( - override val message: String = "The user's authentication credentials are invalid or missing") - extends ServiceException(message) - -final case class ResourceNotFoundException(override val message: String = "Resource not found") - extends ServiceException(message) - -final case class ExternalServiceException( - serviceName: String, - serviceMessage: String, - serviceException: Option[ServiceException]) - extends ServiceException(s"Error while calling '$serviceName': $serviceMessage") - -final case class ExternalServiceTimeoutException(serviceName: String) - extends ServiceException(s"$serviceName took too long to respond") - -final case class DatabaseException(override val message: String = "Database access error") - extends ServiceException(message) diff --git a/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala b/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala deleted file mode 100644 index 866476d..0000000 --- a/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala +++ /dev/null @@ -1,33 +0,0 @@ -package xyz.driver.core -package rest -package headers - -import akka.http.scaladsl.model.headers.{ModeledCustomHeader, ModeledCustomHeaderCompanion} -import xyz.driver.core.reporting.SpanContext - -import scala.util.Try - -/** Encapsulates a trace context in an HTTP header for propagation across services. - * - * This implementation corresponds to the W3C editor's draft specification (as of 2018-08-28) - * https://w3c.github.io/distributed-tracing/report-trace-context.html. The 'flags' field is - * ignored. - */ -final case class Traceparent(spanContext: SpanContext) extends ModeledCustomHeader[Traceparent] { - override def renderInRequests = true - override def renderInResponses = true - override val companion: Traceparent.type = Traceparent - override def value: String = f"01-${spanContext.traceId}-${spanContext.spanId}-00" -} -object Traceparent extends ModeledCustomHeaderCompanion[Traceparent] { - override val name = "traceparent" - override def parse(value: String) = Try { - val Array(version, traceId, spanId, _) = value.split("-") - require( - version == "01", - s"Found unsupported version '$version' in traceparent header. Only version '01' is supported.") - new Traceparent( - new SpanContext(traceId, spanId) - ) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/package.scala b/src/main/scala/xyz/driver/core/rest/package.scala deleted file mode 100644 index 3be8f02..0000000 --- a/src/main/scala/xyz/driver/core/rest/package.scala +++ /dev/null @@ -1,313 +0,0 @@ -package xyz.driver.core.rest - -import java.net.InetAddress - -import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable} -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer -import akka.stream.scaladsl.Flow -import akka.util.ByteString -import scalaz.Scalaz.{intInstance, stringInstance} -import scalaz.syntax.equal._ -import scalaz.{Functor, OptionT} -import xyz.driver.core.rest.auth.AuthProvider -import xyz.driver.core.rest.headers.Traceparent -import xyz.driver.tracing.TracingDirectives - -import scala.concurrent.Future -import scala.util.Try - -trait Service - -object Service - -trait HttpClient { - def makeRequest(request: HttpRequest): Future[HttpResponse] -} - -trait ServiceTransport { - - def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] - - def sendRequest(context: ServiceRequestContext)(requestStub: HttpRequest)( - implicit mat: Materializer): Future[Unmarshal[ResponseEntity]] -} - -sealed trait SortingOrder -object SortingOrder { - case object Asc extends SortingOrder - case object Desc extends SortingOrder -} - -final case class SortingField(name: String, sortingOrder: SortingOrder) -final case class Sorting(sortingFields: Seq[SortingField]) - -final case class Pagination(pageSize: Int, pageNumber: Int) { - require(pageSize > 0, "Page size must be greater than zero") - require(pageNumber > 0, "Page number must be greater than zero") - - def offset: Int = pageSize * (pageNumber - 1) -} - -final case class ListResponse[+T](items: Seq[T], meta: ListResponse.Meta) - -object ListResponse { - - def apply[T](items: Seq[T], size: Int, pagination: Option[Pagination]): ListResponse[T] = - ListResponse( - items = items, - meta = ListResponse.Meta(size, pagination.fold(1)(_.pageNumber), pagination.fold(size)(_.pageSize))) - - final case class Meta(itemsCount: Int, pageNumber: Int, pageSize: Int) - - object Meta { - def apply(itemsCount: Int, pagination: Pagination): Meta = - Meta(itemsCount, pagination.pageNumber, pagination.pageSize) - } - -} - -object `package` { - implicit class OptionTRestAdditions[T](optionT: OptionT[Future, T]) { - def responseOrNotFound(successCode: StatusCodes.Success = StatusCodes.OK)( - implicit F: Functor[Future], - em: ToEntityMarshaller[T]): Future[ToResponseMarshallable] = { - optionT.fold[ToResponseMarshallable](successCode -> _, StatusCodes.NotFound -> None) - } - } - - object ContextHeaders { - val AuthenticationTokenHeader: String = "Authorization" - val PermissionsTokenHeader: String = "Permissions" - val AuthenticationHeaderPrefix: String = "Bearer" - val ClientFingerprintHeader: String = "X-Client-Fingerprint" - val TrackingIdHeader: String = "X-Trace" - val StacktraceHeader: String = "X-Stacktrace" - val OriginatingIpHeader: String = "X-Forwarded-For" - val ResourceCount: String = "X-Resource-Count" - val PageCount: String = "X-Page-Count" - val TraceHeaderName: String = TracingDirectives.TraceHeaderName - val SpanHeaderName: String = TracingDirectives.SpanHeaderName - } - - val AllowedHeaders: Seq[String] = - Seq( - "Origin", - "X-Requested-With", - "Content-Type", - "Content-Length", - "Accept", - "X-Trace", - "Access-Control-Allow-Methods", - "Access-Control-Allow-Origin", - "Access-Control-Allow-Headers", - "Server", - "Date", - ContextHeaders.ClientFingerprintHeader, - ContextHeaders.TrackingIdHeader, - ContextHeaders.TraceHeaderName, - ContextHeaders.SpanHeaderName, - ContextHeaders.StacktraceHeader, - ContextHeaders.AuthenticationTokenHeader, - ContextHeaders.OriginatingIpHeader, - ContextHeaders.ResourceCount, - ContextHeaders.PageCount, - "X-Frame-Options", - "X-Content-Type-Options", - "Strict-Transport-Security", - AuthProvider.SetAuthenticationTokenHeader, - AuthProvider.SetPermissionsTokenHeader, - "Traceparent" - ) - - def allowOrigin(originHeader: Option[Origin]): `Access-Control-Allow-Origin` = - `Access-Control-Allow-Origin`( - originHeader.fold[HttpOriginRange](HttpOriginRange.*)(h => HttpOriginRange(h.origins: _*))) - - def serviceContext: Directive1[ServiceRequestContext] = { - def fixAuthorizationHeader(headers: Seq[HttpHeader]): collection.immutable.Seq[HttpHeader] = { - headers.map({ header => - if (header.name === ContextHeaders.AuthenticationTokenHeader && !header.value.startsWith( - ContextHeaders.AuthenticationHeaderPrefix)) { - Authorization(OAuth2BearerToken(header.value)) - } else header - })(collection.breakOut) - } - extractClientIP flatMap { remoteAddress => - mapRequest(req => req.withHeaders(fixAuthorizationHeader(req.headers))) tflatMap { _ => - extract(ctx => extractServiceContext(ctx.request, remoteAddress)) - } - } - } - - def respondWithCorsAllowedHeaders: Directive0 = { - respondWithHeaders( - List[HttpHeader]( - `Access-Control-Allow-Headers`(AllowedHeaders: _*), - `Access-Control-Expose-Headers`(AllowedHeaders: _*) - )) - } - - def respondWithCorsAllowedOriginHeaders(origin: Origin): Directive0 = { - respondWithHeader { - `Access-Control-Allow-Origin`(HttpOriginRange(origin.origins: _*)) - } - } - - def respondWithCorsAllowedMethodHeaders(methods: Set[HttpMethod]): Directive0 = { - respondWithHeaders( - List[HttpHeader]( - Allow(methods.to[collection.immutable.Seq]), - `Access-Control-Allow-Methods`(methods.to[collection.immutable.Seq]) - )) - } - - def extractServiceContext(request: HttpRequest, remoteAddress: RemoteAddress): ServiceRequestContext = - new ServiceRequestContext( - extractTrackingId(request), - extractOriginatingIP(request, remoteAddress), - extractContextHeaders(request)) - - def extractTrackingId(request: HttpRequest): String = { - request.headers - .find(_.name === ContextHeaders.TrackingIdHeader) - .fold(java.util.UUID.randomUUID.toString)(_.value()) - } - - def extractFingerprintHash(request: HttpRequest): Option[String] = { - request.headers - .find(_.name === ContextHeaders.ClientFingerprintHeader) - .map(_.value()) - } - - def extractOriginatingIP(request: HttpRequest, remoteAddress: RemoteAddress): Option[InetAddress] = { - request.headers - .find(_.name === ContextHeaders.OriginatingIpHeader) - .flatMap(ipName => Try(InetAddress.getByName(ipName.value)).toOption) - .orElse(remoteAddress.toOption) - } - - def extractStacktrace(request: HttpRequest): Array[String] = - request.headers.find(_.name == ContextHeaders.StacktraceHeader).fold("")(_.value()).split("->") - - def extractContextHeaders(request: HttpRequest): Map[String, String] = { - request.headers - .filter { h => - h.name === ContextHeaders.AuthenticationTokenHeader || h.name === ContextHeaders.TrackingIdHeader || - h.name === ContextHeaders.PermissionsTokenHeader || h.name === ContextHeaders.StacktraceHeader || - h.name === ContextHeaders.TraceHeaderName || h.name === ContextHeaders.SpanHeaderName || - h.name === ContextHeaders.OriginatingIpHeader || h.name === ContextHeaders.ClientFingerprintHeader || - h.name === Traceparent.name - } - .map { header => - if (header.name === ContextHeaders.AuthenticationTokenHeader) { - header.name -> header.value.stripPrefix(ContextHeaders.AuthenticationHeaderPrefix).trim - } else { - header.name -> header.value - } - } - .toMap - } - - private[rest] def escapeScriptTags(byteString: ByteString): ByteString = { - @annotation.tailrec - def dirtyIndices(from: Int, descIndices: List[Int]): List[Int] = { - val index = byteString.indexOf('/', from) - if (index === -1) descIndices.reverse - else { - val (init, tail) = byteString.splitAt(index) - if ((init endsWith "<") && (tail startsWith "/sc")) { - dirtyIndices(index + 1, index :: descIndices) - } else { - dirtyIndices(index + 1, descIndices) - } - } - } - - val indices = dirtyIndices(0, Nil) - - indices.headOption.fold(byteString) { head => - val builder = ByteString.newBuilder - builder ++= byteString.take(head) - - (indices :+ byteString.length).sliding(2).foreach { - case Seq(start, end) => - builder += ' ' - builder ++= byteString.slice(start, end) - case Seq(_) => // Should not match; sliding on at least 2 elements - assert(indices.nonEmpty, s"Indices should have been nonEmpty: $indices") - } - builder.result - } - } - - val sanitizeRequestEntity: Directive0 = { - mapRequest(request => request.mapEntity(entity => entity.transformDataBytes(Flow.fromFunction(escapeScriptTags)))) - } - - val paginated: Directive1[Pagination] = - parameters(("pageSize".as[Int] ? 100, "pageNumber".as[Int] ? 1)).as(Pagination) - - private def extractPagination(pageSizeOpt: Option[Int], pageNumberOpt: Option[Int]): Option[Pagination] = - (pageSizeOpt, pageNumberOpt) match { - case (Some(size), Some(number)) => Option(Pagination(size, number)) - case (None, None) => Option.empty[Pagination] - case (_, _) => throw new IllegalArgumentException("Pagination's parameters are incorrect") - } - - val optionalPagination: Directive1[Option[Pagination]] = - parameters(("pageSize".as[Int].?, "pageNumber".as[Int].?)).as(extractPagination) - - def paginationQuery(pagination: Pagination) = - Seq("pageNumber" -> pagination.pageNumber.toString, "pageSize" -> pagination.pageSize.toString) - - def completeWithPagination[T](handler: Option[Pagination] => Future[ListResponse[T]])( - implicit marshaller: ToEntityMarshaller[Seq[T]]): Route = { - optionalPagination { pagination => - onSuccess(handler(pagination)) { - case ListResponse(resultPart, ListResponse.Meta(count, _, pageSize)) => - val pageCount = if (pageSize == 0) 0 else (count / pageSize) + (if (count % pageSize == 0) 0 else 1) - val headers = List( - RawHeader(ContextHeaders.ResourceCount, count.toString), - RawHeader(ContextHeaders.PageCount, pageCount.toString)) - - respondWithHeaders(headers)(complete(ToResponseMarshallable(resultPart))) - } - } - } - - private def extractSorting(sortingString: Option[String]): Sorting = { - val sortingFields = sortingString.fold(Seq.empty[SortingField])( - _.split(",") - .filter(_.length > 0) - .map { sortingParam => - if (sortingParam.startsWith("-")) { - SortingField(sortingParam.substring(1), SortingOrder.Desc) - } else { - val fieldName = if (sortingParam.startsWith("+")) sortingParam.substring(1) else sortingParam - SortingField(fieldName, SortingOrder.Asc) - } - } - .toSeq) - - Sorting(sortingFields) - } - - val sorting: Directive1[Sorting] = parameter("sort".as[String].?).as(extractSorting) - - def sortingQuery(sorting: Sorting): Seq[(String, String)] = { - val sortingString = sorting.sortingFields - .map { sortingField => - sortingField.sortingOrder match { - case SortingOrder.Asc => sortingField.name - case SortingOrder.Desc => s"-${sortingField.name}" - } - } - .mkString(",") - Seq("sort" -> sortingString) - } -} diff --git a/src/main/scala/xyz/driver/core/rest/serviceDiscovery.scala b/src/main/scala/xyz/driver/core/rest/serviceDiscovery.scala deleted file mode 100644 index 55f1a2e..0000000 --- a/src/main/scala/xyz/driver/core/rest/serviceDiscovery.scala +++ /dev/null @@ -1,24 +0,0 @@ -package xyz.driver.core.rest - -import xyz.driver.core.Name - -trait ServiceDiscovery { - - def discover[T <: Service](serviceName: Name[Service]): T -} - -trait SavingUsedServiceDiscovery { - private val usedServices = new scala.collection.mutable.HashSet[String]() - - def saveServiceUsage(serviceName: Name[Service]): Unit = usedServices.synchronized { - usedServices += serviceName.value - } - - def getUsedServices: Set[String] = usedServices.synchronized { usedServices.toSet } -} - -class NoServiceDiscovery extends ServiceDiscovery with SavingUsedServiceDiscovery { - - def discover[T <: Service](serviceName: Name[Service]): T = - throw new IllegalArgumentException(s"Service with name $serviceName is unknown") -} diff --git a/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala b/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala deleted file mode 100644 index d2e4bc3..0000000 --- a/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala +++ /dev/null @@ -1,87 +0,0 @@ -package xyz.driver.core.rest - -import java.net.InetAddress - -import xyz.driver.core.auth.{AuthToken, PermissionsToken, User} -import xyz.driver.core.generators -import scalaz.Scalaz.{mapEqual, stringInstance} -import scalaz.syntax.equal._ -import xyz.driver.core.reporting.SpanContext -import xyz.driver.core.rest.auth.AuthProvider -import xyz.driver.core.rest.headers.Traceparent - -import scala.util.Try - -class ServiceRequestContext( - val trackingId: String = generators.nextUuid().toString, - val originatingIp: Option[InetAddress] = None, - val contextHeaders: Map[String, String] = Map.empty[String, String]) { - def authToken: Option[AuthToken] = - contextHeaders.get(AuthProvider.AuthenticationTokenHeader).map(AuthToken.apply) - - def permissionsToken: Option[PermissionsToken] = - contextHeaders.get(AuthProvider.PermissionsTokenHeader).map(PermissionsToken.apply) - - def withAuthToken(authToken: AuthToken): ServiceRequestContext = - new ServiceRequestContext( - trackingId, - originatingIp, - contextHeaders.updated(AuthProvider.AuthenticationTokenHeader, authToken.value) - ) - - def withAuthenticatedUser[U <: User](authToken: AuthToken, user: U): AuthorizedServiceRequestContext[U] = - new AuthorizedServiceRequestContext( - trackingId, - originatingIp, - contextHeaders.updated(AuthProvider.AuthenticationTokenHeader, authToken.value), - user - ) - - override def hashCode(): Int = - Seq[Any](trackingId, originatingIp, contextHeaders) - .foldLeft(31)((result, obj) => 31 * result + obj.hashCode()) - - override def equals(obj: Any): Boolean = obj match { - case ctx: ServiceRequestContext => - trackingId === ctx.trackingId && - originatingIp == originatingIp && - contextHeaders === ctx.contextHeaders - case _ => false - } - - def spanContext: SpanContext = { - val validHeader = Try { - contextHeaders(Traceparent.name) - }.flatMap { value => - Traceparent.parse(value) - } - validHeader.map(_.spanContext).getOrElse(SpanContext.fresh()) - } - - override def toString: String = s"ServiceRequestContext($trackingId, $contextHeaders)" -} - -class AuthorizedServiceRequestContext[U <: User]( - override val trackingId: String = generators.nextUuid().toString, - override val originatingIp: Option[InetAddress] = None, - override val contextHeaders: Map[String, String] = Map.empty[String, String], - val authenticatedUser: U) - extends ServiceRequestContext { - - def withPermissionsToken(permissionsToken: PermissionsToken): AuthorizedServiceRequestContext[U] = - new AuthorizedServiceRequestContext[U]( - trackingId, - originatingIp, - contextHeaders.updated(AuthProvider.PermissionsTokenHeader, permissionsToken.value), - authenticatedUser) - - override def hashCode(): Int = 31 * super.hashCode() + authenticatedUser.hashCode() - - override def equals(obj: Any): Boolean = obj match { - case ctx: AuthorizedServiceRequestContext[U] => super.equals(ctx) && ctx.authenticatedUser == authenticatedUser - case _ => false - } - - override def toString: String = - s"AuthorizedServiceRequestContext($trackingId, $contextHeaders, $authenticatedUser)" -} diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala deleted file mode 100644 index b5e8678..0000000 --- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala +++ /dev/null @@ -1,108 +0,0 @@ -package xyz.driver.core.storage - -import java.io.ByteArrayInputStream -import java.net.URL -import java.nio.file.Path -import java.util.Date - -import akka.Done -import akka.stream.scaladsl.{Sink, Source, StreamConverters} -import akka.util.ByteString -import com.aliyun.oss.OSSClient -import com.aliyun.oss.model.ObjectPermission -import com.typesafe.config.Config -import xyz.driver.core.time.provider.TimeProvider - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class AliyunBlobStorage( - client: OSSClient, - bucketId: String, - timeProvider: TimeProvider, - chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) - extends SignedBlobStorage { - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - client.putObject(bucketId, name, new ByteArrayInputStream(content)) - name - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - client.putObject(bucketId, name, content.toFile) - name - } - - override def exists(name: String): Future[Boolean] = Future { - client.doesObjectExist(bucketId, name) - } - - override def list(prefix: String): Future[Set[String]] = Future { - client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - val inputStream = obj.getObjectContent - Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray - } - } - - override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - StreamConverters - .asInputStream() - .mapMaterializedValue(is => - Future { - client.putObject(bucketId, name, is) - Done - }) - } - - override def delete(name: String): Future[String] = Future { - client.deleteObject(bucketId, name) - name - } - - override def url(name: String): Future[Option[URL]] = Future { - // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm - Option(client.getObjectAcl(bucketId, name)).map { acl => - val isPrivate = acl.getPermission == ObjectPermission.Private - val bucket = client.getBucketInfo(bucketId).getBucket - val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint - new URL(s"https://$bucketId.$endpointUrl/$name") - } - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - if (client.doesObjectExist(bucketId, name)) { - val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis) - Some(client.generatePresignedUrl(bucketId, name, expiration)) - } else { - None - } - } -} - -object AliyunBlobStorage { - val DefaultChunkSize: Int = 8192 - - def apply(config: Config, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val clientId = config.getString("storage.aliyun.clientId") - val clientSecret = config.getString("storage.aliyun.clientSecret") - val endpoint = config.getString("storage.aliyun.endpoint") - this(clientId, clientSecret, endpoint, bucketId, timeProvider) - } - - def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val client = new OSSClient(endpoint, clientId, clientSecret) - new AliyunBlobStorage(client, bucketId, timeProvider) - } -} diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala deleted file mode 100644 index 0cde96a..0000000 --- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala +++ /dev/null @@ -1,50 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.{Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.Duration - -/** Binary key-value store, typically implemented by cloud storage. */ -trait BlobStorage { - - /** Upload data by value. */ - def uploadContent(name: String, content: Array[Byte]): Future[String] - - /** Upload data from an existing file. */ - def uploadFile(name: String, content: Path): Future[String] - - def exists(name: String): Future[Boolean] - - /** List available keys. The prefix determines which keys should be listed - * and depends on the implementation (for instance, a file system backed - * blob store will treat a prefix as a directory path). */ - def list(prefix: String): Future[Set[String]] - - /** Get all the content of a given object. */ - def content(name: String): Future[Option[Array[Byte]]] - - /** Stream data asynchronously and with backpressure. */ - def download(name: String): Future[Option[Source[ByteString, Any]]] - - /** Get a sink to upload data. */ - def upload(name: String): Future[Sink[ByteString, Future[Done]]] - - /** Delete a stored value. */ - def delete(name: String): Future[String] - - /** - * Path to specified resource. Checks that the resource exists and returns None if - * it is not found. Depending on the implementation, may throw. - */ - def url(name: String): Future[Option[URL]] -} - -trait SignedBlobStorage extends BlobStorage { - def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] -} diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala deleted file mode 100644 index e12c73d..0000000 --- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala +++ /dev/null @@ -1,82 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.{Files, Path, StandardCopyOption} - -import akka.stream.scaladsl.{FileIO, Sink, Source} -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - -/** A blob store that is backed by a local filesystem. All objects are stored relative to the given - * root path. Slashes ('/') in blob names are treated as usual path separators and are converted - * to directories. */ -class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { - - private def ensureParents(file: Path): Path = { - Files.createDirectories(file.getParent()) - file - } - - private def file(name: String) = root.resolve(name) - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - Files.write(ensureParents(file(name)), content) - name - } - override def uploadFile(name: String, content: Path): Future[String] = Future { - Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) - name - } - - override def exists(name: String): Future[Boolean] = Future { - val path = file(name) - Files.exists(path) && Files.isReadable(path) - } - - override def list(prefix: String): Future[Set[String]] = Future { - val dir = file(prefix) - Files - .list(dir) - .iterator() - .asScala - .map(p => root.relativize(p)) - .map(_.toString) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map { - case true => - Some(Files.readAllBytes(file(name))) - case false => None - } - - override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { - if (Files.exists(file(name))) { - Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) - } else { - None - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val f = ensureParents(file(name)) - FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) - } - - override def delete(name: String): Future[String] = exists(name).map { e => - if (e) { - Files.delete(file(name)) - } - name - } - - override def url(name: String): Future[Option[URL]] = exists(name) map { - case true => - Some(root.resolve(name).toUri.toURL) - case false => - None - } -} diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala deleted file mode 100644 index 95164c7..0000000 --- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala +++ /dev/null @@ -1,96 +0,0 @@ -package xyz.driver.core.storage - -import java.io.{FileInputStream, InputStream} -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.Sink -import akka.util.ByteString -import com.google.api.gax.paging.Page -import com.google.auth.oauth2.ServiceAccountCredentials -import com.google.cloud.storage.Storage.BlobListOption -import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( - implicit ec: ExecutionContext) - extends BlobStorage with SignedBlobStorage { - - private val bucket: Bucket = client.get(bucketId) - require(bucket != null, s"Bucket $bucketId does not exist.") - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - bucket.create(name, content).getBlobId.getName - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName - } - - override def exists(name: String): Future[Boolean] = Future { - bucket.get(name) != null - } - - override def list(prefix: String): Future[Set[String]] = Future { - val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) - page - .iterateAll() - .asScala - .map(_.getName()) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(bucket.get(name)).map(blob => blob.getContent()) - } - - override def download(name: String) = Future { - Option(bucket.get(name)).map { blob => - ChannelStream.fromChannel(() => blob.reader(), chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val blob = bucket.create(name, Array.emptyByteArray) - ChannelStream.toChannel(() => blob.writer(), chunkSize) - } - - override def delete(name: String): Future[String] = Future { - client.delete(BlobId.of(bucketId, name)) - name - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) - } - - override def url(name: String): Future[Option[URL]] = Future { - val protocol: String = "https" - val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/" - Option(bucket.get(name)).map { blob => - new URL(protocol, resourcePath, blob.getName) - } - } -} - -object GcsBlobStorage { - final val DefaultChunkSize = 8192 - - private def newClient(key: InputStream): Storage = - StorageOptions - .newBuilder() - .setCredentials(ServiceAccountCredentials.fromStream(key)) - .build() - .getService() - - def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( - implicit ec: ExecutionContext): GcsBlobStorage = { - val client = newClient(new FileInputStream(keyfile.toFile)) - new GcsBlobStorage(client, bucketId, chunkSize) - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala deleted file mode 100644 index fc652be..0000000 --- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala +++ /dev/null @@ -1,112 +0,0 @@ -package xyz.driver.core.storage - -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, WritableByteChannel} - -import akka.stream._ -import akka.stream.scaladsl.{Sink, Source} -import akka.stream.stage._ -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.concurrent.{Future, Promise} -import scala.util.control.NonFatal - -class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) - extends GraphStage[SourceShape[ByteString]] { - - val out = Outlet[ByteString]("ChannelSource.out") - val shape = SourceShape(out) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends OutHandler { - override def onPull(): Unit = { - try { - val buffer = ByteBuffer.allocate(chunkSize) - if (channel.read(buffer) > 0) { - buffer.flip() - push(out, ByteString.fromByteBuffer(buffer)) - } else { - completeStage() - } - } catch { - case NonFatal(_) => - channel.close() - } - } - override def onDownstreamFinish(): Unit = { - channel.close() - } - } - - setHandler(out, Handler) - } - -} - -class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) - extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { - - val in = Inlet[ByteString]("ChannelSink.in") - val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - val logic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends InHandler { - override def onPush(): Unit = { - try { - val data = grab(in) - channel.write(data.asByteBuffer) - pull(in) - } catch { - case NonFatal(e) => - channel.close() - promise.failure(e) - } - } - - override def onUpstreamFinish(): Unit = { - channel.close() - completeStage() - promise.success(Done) - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - channel.close() - promise.failure(ex) - } - } - - setHandler(in, Handler) - - override def preStart(): Unit = { - pull(in) - } - } - (logic, promise.future) - } - -} - -object ChannelStream { - - def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { - Source - .fromGraph(new ChannelSource(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - - def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { - Sink - .fromGraph(new ChannelSink(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - -} diff --git a/src/main/scala/xyz/driver/core/swagger.scala b/src/main/scala/xyz/driver/core/swagger.scala deleted file mode 100644 index 0c1e15d..0000000 --- a/src/main/scala/xyz/driver/core/swagger.scala +++ /dev/null @@ -1,164 +0,0 @@ -package xyz.driver.core - -import java.lang.annotation.Annotation -import java.lang.reflect.Type -import java.util - -import com.fasterxml.jackson.databind.{BeanDescription, ObjectMapper} -import com.fasterxml.jackson.databind.`type`.ReferenceType -import io.swagger.converter._ -import io.swagger.jackson.AbstractModelConverter -import io.swagger.models.{Model, ModelImpl} -import io.swagger.models.properties._ -import io.swagger.util.{Json, PrimitiveType} -import spray.json._ - -object swagger { - - def configureCustomSwaggerModels( - customPropertiesExamples: Map[Class[_], Property], - customObjectsExamples: Map[Class[_], JsValue]) = { - ModelConverters - .getInstance() - .addConverter(new CustomSwaggerJsonConverter(Json.mapper(), customPropertiesExamples, customObjectsExamples)) - } - - object CustomSwaggerJsonConverter { - - def stringProperty(pattern: Option[String] = None, example: Option[String] = None): Property = { - make(new StringProperty()) { sp => - sp.required(true) - example.foreach(sp.example) - pattern.foreach(sp.pattern) - } - } - - def enumProperty[V](values: V*): Property = { - make(new StringProperty()) { sp => - for (v <- values) sp._enum(v.toString) - sp.setRequired(true) - } - } - - def numericProperty(example: Option[AnyRef] = None): Property = { - make(PrimitiveType.DECIMAL.createProperty()) { dp => - dp.setRequired(true) - example.foreach(dp.setExample) - } - } - - def booleanProperty(): Property = { - make(new BooleanProperty()) { bp => - bp.setRequired(true) - } - } - } - - @SuppressWarnings(Array("org.wartremover.warts.Null")) - class CustomSwaggerJsonConverter( - mapper: ObjectMapper, - customProperties: Map[Class[_], Property], - customObjects: Map[Class[_], JsValue]) - extends AbstractModelConverter(mapper) { - import CustomSwaggerJsonConverter._ - - override def resolveProperty( - `type`: Type, - context: ModelConverterContext, - annotations: Array[Annotation], - chain: util.Iterator[ModelConverter]): Property = { - val javaType = Json.mapper().constructType(`type`) - - Option(javaType.getRawClass) - .flatMap { cls => - customProperties.get(cls) - } - .orElse { - `type` match { - case rt: ReferenceType if isOption(javaType.getRawClass) && chain.hasNext => - val nextType = rt.getContentType - val nextResolved = Option(resolveProperty(nextType, context, annotations, chain)).getOrElse( - chain.next().resolveProperty(nextType, context, annotations, chain)) - nextResolved.setRequired(false) - Option(nextResolved) - case t if chain.hasNext => - val nextResolved = chain.next().resolveProperty(t, context, annotations, chain) - nextResolved.setRequired(true) - Option(nextResolved) - case _ => - Option.empty[Property] - } - } - .orNull - } - - @SuppressWarnings(Array("org.wartremover.warts.Null")) - override def resolve(`type`: Type, context: ModelConverterContext, chain: util.Iterator[ModelConverter]): Model = { - - val javaType = Json.mapper().constructType(`type`) - - (getEnumerationInstance(javaType.getRawClass) match { - case Some(_) => Option.empty[Model] // ignore scala enums - case None => - val customObjectModel = customObjects.get(javaType.getRawClass).map { objectExampleJson => - val properties = objectExampleJson.asJsObject.fields.mapValues(parseJsonValueToSwaggerProperty).flatMap { - case (key, value) => value.map(v => key -> v) - } - - val beanDesc = _mapper.getSerializationConfig.introspect[BeanDescription](javaType) - val name = _typeName(javaType, beanDesc) - - make(new ModelImpl()) { model => - model.name(name) - properties.foreach { case (field, property) => model.addProperty(field, property) } - } - } - - customObjectModel.orElse { - if (chain.hasNext) { - val next = chain.next() - Option(next.resolve(`type`, context, chain)) - } else { - Option.empty[Model] - } - } - }).orNull - } - - private def parseJsonValueToSwaggerProperty(jsValue: JsValue): Option[Property] = { - import scala.collection.JavaConverters._ - - jsValue match { - case JsArray(elements) => - elements.headOption.flatMap(parseJsonValueToSwaggerProperty).map { itemProperty => - new ArrayProperty(itemProperty) - } - case JsObject(subFields) => - val subProperties = subFields.mapValues(parseJsonValueToSwaggerProperty).flatMap { - case (key, value) => value.map(v => key -> v) - } - Option(new ObjectProperty(subProperties.asJava)) - case JsBoolean(_) => Option(booleanProperty()) - case JsNumber(value) => Option(numericProperty(example = Option(value))) - case JsString(value) => Option(stringProperty(example = Option(value))) - case _ => Option.empty[Property] - } - } - - private def getEnumerationInstance(cls: Class[_]): Option[Enumeration] = { - if (cls.getFields.map(_.getName).contains("MODULE$")) { - val javaUniverse = scala.reflect.runtime.universe - val m = javaUniverse.runtimeMirror(Thread.currentThread().getContextClassLoader) - val moduleMirror = m.reflectModule(m.staticModule(cls.getName)) - moduleMirror.instance match { - case enumInstance: Enumeration => Some(enumInstance) - case _ => None - } - } else { - None - } - } - - private def isOption(cls: Class[_]): Boolean = cls.equals(classOf[scala.Option[_]]) - } -} diff --git a/src/main/scala/xyz/driver/core/tagging/tagging.scala b/src/main/scala/xyz/driver/core/tagging/tagging.scala deleted file mode 100644 index 5b6599e..0000000 --- a/src/main/scala/xyz/driver/core/tagging/tagging.scala +++ /dev/null @@ -1,62 +0,0 @@ -package xyz.driver.core - -import scala.collection.generic.CanBuildFrom -import scala.language.{higherKinds, implicitConversions} - -/** - * @author sergey - * @since 9/11/18 - */ -package object tagging { - - implicit class Taggable[V <: Any](val v: V) extends AnyVal { - def tagged[Tag]: V @@ Tag = v.asInstanceOf[V @@ Tag] - } - -} - -package tagging { - - sealed trait Tagged[+V, +Tag] - - object Tagged { - implicit class TaggedOps[V, Tag](val v: V @@ Tag) extends AnyVal { - def tagless: V = v - } - - implicit def orderingMagnet[V, Tag](implicit ord: Ordering[V]): Ordering[V @@ Tag] = - ord.asInstanceOf[Ordering[V @@ Tag]] - - } - - sealed trait Trimmed - - object Trimmed { - - implicit def apply[V](trimmable: V)(implicit ev: CanBeTrimmed[V]): V @@ Trimmed = { - ev.trim(trimmable).tagged[Trimmed] - } - - sealed trait CanBeTrimmed[T] { - def trim(trimmable: T): T - } - - implicit object StringCanBeTrimmed extends CanBeTrimmed[String] { - def trim(str: String): String = str.trim() - } - - implicit def nameCanBeTrimmed[T]: CanBeTrimmed[Name[T]] = new CanBeTrimmed[Name[T]] { - def trim(name: Name[T]): Name[T] = Name[T](name.value.trim()) - } - - implicit def option2Trimmed[V: CanBeTrimmed](option: Option[V]): Option[V @@ Trimmed] = - option.map(Trimmed(_)) - - implicit def coll2Trimmed[T, C[_] <: Traversable[_]](coll: C[T])( - implicit ev: C[T] <:< Traversable[T], - tr: CanBeTrimmed[T], - bf: CanBuildFrom[Nothing, T @@ Trimmed, C[T @@ Trimmed]]): C[T @@ Trimmed] = - ev(coll).map(Trimmed(_)(tr)).to[C] - } - -} diff --git a/src/main/scala/xyz/driver/core/time.scala b/src/main/scala/xyz/driver/core/time.scala deleted file mode 100644 index 1622068..0000000 --- a/src/main/scala/xyz/driver/core/time.scala +++ /dev/null @@ -1,209 +0,0 @@ -package xyz.driver.core - -import java.text.SimpleDateFormat -import java.time.{Clock, Instant, ZoneId, ZoneOffset} -import java.util._ -import java.util.concurrent.TimeUnit - -import xyz.driver.core.date.Month - -import scala.concurrent.duration._ -import scala.language.implicitConversions -import scala.util.Try - -object time { - - // The most useful time units - val Second = 1000L - val Seconds = Second - val Minute = 60 * Seconds - val Minutes = Minute - val Hour = 60 * Minutes - val Hours = Hour - val Day = 24 * Hours - val Days = Day - val Week = 7 * Days - val Weeks = Week - - final case class Time(millis: Long) extends AnyVal { - - def isBefore(anotherTime: Time): Boolean = implicitly[Ordering[Time]].lt(this, anotherTime) - - def isAfter(anotherTime: Time): Boolean = implicitly[Ordering[Time]].gt(this, anotherTime) - - def advanceBy(duration: Duration): Time = Time(millis + duration.toMillis) - - def durationTo(anotherTime: Time): Duration = Duration.apply(anotherTime.millis - millis, TimeUnit.MILLISECONDS) - - def durationFrom(anotherTime: Time): Duration = Duration.apply(millis - anotherTime.millis, TimeUnit.MILLISECONDS) - - def toDate(timezone: TimeZone): date.Date = { - val cal = Calendar.getInstance(timezone) - cal.setTimeInMillis(millis) - date.Date(cal.get(Calendar.YEAR), date.Month(cal.get(Calendar.MONTH)), cal.get(Calendar.DAY_OF_MONTH)) - } - - def toInstant: Instant = Instant.ofEpochMilli(millis) - } - - object Time { - implicit def timeOrdering: Ordering[Time] = Ordering.by(_.millis) - - implicit def apply(instant: Instant): Time = Time(instant.toEpochMilli) - } - - /** - * Encapsulates a time and timezone without a specific date. - */ - final case class TimeOfDay(localTime: java.time.LocalTime, timeZone: TimeZone) { - - /** - * Is this time before another time on a specific day. Day light savings safe. These are zero-indexed - * for month/day. - */ - def isBefore(other: TimeOfDay, day: Int, month: Month, year: Int): Boolean = { - toCalendar(day, month, year).before(other.toCalendar(day, month, year)) - } - - /** - * Is this time after another time on a specific day. Day light savings safe. - */ - def isAfter(other: TimeOfDay, day: Int, month: Month, year: Int): Boolean = { - toCalendar(day, month, year).after(other.toCalendar(day, month, year)) - } - - def sameTimeAs(other: TimeOfDay, day: Int, month: Month, year: Int): Boolean = { - toCalendar(day, month, year).equals(other.toCalendar(day, month, year)) - } - - /** - * Enforces the same formatting as expected by [[java.sql.Time]] - * @return string formatted for `java.sql.Time` - */ - def timeString: String = { - localTime.format(TimeOfDay.getFormatter) - } - - /** - * @return a string parsable by [[java.util.TimeZone]] - */ - def timeZoneString: String = { - timeZone.getID - } - - /** - * @return this [[TimeOfDay]] as [[java.sql.Time]] object, [[java.sql.Time.valueOf]] will - * throw when the string is not valid, but this is protected by [[timeString]] method. - */ - def toTime: java.sql.Time = { - java.sql.Time.valueOf(timeString) - } - - private def toCalendar(day: Int, month: Int, year: Int): Calendar = { - val cal = Calendar.getInstance(timeZone) - cal.set(year, month, day, localTime.getHour, localTime.getMinute, localTime.getSecond) - cal.clear(Calendar.MILLISECOND) - cal - } - } - - object TimeOfDay { - def now(): TimeOfDay = { - TimeOfDay(java.time.LocalTime.now(), TimeZone.getDefault) - } - - /** - * Throws when [s] is not parsable by [[java.time.LocalTime.parse]], uses default [[java.util.TimeZone]] - */ - def parseTimeString(tz: TimeZone = TimeZone.getDefault)(s: String): TimeOfDay = { - TimeOfDay(java.time.LocalTime.parse(s), tz) - } - - def fromString(tz: TimeZone)(s: String): Option[TimeOfDay] = { - val op = Try(java.time.LocalTime.parse(s)).toOption - op.map(lt => TimeOfDay(lt, tz)) - } - - def fromStrings(zoneId: String)(s: String): Option[TimeOfDay] = { - val op = Try(TimeZone.getTimeZone(zoneId)).toOption - op.map(tz => TimeOfDay.parseTimeString(tz)(s)) - } - - /** - * Formatter that enforces `HH:mm:ss` which is expected by [[java.sql.Time]] - */ - def getFormatter: java.time.format.DateTimeFormatter = { - java.time.format.DateTimeFormatter.ofPattern("HH:mm:ss") - } - } - - final case class TimeRange(start: Time, end: Time) { - def duration: Duration = FiniteDuration(end.millis - start.millis, MILLISECONDS) - } - - def startOfMonth(time: Time) = { - Time(make(new GregorianCalendar()) { cal => - cal.setTime(new Date(time.millis)) - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMinimum(Calendar.DAY_OF_MONTH)) - }.getTime.getTime) - } - - def textualDate(timezone: TimeZone)(time: Time): String = - make(new SimpleDateFormat("MMMM d, yyyy"))(_.setTimeZone(timezone)).format(new Date(time.millis)) - - def textualTime(timezone: TimeZone)(time: Time): String = - make(new SimpleDateFormat("MMM dd, yyyy hh:mm:ss a"))(_.setTimeZone(timezone)).format(new Date(time.millis)) - - class ChangeableClock(@volatile var instant: Instant, val zone: ZoneId = ZoneOffset.UTC) extends Clock { - - def tick(duration: FiniteDuration): Unit = - instant = instant.plusNanos(duration.toNanos) - - val getZone: ZoneId = zone - - def withZone(zone: ZoneId): Clock = new ChangeableClock(instant, zone = zone) - - override def toString: String = "ChangeableClock(" + instant + "," + zone + ")" - } - - object provider { - - /** - * Time providers are supplying code with current times - * and are extremely useful for testing to check how system is going - * to behave at specific moments in time. - * - * All the calls to receive current time must be made using time - * provider injected to the caller. - */ - @deprecated( - "Use java.time.Clock instead. Note that xyz.driver.core.Time and xyz.driver.core.date.Date will also be deprecated soon!", - "0.13.0") - trait TimeProvider { - def currentTime(): Time - def toClock: Clock - } - - final implicit class ClockTimeProvider(clock: Clock) extends TimeProvider { - def currentTime(): Time = Time(clock.instant().toEpochMilli) - - val toClock: Clock = clock - } - - final class SystemTimeProvider extends TimeProvider { - def currentTime() = Time(System.currentTimeMillis()) - - lazy val toClock: Clock = Clock.systemUTC() - } - - final val SystemTimeProvider = new SystemTimeProvider - - final class SpecificTimeProvider(time: Time) extends TimeProvider { - - def currentTime(): Time = time - - lazy val toClock: Clock = Clock.fixed(time.toInstant, ZoneOffset.UTC) - } - - } -} |