Neutron

Neutron is a purely functional Apache Pulsar client for Scala, built on top of fs2 and the Java Pulsar client.

It is published for Scala 2.13. You can include it in your project by adding the following dependencies.

sbt

libraryDependencies ++= Seq(
  "com.chatroulette" %% "neutron-core" % "0.0.10",
  "com.chatroulette" %% "neutron-circe" % "0.0.10",
  "com.chatroulette" %% "neutron-function" % "0.0.10"
)

Maven

<dependencies>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-core_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-circe_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
  <dependency>
    <groupId>com.chatroulette</groupId>
    <artifactId>neutron-function_2.13</artifactId>
    <version>0.0.10</version>
  </dependency>
</dependencies>

Gradle

dependencies {
  implementation "com.chatroulette:neutron-core_2.13:0.0.10"
  implementation "com.chatroulette:neutron-circe_2.13:0.0.10"
  implementation "com.chatroulette:neutron-function_2.13:0.0.10"
}

Quick start

Here’s a quick consumer / producer example using Neutron. Note: both are fully asynchronous.

import scala.concurrent.duration._
import cats.effect._
import cr.pulsar.Pulsar.PulsarURL
import fs2.Stream
import cr.pulsar._
import cr.pulsar.schema.utf8._

object Demo extends IOApp.Simple {
  val url = PulsarURL("pulsar://localhost:6650")

  val topic =
    Topic.simple("my-topic", Topic.Type.NonPersistent)

  val subs =
    Subscription.Builder
      .withName("my-sub")
      .withType(Subscription.Type.Shared)
      .build

  val resources: Resource[IO, (Consumer[IO, String], Producer[IO, String])] =
    for {
      pulsar <- Pulsar.make[IO](url)
      consumer <- Consumer.make[IO, String](pulsar, topic, subs)
      producer <- Producer.make[IO, String](pulsar, topic)
    } yield consumer -> producer

  val run: IO[Unit] =
    Stream
      .resource(resources)
      .flatMap {
        case (consumer, producer) =>
          val consume =
            consumer
              .autoSubscribe
              .evalMap(IO.println)

          val produce =
            Stream
              .emit("test data")
              .covary[IO]
              .metered(3.seconds)
              .evalMap(producer.send_)

          consume.concurrently(produce)
      }
      .compile
      .drain

}

Schema

As of version 0.0.6, Neutron ships with support for Pulsar Schema. The simplest way to get started is to use the given UTF-8 encoding, which makes use of the native Schema.BYTES.

import cr.pulsar.schema.Schema
import cr.pulsar.schema.utf8._

val schema = Schema[String] // summon instance

This brings into scope a Schema[String] instance, which is required to initialize consumers and producers. There’s also a default Schema[A] instance for any type A that has a cats.Inject[A, Array[Byte]] instance (also based on Schema.BYTES).
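To illustrate the second point, here is a minimal sketch of a cats.Inject[A, Array[Byte]] instance for a hypothetical UserId wrapper. The type and its UTF-8 encoding are assumptions made for this example and are not part of Neutron. With such an instance in implicit scope, the default Schema[A] described above should apply to UserId; the Neutron import that exposes it is not shown here.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import cats.Inject

// Hypothetical domain type, used only for illustration.
final case class UserId(value: String)

object UserId {
  // inj encodes a value to bytes; prj decodes and may fail, hence the Option.
  implicit val bytesInject: Inject[UserId, Array[Byte]] =
    new Inject[UserId, Array[Byte]] {
      val inj: UserId => Array[Byte] =
        id => id.value.getBytes(UTF_8)

      val prj: Array[Byte] => Option[UserId] =
        bytes => Some(UserId(new String(bytes, UTF_8)))
    }
}

object InjectDemo {
  // Round-trips a value through the instance: encode, then decode.
  def roundTrip(id: UserId): Option[UserId] = {
    val I = implicitly[Inject[UserId, Array[Byte]]]
    I.prj(I.inj(id))
  }
}
```

Calling InjectDemo.roundTrip(UserId("abc")) returns the original value wrapped in Some, which is a quick sanity check for any custom instance.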

At Chatroulette, we use JSON-serialised data for which we derive a Schema.JSON based on Circe codecs and Avro schemas. Those interested in doing the same can leverage the Circe integration by adding the neutron-circe dependency.

ℹ️ When using schemas, prefer to create the producer(s) before the consumer(s) for fail-fast semantics.
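A minimal sketch of that ordering, reusing the calls from the quick start with the producer acquired first. The broker URL, topic name, and subscription name are placeholders.

```scala
import cats.effect._
import cr.pulsar._
import cr.pulsar.Pulsar.PulsarURL
import cr.pulsar.schema.utf8._

object ProducerFirst {
  // Placeholder values, same as in the quick start.
  val url   = PulsarURL("pulsar://localhost:6650")
  val topic = Topic.simple("my-topic", Topic.Type.NonPersistent)

  val subs =
    Subscription.Builder
      .withName("my-sub")
      .withType(Subscription.Type.Shared)
      .build

  // Resources are acquired top to bottom, so the producer is created
  // before the consumer (and released after it).
  val resources: Resource[IO, (Producer[IO, String], Consumer[IO, String])] =
    for {
      pulsar   <- Pulsar.make[IO](url)
      producer <- Producer.make[IO, String](pulsar, topic)
      consumer <- Consumer.make[IO, String](pulsar, topic, subs)
    } yield producer -> consumer
}
```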

We also need instances of Circe’s Decoder and Encoder, and of JsonSchema, which expects the Avro schema used by Pulsar. Once you have them, you are an import away from JSON schema support.

import cr.pulsar.schema.Schema
import cr.pulsar.schema.circe._

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto._

case class Event(id: Long, name: String)
object Event {
  implicit val jsonEncoder: Encoder[Event] = deriveEncoder
  implicit val jsonDecoder: Decoder[Event] = deriveDecoder

  implicit val jsonSchema: JsonSchema[Event] = JsonSchema.derive
}

val schema = Schema[Event] // summon an instance

The JsonSchema can be created directly using JsonSchema.derive[A], which uses avro4s under the hood. This is the recommended way, but if you want to get something up and running quickly, you can also use auto-derivation.

import cr.pulsar.schema.Schema
import cr.pulsar.schema.circe.auto._

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto._

case class Foo(tag: String)
object Foo {
  implicit val jsonEncoder: Encoder[Foo] = deriveEncoder
  implicit val jsonDecoder: Decoder[Foo] = deriveDecoder
}

val schema = Schema[Foo] // summon an instance

Notice that avro4s is marked as Provided, meaning you need to explicitly add it to your classpath.
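In an sbt build, that could look like the fragment below. The coordinates are the ones avro4s is published under; the version is deliberately left as a placeholder, because the avro4s release that Neutron 0.0.10 was built against is not stated here.

```scala
// build.sbt
// Placeholder: pick the avro4s release that matches your Neutron version.
val avro4sVersion = "<avro4s-version>"

libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
```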

Schema Compatibility Check Strategy

Whenever using schemas, make sure you fully understand the different strategies, which only operate at the namespace level (e.g. see how integration tests are set up in the docker-compose.yml shell script).
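As a sketch of what setting the strategy on a namespace can look like, the snippet below uses the Pulsar Java admin client (the pulsar-client-admin artifact, which is an extra dependency and not part of Neutron). The admin URL http://localhost:8080 and the namespace public/default are assumptions for a local standalone broker.

```scala
import scala.util.Using
import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy

object SetCompatibility {
  // Assumed values for a local standalone broker.
  val adminUrl  = "http://localhost:8080"
  val namespace = "public/default"

  def main(args: Array[String]): Unit =
    // PulsarAdmin is Closeable, so Using.resource closes it afterwards.
    Using.resource(PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { admin =>
      // The strategy applies to every topic in the namespace.
      admin
        .namespaces()
        .setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD)
    }
}
```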

For instance, in BACKWARD mode, producers and consumers will fail to initialize if the schemas are incompatible. This holds even if your custom JSON decoder can deserialize the previous model, because the Pulsar broker doesn’t know about it. For example, say we have this model in our new application.

import java.util.UUID

case class Event(uuid: UUID, value: String)

The generated Avro schema will look as follows.

{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  } ]
}
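To inspect such a schema yourself, avro4s can derive and print it. This sketch calls avro4s directly rather than going through Neutron, and it places Event in a hypothetical Domain object, so the derived namespace will differ from cr.pulsar.domain.

```scala
import java.util.UUID
import com.sksamuel.avro4s.AvroSchema

// Hypothetical container for the model, used only for illustration.
object Domain {
  case class Event(uuid: UUID, value: String)
}

object PrintSchema {
  // Derives the Avro schema from the case class definition.
  val schema: org.apache.avro.Schema = AvroSchema[Domain.Event]

  def main(args: Array[String]): Unit =
    println(schema.toString(true)) // pretty-printed JSON
}
```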

Later on, we introduce a breaking change in the model by adding a new mandatory field.

case class Event(uuid: UUID, value: String, code: Int)

Pulsar’s schema validation will reject this at runtime when using the BACKWARD mode. The only changes allowed in this mode are:

  • Add optional fields
  • Delete fields

The Avro schema generated for the rejected model is shown below.

{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  }, {
    "name" : "code",
    "type" : "int"
  } ]
}

Instead, we should make the new field optional with a default value for this to work.

case class Event(uuid: UUID, value: String, code: Option[Int] = None)

This is now accepted by Pulsar, since any previous Event not yet consumed from a Pulsar topic can still be processed by the new consumers expecting the new schema.

{
  "type" : "record",
  "name" : "Event",
  "namespace" : "cr.pulsar.domain",
  "fields" : [ {
    "name" : "uuid",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  }, {
    "name" : "value",
    "type" : "string"
  }, {
    "name" : "code",
    "type" : [ "null", "int" ],
    "default" : null
  } ]
}

See the difference with the previous schema? This one has a default: null in addition to the extra null type.
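The same rule can be reproduced locally with Apache Avro’s SchemaCompatibility helper, which answers whether a reader using the new schema can decode data written with the old one. This only approximates the broker’s BACKWARD check and is not Pulsar’s own code path. The schemas are trimmed to the value and code fields to keep the sketch short, and the BackwardCheck object is a name made up for this example.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

object BackwardCheck {
  // A fresh parser per schema: one parser cannot define the same record name twice.
  private def parse(json: String): Schema = new Schema.Parser().parse(json)

  // Builds the Event record, optionally appending one more field definition.
  private def event(extraField: String): Schema =
    parse(
      s"""{"type":"record","name":"Event","namespace":"cr.pulsar.domain","fields":[
         |  {"name":"value","type":"string"}$extraField
         |]}""".stripMargin
    )

  val v1        = event("")
  val mandatory = event(""",{"name":"code","type":"int"}""")
  val optional  = event(""",{"name":"code","type":["null","int"],"default":null}""")

  // True when `reader` can decode records written with `writer`.
  def canRead(reader: Schema, writer: Schema): Boolean =
    SchemaCompatibility
      .checkReaderWriterCompatibility(reader, writer)
      .getType == SchemaCompatibilityType.COMPATIBLE

  def main(args: Array[String]): Unit = {
    println(canRead(mandatory, v1)) // false: old records lack `code` and it has no default
    println(canRead(optional, v1))  // true: a missing `code` falls back to null
  }
}
```

A check like this fits naturally into a unit test that compares each new model against the last released schema before deployment.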