Kafka Connect: Elasticsearch source connector

In this story, I will present the Elasticsearch source plugin.

What are connectors and why do I need them?

Kafka connectors are plugins designed to transfer data between a Kafka cluster and external systems (other queue systems, relational databases, Hadoop, etc.). Many connectors have already been developed (a list can be found on the Confluent website). They let you transfer data efficiently without writing a single line of code.

I found many Connect plugins for indexing data into Elasticsearch, but none for fetching data from it, so I tried to write one myself. Here is the GitHub project with the source code, documentation and jars.

Design choices

The source plugin supports dynamically evolving datatype mappings, as well as arrays and nested objects. The produced data is compatible with both Avro and JSON serialization.
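Here is a minimal Python sketch of what dynamic mapping involves. It is not the plugin's actual code. It derives a schema description from a single document, turning nested objects into structs and arrays into arrays of their element type. The type names mirror Kafka Connect schema types. The mapping rules themselves are simplifying assumptions, such as reading an array's element type from its first element.

```python
from typing import Any


def infer_schema(value: Any) -> Any:
    """Map a JSON-like value to a schema description (illustrative rules only)."""
    if value is None:
        return None  # unknown type; a real connector would need an optional schema
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, dict):
        # Nested objects become structs, one schema entry per field.
        return {"STRUCT": {k: infer_schema(v) for k, v in sorted(value.items())}}
    if isinstance(value, list):
        # Assumption: the first element decides the element type; empty arrays default to STRING.
        return {"ARRAY": infer_schema(value[0]) if value else "STRING"}
    raise TypeError(f"unsupported type: {type(value).__name__}")


doc_v1 = {"host": "web-1", "cpu": 0.42, "tags": ["prod"], "disk": {"free": 1024}}
doc_v2 = dict(doc_v1, region="eu")  # a later document gains a new field
print(infer_schema(doc_v1) == infer_schema(doc_v2))  # False: the schema evolved
```

The second document carries an extra field, so its schema differs from the first one. A source connector has to handle this kind of evolution as an index mapping grows.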

The plugin also supports incremental fetching based on either a temporal field or an incrementing field (e.g. an auto-incremented id).
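Incremental fetching can be pictured as a cursor over the chosen field. The sketch below builds an Elasticsearch search body with a `range` query on that field, sorted in ascending order. It then advances the cursor to the last hit of each batch. The function names, the batch size and the `@timestamp` field are illustrative assumptions, not the plugin's internals.

```python
from typing import Any, Optional


def build_incremental_query(field: str, last_value: Optional[Any], batch_size: int = 500) -> dict:
    """Build a search body that fetches documents newer than last_value."""
    if last_value is None:
        query = {"match_all": {}}  # first run: nothing has been sent yet
    else:
        # Only documents whose cursor field is strictly greater than the last value sent.
        query = {"range": {field: {"gt": last_value}}}
    return {
        "query": query,
        "sort": [{field: {"order": "asc"}}],  # ascending, so the last hit holds the new cursor
        "size": batch_size,
    }


def next_cursor(hits: list, field: str, last_value: Optional[Any]) -> Optional[Any]:
    """Return the cursor value to store once a batch has been sent."""
    if not hits:
        return last_value  # nothing new: keep the previous cursor
    return hits[-1]["_source"][field]


body = build_incremental_query("@timestamp", "2019-05-01T10:00:00Z")
hits = [
    {"_source": {"@timestamp": "2019-05-01T10:00:05Z"}},
    {"_source": {"@timestamp": "2019-05-01T10:00:09Z"}},
]
print(next_cursor(hits, "@timestamp", "2019-05-01T10:00:00Z"))  # 2019-05-01T10:00:09Z
```

A strictly-greater (`gt`) filter has one caveat. Several documents may share the cursor value, and a batch boundary may fall between them. In that case, the remaining documents are skipped. Using `gte` re-sends them instead, trading the gap for duplicates.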

At-least-once semantics are implemented: every document is sent at least once. Duplicates may appear after a failure, but they can be removed using topic compaction.
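A small simulation shows why duplicates appear and why compaction removes them. Records are written first and the offset is committed afterwards. A crash between the two steps therefore makes the batch be read and sent again. Compaction keeps only the latest record per key, so this sketch assumes that each record is keyed by the Elasticsearch document id. Check the plugin documentation for how keys are actually assigned. The function names are hypothetical.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, str]  # (document id used as the Kafka key, payload)


def run_batch(docs: List[Record], topic: List[Record], crash_before_commit: bool) -> bool:
    """Send a batch, then commit; return True only if the offset was committed."""
    topic.extend(docs)  # 1. records are written to Kafka first
    if crash_before_commit:
        return False    # 2a. failure: offset not committed, the batch will be re-read
    return True         # 2b. success: offset committed


def compact(topic: List[Record]) -> Dict[str, str]:
    """Keep only the latest value per key, as log compaction eventually does."""
    latest: Dict[str, str] = {}
    for key, value in topic:
        latest[key] = value
    return latest


topic: List[Record] = []
batch = [("doc-1", "a"), ("doc-2", "b")]
if not run_batch(batch, topic, crash_before_commit=True):
    run_batch(batch, topic, crash_before_commit=False)  # restart replays the batch
print(len(topic))      # 4: every document arrived twice
print(compact(topic))  # {'doc-1': 'a', 'doc-2': 'b'}
```

In a real cluster, the broker runs compaction asynchronously. A consumer that reads the topic before compaction runs can therefore still see the duplicates.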

Why an Elasticsearch source?

There are many reasons. I think that Elasticsearch can be used as a primary data store in many contexts, for example as the backend of a web app. In that case, it can be useful to extract the data stored in Elastic and transform it using KSQL or Kafka Streams.

For example, transferring data from Elastic to Kafka makes it possible to run batch machine learning jobs in a reliable and replayable way. The data can also be enriched with information coming from other systems.

Last but not least, importing Elasticsearch indices into Kafka opens up stream processing to all the data collected by the Beats ecosystem (network stats, system health, etc.).

How do I use it?

Download the jar from the GitHub page. Then either put it on the Connect worker's classpath or set the worker's plugin.path parameter to the directory that contains it.

The connector is then configured through the standard Kafka Connect REST API. Assume you have a JSON file containing the configuration:
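As an illustration, the following Python snippet writes such a file for a concrete scenario: indices starting with `metric`, topic prefix `es_`, and a cursor on a timestamp field. `name`, `connector.class` and `tasks.max` are standard Kafka Connect fields. The connector class name and the plugin-specific keys are assumptions to verify against the README of the release you download. Those keys are `es.host`, `es.port`, `index.prefix`, `topic.prefix` and `incrementing.field.name`.

```python
import json

# "name", "connector.class" and "tasks.max" are standard Kafka Connect fields.
# The class name and the plugin-specific keys below are assumptions: check the project README.
connector = {
    "name": "elastic-source",  # hypothetical connector name
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "localhost",
        "es.port": "9200",
        "index.prefix": "metric",                 # read every index whose name starts with "metric"
        "topic.prefix": "es_",                    # assumed naming: index "metric-1" -> topic "es_metric-1"
        "incrementing.field.name": "@timestamp",  # cursor field used for incremental fetching
    },
}

with open("elastic-source.json", "w", encoding="utf-8") as f:
    json.dump(connector, f, indent=2)
```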

The following commands start the job and monitor its status:
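The same calls can be made from any HTTP client. This Python sketch uses only the standard library to prepare the two Kafka Connect REST API requests involved here. `POST /connectors` creates the connector and `GET /connectors/<name>/status` checks it. The worker address `http://localhost:8083` (the usual default REST port) and the helper names are assumptions.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed address of the Kafka Connect worker


def create_request(connector: dict) -> urllib.request.Request:
    """POST /connectors: submit a new connector (body holds "name" and "config")."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(name: str) -> urllib.request.Request:
    """GET /connectors/<name>/status: state of the connector and of its tasks."""
    return urllib.request.Request(f"{CONNECT_URL}/connectors/{name}/status", method="GET")


def send(request: urllib.request.Request) -> dict:
    """Execute a request against the worker and decode its JSON reply."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a worker running, `send(create_request(connector))` submits a configuration dictionary, for instance one loaded with `json.load` from the JSON file mentioned above. Once the job has started, `send(status_request("elastic-source"))["connector"]["state"]` should report `RUNNING`. Here, `elastic-source` is a hypothetical connector name.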

Now all the indices matching metric* are sent to Kafka, using the string es_ as the topic prefix. The timestamp field is used to fetch only the data that has not already been sent to Kafka.

Feedback is welcome!

The project started a few years ago and is being adopted by commercial companies.

I started the project alone, but now I’m happy to announce that another developer is helping me maintain it and develop new features.

Pull requests and feature requests are appreciated ;)