ElasticSearchSink

Jae Hyeon Bae edited this page Jan 23, 2015 · 6 revisions

Behavior

ElasticSearchSink routes messages to ElasticSearch through a REST API built on Netflix Ribbon.

The routing key is used as the index name, and daily index rolling can be implemented through the IndexInfoBuilder configuration. Like KafkaSink, ElasticSearchSink assumes that the data is JSON formatted. If you use a serialization format other than JSON, you need to implement the IndexInfoBuilder interface yourself.

IndexInfoBuilder

ElasticSearch requires several pieces of information to index a document, including the following:

  • index name
  • type name
  • the document to index in bytes form, which should be json formatted
  • document id

The IndexInfoBuilder interface can be implemented to generate the above information in a configurable way. The provided DefaultIndexInfoBuilder has the following properties.

indexTypeMap

This is a map keyed by routing key. Each value is an index name and a type name separated by a colon (:).
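As a rough illustration of the "index:type" value format, the sketch below splits each map value at the first colon and looks the parts up by routing key. The class IndexTypeLookup and its method names are hypothetical and not part of Suro. How DefaultIndexInfoBuilder treats malformed values or unknown routing keys is not described here, so rejecting the former and returning null for the latter are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Suro.
final class IndexTypeLookup {
    private final Map<String, String> indexByRoutingKey = new HashMap<>();
    private final Map<String, String> typeByRoutingKey = new HashMap<>();

    IndexTypeLookup(Map<String, String> indexTypeMap) {
        for (Map.Entry<String, String> entry : indexTypeMap.entrySet()) {
            // Split "index:type" at the first colon only.
            String[] parts = entry.getValue().split(":", 2);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                // Assumption: this sketch rejects malformed values.
                throw new IllegalArgumentException(
                        "expected index:type but got " + entry.getValue());
            }
            indexByRoutingKey.put(entry.getKey(), parts[0]);
            typeByRoutingKey.put(entry.getKey(), parts[1]);
        }
    }

    // Returns null when the routing key has no entry (an assumption of this sketch).
    String indexFor(String routingKey) {
        return indexByRoutingKey.get(routingKey);
    }

    // Returns null when the routing key has no entry (an assumption of this sketch).
    String typeFor(String routingKey) {
        return typeByRoutingKey.get(routingKey);
    }
}
```

With a map entry such as request_trace mapped to traces:request (made-up names), indexFor("request_trace") yields traces and typeFor("request_trace") yields request.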

idFields

This is a map keyed by routing key. Each value is the list of fields whose values are concatenated to form the document id in ElasticSearch.
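The concatenation can be pictured with a minimal sketch like the one below. DocumentIdSketch and buildId are hypothetical names, not Suro classes. The sketch assumes the values are joined without a separator and that fields missing from the record are skipped; the actual behavior of DefaultIndexInfoBuilder may differ on both points.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Suro.
final class DocumentIdSketch {
    // Concatenates the values of the configured id fields, in list order.
    static String buildId(List<String> idFields, Map<String, Object> record) {
        StringBuilder id = new StringBuilder();
        for (String field : idFields) {
            Object value = record.get(field);
            if (value != null) {
                // Assumption: no separator, and missing fields are skipped.
                id.append(value);
            }
        }
        return id.toString();
    }
}
```

For a record with host set to web01 and seq set to 42, the id fields host and seq produce the id web0142 under these assumptions.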

timestamp

This describes which field should be used as the timestamp and how it should be parsed. It has two properties: field, the name of the field, and format, a Joda pattern string, which defaults to 'dateOptionalTime'.

indexSuffixFormatter

Use this to append a suffix to the index name, for example to implement a rolling index. It has two properties: type, which describes the behavior of the formatter, and properties, which holds additional information for it. By default, nothing is appended. The date type, with a dateFormat entry in properties, appends time-based information to the index name.
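To make the idea of a date-based suffix concrete, here is a minimal sketch of daily index rolling. It uses java.time in place of Joda-Time so that it runs with the JDK alone; the pattern yyyyMMdd means the same in both libraries. DateSuffixSketch is a hypothetical class, and appending the suffix directly to the index name in UTC is an assumption, not a statement of how Suro's formatter behaves.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of Suro.
final class DateSuffixSketch {
    private final DateTimeFormatter formatter;

    DateSuffixSketch(String dateFormat) {
        // Assumption: timestamps are interpreted in UTC.
        this.formatter = DateTimeFormatter.ofPattern(dateFormat).withZone(ZoneOffset.UTC);
    }

    // Appends the formatted record timestamp to the base index name.
    String indexName(String baseIndex, long timestampMillis) {
        return baseIndex + formatter.format(Instant.ofEpochMilli(timestampMillis));
    }
}
```

With the pattern yyyyMMdd, every record whose timestamp falls on the same UTC day resolves to the same index name, which is what gives one index per day.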

DataConverter

DataConverter cannot be configured through the JSON settings, but it can be injected through Jackson. For example, if we implement DataConverterProvider as follows:

public class DataConverterProvider implements Provider<DataConverter> {
    @Override
    public DataConverter get() {
        return new DataConverter() {
            @Override
            public Map<String, Object> convert(Map<String, Object> map) {
                // manipulate map
                return map;
            }
        };
    }
}

and bind the above provider with

bind(DataConverter.class).toProvider(DataConverterProvider.class).asEagerSingleton();

DataConverter will be called before building index info for the record.

ObjectMapper

This is injected automatically by Jackson.

Properties

ElasticSearchSink is a ThreadPoolQueuedSink and has the following common properties:

  • queue4Sink
  • batchSize
  • batchTimeout
  • jobQueueSize
  • corePoolSize
  • maxPoolSize
  • jobTimeout

addressList

This is the list of ElasticSearch servers to connect to initially. Because ElasticSearchSink communicates with ElasticSearch through the REST API, the port number must be the HTTP port that ElasticSearch exposes. The hostname and port are separated by a colon (:), as in hostname:9200. If you integrate with Eureka, a VIP address can be used instead, as in vipAddress:9200.
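The hostname:port format can be illustrated with a small parsing sketch. AddressListSketch and HostPort are hypothetical names that do not come from Suro, and rejecting entries without a port is an assumption of this sketch rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Suro.
final class AddressListSketch {
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Parses entries such as "hostname:9200" into host and port pairs.
    static List<HostPort> parse(List<String> addressList) {
        List<HostPort> result = new ArrayList<>();
        for (String address : addressList) {
            int colon = address.lastIndexOf(':');
            if (colon <= 0 || colon == address.length() - 1) {
                // Assumption: this sketch requires both a host and a port.
                throw new IllegalArgumentException(
                        "expected hostname:port but got " + address);
            }
            int port = Integer.parseInt(address.substring(colon + 1));
            result.add(new HostPort(address.substring(0, colon), port));
        }
        return result;
    }
}
```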
