aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/Topic.scala
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2018-07-31 12:13:47 -0700
committerGitHub <noreply@github.com>2018-07-31 12:13:47 -0700
commite6552f3b31b55396c652c196c5c3a9c3a6cfed71 (patch)
treef6826eac8bff8470683547006d1e64d2bc425d55 /src/main/scala/xyz/driver/core/messaging/Topic.scala
parentdb0c9bebee4cbc587d4da0a624f671ffcf7a649f (diff)
downloaddriver-core-e6552f3b31b55396c652c196c5c3a9c3a6cfed71.tar.gz
driver-core-e6552f3b31b55396c652c196c5c3a9c3a6cfed71.tar.bz2
driver-core-e6552f3b31b55396c652c196c5c3a9c3a6cfed71.zip
Add message bus and topic abstractions (#181)v1.12.0
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/Topic.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/Topic.scala43
1 files changed, 43 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala
new file mode 100644
index 0000000..32fd764
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/messaging/Topic.scala
@@ -0,0 +1,43 @@
+package xyz.driver.core
+package messaging
+
+import java.nio.ByteBuffer
+
+/** A topic is a named group of messages that all share a common schema.
+ * @tparam Message type of messages sent over this topic */
+trait Topic[Message] {
+
+ /** Name of this topic (must be unique). */
+ def name: String
+
+ /** Convert a message to its wire format that will be sent over a bus. */
+ def serialize(message: Message): ByteBuffer
+
+ /** Convert a message from its wire format. */
+ def deserialize(message: ByteBuffer): Message
+
+}
+
+object Topic {
+
+ /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */
+ def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] {
+ def name = name0
+ override def serialize(message: ByteBuffer): ByteBuffer = message
+ override def deserialize(message: ByteBuffer): ByteBuffer = message
+ }
+
+ /** Create a topic that represents data as UTF-8 encoded strings. */
+ def string(name0: String): Topic[String] = new Topic[String] {
+ def name = name0
+ override def serialize(message: String): ByteBuffer = {
+ ByteBuffer.wrap(message.getBytes("utf-8"))
+ }
+ override def deserialize(message: ByteBuffer): String = {
+ val bytes = new Array[Byte](message.remaining())
+ message.get(bytes)
+ new String(bytes, "utf-8")
+ }
+ }
+
+}