RxKafka Release 0.1.0
I've just released a Scala library for Kafka called RxKafka, which turns Kafka consumers and producers into RxScala observables and observers.
Here's what it looks like:
Topics
Topics in RxKafka are typed. The following is the Topic trait:
trait Topic[T] {
val name: String
type Message = T
val serializer: Serializer[T]
val deserializer: Deserializer[T]
}
Topics are defined by extending the trait. Each topic has its own name (used by Kafka) and the type of the messages it contains (often needed by the serialization process).
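To make the trait a bit more concrete, here is a minimal, self-contained sketch of a hand-written topic. The Serializer and Deserializer traits below are stand-ins I'm assuming for illustration (the real ones come from the library and may well look different), and StringTopic and roundTrip are hypothetical names.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object TopicSketch {
  // Hypothetical stand-ins for the library's serializer types (assumed shape).
  trait Serializer[T] { def serialize(value: T): Array[Byte] }
  trait Deserializer[T] { def deserialize(bytes: Array[Byte]): T }

  // The Topic trait as shown above.
  trait Topic[T] {
    val name: String
    type Message = T
    val serializer: Serializer[T]
    val deserializer: Deserializer[T]
  }

  // A hypothetical topic carrying plain UTF-8 strings.
  case object StringTopic extends Topic[String] {
    val name = "strings"
    val serializer: Serializer[String] = new Serializer[String] {
      def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    }
    val deserializer: Deserializer[String] = new Deserializer[String] {
      def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
    }
  }

  // Push a message through a topic's own serializer and deserializer.
  def roundTrip[T](topic: Topic[T])(message: T): T =
    topic.deserializer.deserialize(topic.serializer.serialize(message))
}
```

Because the topic carries both its name and its codecs, anything that receives a Topic[T] knows how to turn a T into bytes and back without extra configuration.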
The core module supports basic Java serialization and exposes an abstract SerializableTopic class that can easily be extended to define new topics serialized with Java serialization, as follows:
case object TestTopic extends SerializableTopic[Test]("test")
case object AnotherTestTopic extends SerializableTopic[AnotherTest]("another-test")
Test and AnotherTest are simple case classes representing the messages.
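For context, "Java serialization" here means the standard java.io object streams. The sketch below shows the kind of round trip a SerializableTopic presumably performs under the hood. It uses only the JDK; the helper names toBytes and fromBytes are mine, not the library's, and the single value field on Test is an assumption.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaSerializationSketch {
  // A message type like the ones above; case classes are Serializable by default.
  case class Test(value: String)

  // Hypothetical helper: write any Serializable object to a byte array.
  def toBytes(message: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(message) finally out.close()
    buffer.toByteArray
  }

  // Hypothetical helper: read an object back and cast it to the expected type.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

This is why plain case classes work as messages without any extra code, as long as all of their fields are serializable too.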
The json4s module provides a similar class, called JsonTopic, that can be used as follows:
case object JsonTestTopic extends JsonTopic[Test]("test")
case object AnotherJsonTestTopic extends JsonTopic[AnotherTest]("another-test")
Consumers
Two types of consumers can be created: consumers of a single topic and consumers of multiple topics.
Single-Topic Consumer
To create and use an RxKafka consumer tied to a single topic:
val consumer = KafkaConsumer(TestTopic)
// an iterator can be accessed on the consumer
val iterator = consumer.iterator
// or the consumer can be turned into an Observable[T] where T is the type of the message (in this case it will be Test)
val observable = consumer.toObservable
// you can also call subscribe directly on the Consumer (it's just a proxy to the Observable's subscribe method)
consumer.subscribe { message =>
// ...
}
// to close the Kafka connector
consumer.close
Multiple-Topics Consumer
To create and use an RxKafka consumer tied to multiple topics:
val consumer = KafkaConsumer(List(TestTopic, AnotherTestTopic))
// a map of iterators can be accessed on the consumer as a Map[String, Iterator[Any]]
val iterators = consumer.iterators
// or the consumer can be turned into an Observable[Any] which will merge all topics together:
val observable = consumer.toObservable
// you can also call subscribe directly on the `Consumer`
consumer.subscribe {
case test: Test => // ...
case anotherTest: AnotherTest => // ...
}
// to close the Kafka connector
consumer.close
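One thing to keep in mind with the merged Observable[Any]: a handler that only lists the expected types would fail with a MatchError if anything else arrives. The following sketch uses a plain List[Any] in place of the observable so it runs without Kafka, and adds a fallback case. Test, AnotherTest and describe are illustrative definitions, not part of the library.

```scala
object MergedStreamSketch {
  case class Test(value: String)
  case class AnotherTest(value: String)

  // Total handler: every possible message type yields a result.
  def describe(message: Any): String = message match {
    case Test(value)        => s"test: $value"
    case AnotherTest(value) => s"another-test: $value"
    case other              => s"unexpected: $other"
  }

  // Stand-in for the merged stream of messages from both topics.
  val messages: List[Any] = List(Test("a"), AnotherTest("b"), 42)
  val described: List[String] = messages.map(describe)
}
```

The same shape of match, with the final fallback case, can be passed to subscribe on a multiple-topics consumer.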
Producers
Two types of producers can be created: producers for a single topic and producers for multiple topics.
Single-Topic Producer
To create and use an RxKafka producer for a single topic:
val producer = KafkaProducer(TestTopic)
// publish a message
producer.publish(Test("test"))
// or the producer can be turned into an Observer[T] where T is the type of the message (in this case it will be Test)
val observer = producer.toObserver
// then you can use the Observer API
observer.onNext(Test("test"))
observer.onCompleted()
// stop the Kafka producer
producer.close
Multiple-Topics Producer
To create and use an RxKafka producer for multiple topics:
val producer = KafkaProducer()
// publish a message to a particular topic
producer.publish(TestTopic, Test("test"))
producer.publish(AnotherTestTopic, AnotherTest("test"))
// or the producer can be turned into an Observer[(Topic[T], T)]
val observer = producer.toObserver
// then you can use the Observer API
observer.onNext(TestTopic, Test("test"))
observer.onNext(AnotherTestTopic, AnotherTest("test"))
observer.onCompleted()
// stop the Kafka producer
producer.close
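The two-argument publish pairs each message with its topic, which lets the compiler check that the message type matches the topic's type parameter. Here is a rough sketch of that idea using an in-memory stand-in. InMemoryProducer and the simplified Topic trait are hypothetical and say nothing about how the real producer is implemented.

```scala
import scala.collection.mutable.ListBuffer

object TypedPublishSketch {
  // Simplified, hypothetical version of the Topic trait (name only).
  trait Topic[T] { val name: String }

  case class Test(value: String)
  case class AnotherTest(value: String)

  case object TestTopic extends Topic[Test] { val name = "test" }
  case object AnotherTestTopic extends Topic[AnotherTest] { val name = "another-test" }

  // Hypothetical in-memory producer: records (topic name, message) pairs.
  final class InMemoryProducer {
    private val buffer = ListBuffer.empty[(String, Any)]

    // T is fixed by the topic, so the message must have the topic's type.
    def publish[T](topic: Topic[T], message: T): Unit =
      buffer += ((topic.name, message))

    def sent: List[(String, Any)] = buffer.toList
  }

  val producer = new InMemoryProducer
  producer.publish(TestTopic, Test("test"))
  producer.publish(AnotherTestTopic, AnotherTest("test"))
  // producer.publish(TestTopic, AnotherTest("test")) // should be rejected by the compiler
}
```

With this signature a mismatched topic and message pair is caught at compile time rather than showing up as a deserialization failure on the consuming side.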
For more details and up-to-date information, view the project on GitHub.