elasticsearch_bulk

The elasticsearch_bulk operator sends records to an elasticsearch index using high-throughput bulk requests.

There are four types of bulk requests: index, create, update and delete.

For update, create and delete requests, the data-entities coming into this processor must have the _key metadata field set to the id of the record.

Although not needed for index bulk requests, setting the _key on the record will create the new record with that id instead of an automatically generated one.

When using the elasticsearch_reader or the elasticsearch_reader_api to fetch records, the _key will automatically be set to the elasticsearch record's _id. You can use other processors to remove or alter it if that is not wanted.

Usage

Send index batch request, setting the _id of the records

By default we make an index bulk request. Since the records have a _key, it is used as the new elasticsearch _id of the record.

Example Job

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"operations" : [
{
"_op": "elasticsearch_reader",
"connection": "es-1",
"index": "test_index",
"date_field_name": "created"
},
{
"_op": "elasticsearch_bulk",
"index": "new_index",
"type": "events"
}
]
}

Below is a representation of the incoming data and the resulting bulk request being made to elasticsearch


const records = [
new DataEntity({ some: 'data' }, { _key: '1234' }),
new DataEntity({ other: 'stuff' }, { _key: '5678' })
]

// will be converted to this bulk request

[
{ index: { _index: 'new_index', _type: 'events', _id: '1234' }},
{ some: 'data' },
{ index: { _index: 'new_index', _type: 'events', _id: '5678' }},
{ other: 'stuff' }
]
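
The conversion above can be sketched in a few lines. This is only an illustration, not the asset's implementation: the function name toIndexBulkRequest and the plain { data, key } record shape (standing in for a DataEntity and its _key metadata) are assumptions made for the example.

```javascript
// Hypothetical sketch: each record is a plain { data, key } object
// standing in for a DataEntity and its _key metadata.
function toIndexBulkRequest(records, { index, type }) {
    const bulk = [];
    for (const { data, key } of records) {
        const meta = { _index: index, _type: type };
        // _key is optional for index requests; when it is missing,
        // no _id is sent and elasticsearch generates one.
        if (key != null) meta._id = key;
        // every document contributes an action line and a document line
        bulk.push({ index: meta }, data);
    }
    return bulk;
}

const bulkRequest = toIndexBulkRequest(
    [
        { data: { some: 'data' }, key: '1234' },
        { data: { other: 'stuff' }, key: '5678' }
    ],
    { index: 'new_index', type: 'events' }
);

console.log(JSON.stringify(bulkRequest, null, 2));
```

Each record yields two entries in the request, which is why the size parameter counts documents rather than entries.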

Send an update batch request, only updating selected fields

We can make an update batch request and limit which fields are updated. In this job, only the name and job fields will be updated.

Example Job

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"operations" : [
{
"_op": "elasticsearch_reader",
"index": "test_index",
"date_field_name": "created",
"connection": "es-1"
},
{
"_op": "elasticsearch_bulk",
"index": "new_index",
"type": "events",
"update": true,
"update_fields": ["name", "job"]
}
]
}

Below is a representation of the incoming data and the resulting bulk request being made to elasticsearch


const records = [
new DataEntity({ some: 'data', name: 'someName', job: 'to be awesome!' }, { _key: '1234' }),
]

// will be converted to this bulk request

[
{ update: { _index: 'new_index', _type: 'events', _id: '1234' }},
{ doc: { name: 'someName', job: 'to be awesome!' } },
]
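
A minimal sketch of the field selection shown above, assuming a hypothetical helper toUpdateBulkRequest and the same plain { data, key } record shape in place of a DataEntity. When updateFields is empty, the whole record is sent as the doc, matching the documented default of update_fields.

```javascript
// Hypothetical sketch: builds update actions, keeping only selected fields.
function toUpdateBulkRequest(records, { index, type, updateFields = [] }) {
    const bulk = [];
    for (const { data, key } of records) {
        // update requests need an id, so a missing key is an error
        if (key == null) throw new Error('update requests require a _key');
        let doc = data;
        if (updateFields.length > 0) {
            doc = {};
            for (const field of updateFields) {
                // skip fields that the incoming record does not have
                if (field in data) doc[field] = data[field];
            }
        }
        bulk.push({ update: { _index: index, _type: type, _id: key } }, { doc });
    }
    return bulk;
}

const updateRequest = toUpdateBulkRequest(
    [{ data: { some: 'data', name: 'someName', job: 'to be awesome!' }, key: '1234' }],
    { index: 'new_index', type: 'events', updateFields: ['name', 'job'] }
);

console.log(JSON.stringify(updateRequest, null, 2));
```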

Send upsert batch request and use scripts to make additional changes

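Setting upsert to true sends each record as an update request with the incoming record as the upsert document. A script can make additional changes; its parameters are filled from the incoming record according to script_params, where each key is the parameter name and each value is the record field to read.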

Example Job

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"operations" : [
{
"_op": "elasticsearch_reader",
"index": "test_index",
"date_field_name": "created",
"connection": "es-1"
},
{
"_op": "elasticsearch_bulk",
"index": "new_index",
"type": "events",
"upsert": true,
"script": "ctx._source.count += add",
"script_params": { "add": "add" }
}
]
}

Below is a representation of the incoming data and the resulting bulk request being made to elasticsearch


const records = [
new DataEntity({ count: 1, add: 2 }, { _key: '1234' }),
]

// will be converted to this bulk request

[
{ update: { _index: 'new_index', _type: 'events', _id: '1234' }},
{
upsert: { count: 1, add: 2 },
script: {
source: 'ctx._source.count += add',
params: {
add: 2
}
}
}
]
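
The script_params mapping can be illustrated with a short sketch. The helper name toUpsertBulkRequest and the plain { data, key } record shape are assumptions for the example; it only mirrors the request shown above and is not the asset's code.

```javascript
// Hypothetical sketch: builds upsert actions with script parameters
// resolved from each incoming record.
function toUpsertBulkRequest(records, { index, type, script, scriptParams = {} }) {
    const bulk = [];
    for (const { data, key } of records) {
        if (key == null) throw new Error('upsert requests require a _key');
        const params = {};
        // scriptParams maps a script parameter name to a field of the record
        for (const [paramName, fieldName] of Object.entries(scriptParams)) {
            params[paramName] = data[fieldName];
        }
        bulk.push(
            { update: { _index: index, _type: type, _id: key } },
            { upsert: { ...data }, script: { source: script, params } }
        );
    }
    return bulk;
}

const upsertRequest = toUpsertBulkRequest(
    [{ data: { count: 1, add: 2 }, key: '1234' }],
    {
        index: 'new_index',
        type: 'events',
        script: 'ctx._source.count += add',
        scriptParams: { add: 'add' }
    }
);

console.log(JSON.stringify(upsertRequest, null, 2));
```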

Parameters

| Configuration | Description | Type | Notes |
| --- | --- | --- | --- |
| _op | Name of operation, it must reflect the exact name of the file | String | required |
| size | The maximum number of docs it will send in a given request, anything past it will be split up and sent | Number | required. The index selector typically returns up to double the number of original documents because of the metadata involved with bulk requests, so this number is effectively doubled to keep the split based on actual documents and not the metadata |
| connection | Name of the elasticsearch connection to use when sending data | String | optional, defaults to the 'default' connection created for elasticsearch |
| index | Index where the data will be sent, it must be lowercase | String | required |
| type | Set the type of the data for elasticsearch | String | optional, defaults to '_doc', is required for elasticsearch v5 |
| delete | Use the id_field from the incoming records to bulk delete documents | Boolean | optional, defaults to false |
| upsert | Specify if the incoming records should be used to perform an upsert. If update_fields is also specified then existing records will be updated with those fields, otherwise the full incoming record will be inserted | Boolean | optional, defaults to false |
| create | Specify if the incoming records should be used to perform a create event ("put-if-absent" behavior) | Boolean | optional, defaults to false |
| update | Specify if the data should update existing records, if false it will index them | Boolean | optional, defaults to false |
| update_fields | If you are updating the documents, you can specify fields to update here (it should be an array containing all the field names you want), it defaults to sending the entire document | Array | optional, defaults to [] |
| script_file | Name of the script file to run as part of an update request | String | optional |
| script | Inline script to include in each indexing request. Only very simple painless scripts are currently supported | String | optional |
| script_params | key -> value parameter mappings. The value will be extracted from the incoming data and passed to the script as a param based on the key | Object | optional |
| update_retry_on_conflict | If there is a version conflict from an update, how often it should be retried | Number | optional, defaults to 0 |
| api_name | Name of the api to be used by the elasticsearch bulk sender | String | optional, defaults to 'elasticsearch_sender_api' |
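
The note on size can be made concrete with a small sketch. It assumes a hypothetical helper splitBulkRequest and assumes every document contributes exactly two entries (an action line and a document line), which holds for index, create and update requests but not for delete requests, which have no document line.

```javascript
// Hypothetical sketch: split a flat bulk request by document count.
// Assumes two entries per document (action line + document line).
function splitBulkRequest(bulkRequest, size) {
    const entriesPerChunk = size * 2;
    const chunks = [];
    for (let i = 0; i < bulkRequest.length; i += entriesPerChunk) {
        chunks.push(bulkRequest.slice(i, i + entriesPerChunk));
    }
    return chunks;
}

// five documents produce ten entries in the flat request
const flatRequest = [];
for (let id = 1; id <= 5; id += 1) {
    flatRequest.push({ index: { _index: 'new_index', _id: String(id) } }, { value: id });
}

const chunks = splitBulkRequest(flatRequest, 2);
console.log(chunks.map((chunk) => chunk.length));
```

With a size of 2, each chunk carries at most two documents, so no action line is separated from its document line.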

API usage in a job

In elasticsearch_assets v3, many core components were made into teraslice apis. When you use an elasticsearch processor, it will automatically set up the api for you. If you specify the api manually, there are restrictions on which configurations you can put on the operation, so that clashes between configurations are minimized. The api configs take precedence.

If submitting the job in long form, here is a list of parameters that will throw an error if also specified on the opConfig, since these values should be placed on the api:

  • index
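
The restriction above could be checked along these lines. This is a sketch under stated assumptions, not the validation the asset actually runs: validateOpConfig and API_ONLY_FIELDS are hypothetical names, and the check only fires when the operation's api_name matches an api declared on the job.

```javascript
// Hypothetical sketch: fields that belong on the api in long form.
const API_ONLY_FIELDS = ['index'];

function validateOpConfig(opConfig, apis = []) {
    // short form: no matching api was declared, so nothing can clash
    const hasApi = apis.some((api) => api._name === opConfig.api_name);
    if (!hasApi) return;
    const clashing = API_ONLY_FIELDS.filter((field) => field in opConfig);
    if (clashing.length > 0) {
        throw new Error(
            `Set ${clashing.join(', ')} on the api "${opConfig.api_name}", not on the operation`
        );
    }
}

const apis = [{ _name: 'elasticsearch_sender_api', index: 'other_index', size: 1000 }];

// long form with index only on the api: no error is thrown
validateOpConfig({ _op: 'elasticsearch_bulk', api_name: 'elasticsearch_sender_api' }, apis);
```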

SHORT FORM (no api specified)

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"operations" : [
{
"_op": "elasticsearch_reader",
"index": "test_index",
"field": "uuid",
"size": 1000,
"key_type": "base64url"
},
{
"_op": "elasticsearch_bulk",
"index": "other_index",
"size": 1000,
"type": "events"
}
]
}

This configuration will be expanded out to the long form underneath the hood.

LONG FORM (api is specified)

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"apis" : [
{
"_name": "elasticsearch_reader_api",
"index": "test_index",
"field": "uuid",
"size": 1000,
"key_type": "base64url",
"connection": "default"
},
{
"_name": "elasticsearch_sender_api",
"index": "other_index",
"size": 1000,
"type": "events",
"connection": "default"
}
],
"operations" : [
{
"_op" : "id_reader",
"api_name" : "elasticsearch_reader_api"
},
{
"_op": "elasticsearch_bulk",
"api_name" : "elasticsearch_sender_api"
}
]
}

Dead Letter Queue Support

The elasticsearch_bulk processor supports the dead letter queue api as of version 3.5.0. When the dead_letter_queue functionality is active, records that are rejected by elasticsearch with a _bulk_sender_rejection error are forwarded to the kafka topic specified in the dead letter queue api configs. Records that do not have the error are still written to the designated cluster as usual.

To trigger this behavior, add the property and value "_dead_letter_action": "kafka_dead_letter" to the elasticsearch_bulk _op configs.

Example Job:

{
"name" : "testing",
"workers" : 1,
"slicers" : 1,
"lifecycle" : "once",
"assets" : [
"elasticsearch"
],
"apis": [
{
"_name": "kafka_dead_letter",
"connection": "KAFKA_CONNECTION",
"topic": "KAFKA_TOPIC",
"size": 10000
}
],
"operations" : [
{
"_op": "elasticsearch_reader",
"index": "INDEX_NAME",
"date_field_name": "created",
"connection": "ES_CLUSTER_CONNECTION"
},
{
"_op": "elasticsearch_bulk",
"connection": "ES_CLUSTER_CONNECTION",
"index": "INDEX_NAME",
"type": "events",
"_dead_letter_action": "kafka_dead_letter"
}
]
}
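
The split between written and rejected records can be pictured with the sketch below. It is a simplified illustration and not the processor's actual rejection handling: partitionByRejection is a hypothetical helper, and it relies on the elasticsearch bulk response listing one item per action in the same order as the request.

```javascript
// Hypothetical sketch: separate records by the outcome reported in a
// bulk response. Items are assumed to line up with records by position.
function partitionByRejection(records, bulkResponse) {
    const accepted = [];
    const rejected = [];
    bulkResponse.items.forEach((item, position) => {
        // each item is keyed by its action: index, create, update or delete
        const result = Object.values(item)[0];
        if (result.error) {
            rejected.push({ record: records[position], reason: result.error.reason });
        } else {
            accepted.push(records[position]);
        }
    });
    return { accepted, rejected };
}

const sent = [{ some: 'data' }, { other: 'stuff' }];
const response = {
    errors: true,
    items: [
        { index: { _id: '1234', status: 201 } },
        {
            index: {
                _id: '5678',
                status: 400,
                error: { type: 'mapper_parsing_exception', reason: 'failed to parse' }
            }
        }
    ]
};

const outcome = partitionByRejection(sent, response);
console.log(JSON.stringify(outcome, null, 2));
```

In this picture, the rejected entries are what would be handed to the dead letter queue, while the accepted ones are already written to the cluster.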