Neutron
Neutron is a purely functional Apache Pulsar client for Scala, built on top of fs2 and the Java Pulsar client.
It is published for Scala 2.13. You can include it in your project by adding the following dependencies.
- sbt
libraryDependencies ++= Seq(
  "com.chatroulette" %% "neutron-core" % "0.0.10",
  "com.chatroulette" %% "neutron-circe" % "0.0.10",
  "com.chatroulette" %% "neutron-function" % "0.0.10"
)
- Maven
<dependencies>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-core_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-circe_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-function_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
  implementation "com.chatroulette:neutron-core_2.13:0.0.10"
  implementation "com.chatroulette:neutron-circe_2.13:0.0.10"
  implementation "com.chatroulette:neutron-function_2.13:0.0.10"
}
Quick start
Here’s a quick consumer / producer example using Neutron. Note: both are fully asynchronous.
import scala.concurrent.duration._

import cats.effect._
import cr.pulsar.Pulsar.PulsarURL
import fs2.Stream
import cr.pulsar._
import cr.pulsar.schema.utf8._

object Demo extends IOApp.Simple {

  val url = PulsarURL("pulsar://localhost:6650")

  val topic =
    Topic.simple("my-topic", Topic.Type.NonPersistent)

  val subs =
    Subscription.Builder
      .withName("my-sub")
      .withType(Subscription.Type.Shared)
      .build

  val resources: Resource[IO, (Consumer[IO, String], Producer[IO, String])] =
    for {
      pulsar   <- Pulsar.make[IO](url)
      consumer <- Consumer.make[IO, String](pulsar, topic, subs)
      producer <- Producer.make[IO, String](pulsar, topic)
    } yield consumer -> producer

  val run: IO[Unit] =
    Stream
      .resource(resources)
      .flatMap {
        case (consumer, producer) =>
          val consume =
            consumer
              .autoSubscribe
              .evalMap(IO.println)

          val produce =
            Stream
              .emit("test data")
              .covary[IO]
              .metered(3.seconds)
              .evalMap(producer.send_)

          consume.concurrently(produce)
      }
      .compile
      .drain
}
Schema
As of version 0.0.6, Neutron ships with support for Pulsar Schema. The simplest way to get started is to use the provided UTF-8 encoding, which makes use of the native Schema.BYTES.
import cr.pulsar.schema.Schema
import cr.pulsar.schema.utf8._
val schema = Schema[String] // summon instance
This brings into scope a Schema[String] instance, which is required to initialize consumers and producers. There's also a default Schema[A] instance for any type A with a cats.Inject[A, Array[Byte]] instance (also based on Schema.BYTES).
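The Inject-based default boils down to a pair of functions: one that turns a value into bytes and one that tries to turn bytes back into a value. The sketch below models that contract in plain Scala so it runs without extra dependencies. BytesInject is a hypothetical stand-in for cats.Inject[A, Array[Byte]] (same two members, inj and prj), and UserId is a made-up example type; with cats on the classpath you would define a cats.Inject[UserId, Array[Byte]] from the same two functions instead.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Hypothetical stand-in mirroring the shape of cats.Inject[A, Array[Byte]]:
// `inj` always succeeds, `prj` may fail (None) on payloads it cannot read.
final case class BytesInject[A](inj: A => Array[Byte], prj: Array[Byte] => Option[A])

object BytesInject {
  // Conceptually what a UTF-8 string encoding over raw bytes does.
  // Note: `new String(bytes, UTF_8)` never throws; malformed input is replaced with U+FFFD.
  val utf8: BytesInject[String] =
    BytesInject(_.getBytes(UTF_8), bytes => Some(new String(bytes, UTF_8)))
}

// Hypothetical domain type used only for illustration.
final case class UserId(value: Long)

object UserId {
  // Reuse the UTF-8 encoding: write the Long as text, and parse it back,
  // returning None when the payload is not a valid number.
  val bytes: BytesInject[UserId] =
    BytesInject(
      id => BytesInject.utf8.inj(id.value.toString),
      raw => BytesInject.utf8.prj(raw).flatMap(s => Try(s.toLong).toOption).map(UserId(_))
    )
}
```

Returning an Option from the decoding side is what lets a consumer distinguish a payload it cannot interpret from a valid one, rather than throwing.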
At Chatroulette, we use JSON-serialised data, for which we derive a Schema.JSON based on Circe codecs and Avro schemas. Those interested in doing the same can leverage the Circe integration by adding the neutron-circe dependency.
ℹ️ When using schemas, prefer to create the producer(s) before the consumer(s) for fail-fast semantics.
We also need instances of Circe's Decoder and Encoder, and of JsonSchema, which expects the Avro schema used by Pulsar. Once you have them, you are one import away from JSON schema support.
import cr.pulsar.schema.Schema
import cr.pulsar.schema.circe._
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto._
case class Event(id: Long, name: String)
object Event {
  implicit val jsonEncoder: Encoder[Event] = deriveEncoder
  implicit val jsonDecoder: Decoder[Event] = deriveDecoder
  implicit val jsonSchema: JsonSchema[Event] = JsonSchema.derive
}
val schema = Schema[Event] // summon an instance
The JsonSchema can be created directly using JsonSchema.derive[A], which uses avro4s under the hood. This is the recommended approach, but if you want to get something up and running quickly, you can also use auto-derivation.
import cr.pulsar.schema.Schema
import cr.pulsar.schema.circe.auto._
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto._
case class Foo(tag: String)
object Foo {
  implicit val jsonEncoder: Encoder[Foo] = deriveEncoder
  implicit val jsonDecoder: Decoder[Foo] = deriveDecoder
}
val schema = Schema[Foo] // summon an instance
Notice that avro4s is marked as Provided, meaning you need to add it to your classpath explicitly.
Schema Compatibility Check Strategy
Whenever using schemas, make sure you fully understand the different compatibility check strategies, which only operate at the namespace level (e.g. see how the integration tests are set up in the docker-compose.yml file).
For instance, when using the BACKWARD mode, a producer and a consumer will fail to initialize if their schemas are incompatible. Even if your custom JSON decoder can deserialize the previous model, the Pulsar broker doesn't know about it. For example, say we have this model in our new application.
case class Event(uuid: UUID, value: String)
The generated Avro schema will look as follows.
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  } ]
}
And later on, we introduce a breaking change in the model, adding a new mandatory field.
case class Event(uuid: UUID, value: String, code: Int)
When using the BACKWARD mode, Pulsar's schema validation will reject this change at runtime. The only changes allowed in this mode are:
- Add optional fields
- Delete fields
See the generated Avro schema below.
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  }, {
    "name" : "code",
    "type" : "int"
  } ]
}
Instead, we should make the new field optional with a default value for this to work.
case class Event(uuid: UUID, value: String, code: Option[Int] = None)
This is now accepted by Pulsar, since any previous Event not yet consumed from a Pulsar topic can still be processed by the new consumers expecting the new schema.
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  }, {
    "name" : "code",
    "type" : [ "null", "int" ],
    "default" : null
  } ]
}
See the difference from the previous schema? This one has "default" : null in addition to the extra "null" type.
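To make the BACKWARD rule concrete, here is a deliberately simplified model of the check applied to the three Event versions above. It is not Pulsar's or Avro's implementation: records are reduced to field names, a type label, and whether a default exists; any type change counts as incompatible (real Avro schema resolution also permits promotions such as int to long); and Field, Record, Backward and EventVersions are hypothetical names for this sketch.

```scala
// Simplified model of an Avro record: field names, a type label, and whether a default exists.
final case class Field(name: String, tpe: String, hasDefault: Boolean = false)
final case class Record(name: String, fields: List[Field])

object Backward {
  // BACKWARD: a consumer using `reader` (the new schema) must be able to read
  // data written with `writer` (the previous schema).
  def check(reader: Record, writer: Record): Either[String, Unit] = {
    val written = writer.fields.map(f => f.name -> f).toMap
    val problems = reader.fields.flatMap { f =>
      written.get(f.name) match {
        case Some(old) if old.tpe == f.tpe => Nil // present in old data with the same type
        case Some(old) => List(s"field '${f.name}' changed type from ${old.tpe} to ${f.tpe}")
        case None if f.hasDefault => Nil // missing in old data, filled in from the default
        case None => List(s"new field '${f.name}' has no default")
      }
    }
    // Fields deleted from the reader schema are simply skipped when reading old data.
    if (problems.isEmpty) Right(()) else Left(problems.mkString("; "))
  }
}

// The three Event versions discussed above, in the simplified model.
object EventVersions {
  val v1 = Record("Event", List(Field("uuid", "uuid"), Field("value", "string")))
  val v2 = Record("Event", v1.fields :+ Field("code", "int"))                          // mandatory field
  val v3 = Record("Event", v1.fields :+ Field("code", "[null, int]", hasDefault = true)) // optional field
}
```

Checking v2 against v1 fails because old events carry no code value and there is no default to fall back on, while v3 passes because the default fills the gap. Checking v1 against v3 also passes, which corresponds to the "delete fields" case.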