How to Use Elasticsearch Ingest Pipelines with Kafka Connect Elasticsearch Sink Connector


TL;DR

Specify your pipeline via the index.default_pipeline setting in the index settings (or in an index template).

The Problem

We need to index log data into an Elasticsearch cluster using the Kafka Connect Elasticsearch Sink Connector[1]; the data should be split into daily indices, and the documents need to go through an Elasticsearch ingest pipeline.

The documentation of the connector doesn't mention ingest pipelines at all. After a quick consultation with the Internet, you discover that there is an open issue: the Kafka Connect Elasticsearch Sink Connector doesn't support specifying an Elasticsearch ingest pipeline. WAT?
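For context, a minimal sink connector configuration might look like the sketch below (the connector name, topic, and URL are illustrative). Daily indices can be produced, for example, with the TimestampRouter single message transform, which rewrites the topic name, and hence the target index, per record:

name=daily-log-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=daily_log
connection.url=http://localhost:9200
type.name=_doc
key.ignore=true
schema.ignore=true
# route each record to a daily index, e.g. daily_log-20200801
transforms=routeTS
transforms.routeTS.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeTS.topic.format=${topic}-${timestamp}
transforms.routeTS.timestamp.format=yyyyMMdd

Notice that there is no property anywhere in this configuration to name an ingest pipeline.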

The Workaround

Say[2] our pipeline[3] just renames a field, e.g.:

PUT _ingest/pipeline/my_pipeline_id
{
  "description" : "renames the field name",
  "processors" : [
    {
      "rename": {
          "field": "original_field_name",
          "target_field": "target_field_name"
        }
    }
  ]
}
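Applied to a document, the processor simply moves the value under the new field name (the sample document is made up):

# document sent for indexing
{ "original_field_name": "some value" }

# document actually stored after the pipeline runs
{ "target_field_name": "some value" }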

The Elasticsearch ingest pipeline for indexing can be specified in several ways (the first three are sketched right after the list):

  1. per index request, as a URL parameter,
  2. per bulk index request, as a URL parameter,
  3. per operation within a bulk index request,
  4. in the index settings (a dynamic setting),
  5. in an index template.
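For illustration, this is what the first three options look like (the index name and document are made up); these are exactly the knobs that Kafka Connect doesn't expose:

# 1. per index request, as a URL parameter
PUT daily_log-20200801/_doc/1?pipeline=my_pipeline_id
{ "original_field_name": "some value" }

# 2. per bulk index request, as a URL parameter
POST _bulk?pipeline=my_pipeline_id
{ "index": { "_index": "daily_log-20200801" } }
{ "original_field_name": "some value" }

# 3. per operation, in the bulk operation metadata
POST _bulk
{ "index": { "_index": "daily_log-20200801", "pipeline": "my_pipeline_id" } }
{ "original_field_name": "some value" }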

The first three options are not supported by Kafka Connect. The fourth option is not convenient in our case because the data should be split into time-based (e.g. daily) indices and we don't want to do repetitive tasks[4]. The natural option to follow is to define an index template. In the index template we can specify the index.default_pipeline setting, e.g.:

PUT _index_template/template_1
{
  "index_patterns": ["daily_log*"],
  "template": {
    "settings": {
      "index.default_pipeline": "my_pipeline_id"
    }
  }
}

Note that, for indexing not to fail, we should create the Elasticsearch ingest pipeline[5] before setting up the index template.

That is it: now, whenever Kafka Connect creates a new daily index, the Elasticsearch ingest pipeline is applied to every document, without any issues, for free, and in no time.
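As a quick sanity check (the index name is made up), creating an index that matches the pattern should show the setting picked up from the template:

PUT daily_log-20200801

GET daily_log-20200801/_settings
# the response should contain:
# "index": { "default_pipeline": "my_pipeline_id", ... }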

Bonus

One thing to note is that only one pipeline can be specified in index.default_pipeline, which might sound a bit limiting. A clever trick to overcome that limitation is to use a series of pipeline processors that invoke other pipelines in the specified order, i.e. a pipeline of pipelines.
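A sketch of such a wrapper pipeline, assuming a second pipeline my_other_pipeline_id exists:

PUT _ingest/pipeline/my_pipeline_of_pipelines
{
  "description" : "invokes other pipelines in order",
  "processors" : [
    { "pipeline": { "name": "my_pipeline_id" } },
    { "pipeline": { "name": "my_other_pipeline_id" } }
  ]
}

Pointing index.default_pipeline at the wrapper then runs the whole chain for every document.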

Also, there is an index setting called index.final_pipeline that, if specified, runs after the default (or request-level) pipeline.
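For example, it can sit next to the default pipeline in the template settings (the final pipeline name is made up):

"settings": {
  "index.default_pipeline": "my_pipeline_id",
  "index.final_pipeline": "my_cleanup_pipeline_id"
}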

Testing pipelines can be done using the _simulate API.
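For example, running the rename pipeline against a test document without indexing anything:

POST _ingest/pipeline/my_pipeline_id/_simulate
{
  "docs": [
    { "_source": { "original_field_name": "some value" } }
  ]
}

The response contains the transformed document, with target_field_name in place of original_field_name.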

Fin

Thanks for reading, and leave comments or any other feedback on this blog post in the GitHub issue. The examples were tested with Elasticsearch and Kibana 7.8.1.


  1. or any other technology that doesn't support specifying the Elasticsearch ingest pipeline, or where it is simply not possible to do so. ↩︎

  2. yes, I know that the same job can be done with Kafka Connect Transformations. ↩︎

  3. let’s leave out the Kafka Connector setup. ↩︎

  4. i.e. set index.default_pipeline=my_pipeline_id for every new daily index with, say, a cron job at midnight. ↩︎

  5. technically, before an index is created that matches the template pattern. ↩︎
