
# kafka_dead_letter

This is a Teraslice API that extends the built-in dead letter queue functionality.

Any record that fails in a `tryRecord` operation API call, or any record passed directly to the `rejectRecord` operation API, is collected and sent to a Kafka topic at the end of the slice.

More specifically, the records are sent during the `onSliceFinalizing` operation lifecycle event.
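To make that timing concrete, here is a conceptual sketch (not the asset's actual source) of an `OperationAPI` that buffers rejected records and flushes them when `onSliceFinalizing` fires; the collector function returned by `createAPI` is a hypothetical stand-in:

```typescript
import { OperationAPI } from '@terascope/job-components';

// Conceptual sketch only: rejected records are buffered while the slice
// runs, then flushed as one batch when onSliceFinalizing fires.
export default class DeadLetterSketch extends OperationAPI {
    private buffered: Buffer[] = [];

    async createAPI() {
        // hypothetical collector handed to operations that reject records
        return (record: unknown) => {
            this.buffered.push(Buffer.from(JSON.stringify(record)));
        };
    }

    async onSliceFinalizing(): Promise<void> {
        // a real implementation would produce this.buffered to the
        // configured Kafka topic here before clearing the buffer
        this.buffered = [];
    }
}
```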

Keeping a Kafka topic of all failed records is useful: they can be inspected or reprocessed later, or a separate job can run in parallel, processing the queue of failed records.
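For example, a reprocessing job could consume the failed-record topic with the kafka asset's `kafka_reader`. This is only a sketch; the job name, consumer `group`, and downstream processor are placeholders:

```json
{
    "name": "reprocess-failed-records",
    "workers": 1,
    "lifecycle": "persistent",
    "assets": ["kafka"],
    "operations": [
        {
            "_op": "kafka_reader",
            "topic": "failed_record_topic",
            "group": "failed-record-reprocessor"
        },
        {
            "_op": "some_processor"
        }
    ]
}
```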

## Usage

Use `kafka_dead_letter` to send failed records to a topic:

```json
{
    "name": "testing",
    "workers": 1,
    "slicers": 1,
    "lifecycle": "once",
    "assets": ["kafka"],
    "apis": [
        {
            "_name": "kafka_dead_letter",
            "topic": "failed_record_topic",
            "size": 1000
        }
    ],
    "operations": [
        {
            "_op": "test-reader"
        },
        {
            "_op": "some_processor",
            "_dead_letter_action": "kafka_dead_letter"
        }
    ]
}
```

Note that the processor's `_dead_letter_action` setting references the API by its `_name`; Teraslice also accepts the built-in actions `throw`, `log`, and `none`. Here is a custom processor for the job described above:

```typescript
// located at /some_processor/processor.ts
import { BatchProcessor, DataEntity } from '@terascope/job-components';
import { isNumber } from '@terascope/utils';

export default class SomeProcessor extends BatchProcessor {
    async onBatch(data: DataEntity[]) {
        const results: DataEntity[] = [];

        for (const record of data) {
            if (isNumber(record.field)) {
                const field = record.field * 2;
                results.push(DataEntity.make({ field }));
            } else {
                // rejectRecord hands the record to the configured
                // _dead_letter_action, in this job the kafka_dead_letter api
                this.rejectRecord(record, new Error('record.field is not a number'));
            }
        }

        return results;
    }
}
```
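The processor above calls `rejectRecord` directly. The other route into the dead letter queue is `tryRecord`, which wraps a per-record function and sends any record whose invocation throws to the configured `_dead_letter_action`. A minimal sketch of the same processor rewritten around it, assuming `tryRecord`'s wrap-and-return-null behavior:

```typescript
// tryRecord variant of the processor above (a sketch)
import { BatchProcessor, DataEntity } from '@terascope/job-components';
import { isNumber } from '@terascope/utils';

export default class SomeProcessor extends BatchProcessor {
    async onBatch(data: DataEntity[]) {
        // tryRecord wraps the transform; when the transform throws, the
        // failing input record goes to the dead letter queue and the
        // wrapper returns null instead
        const double = this.tryRecord((record: DataEntity) => {
            if (!isNumber(record.field)) {
                throw new Error('record.field is not a number');
            }
            return DataEntity.make({ field: record.field * 2 });
        });

        return data.map(double).filter((result): result is DataEntity => result != null);
    }
}
```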

Here is a top-level overview of how this would behave, given the processor and job above:

```typescript
const data = [
    { field: 3 },
    { field: 'Uh oh, i am not a number' },
    { field: 10 },
    { notField: 'Uh oh, i am not a number' },
];

const results = await processor.run(data);

results === [{ field: 6 }, { field: 20 }];

/*
The records:
    { field: 'Uh oh, i am not a number' }
    { notField: 'Uh oh, i am not a number' }

are sent to topic "failed_record_topic" at the end of the slice.
*/
```

## Parameters

| Configuration | Description | Type | Notes |
| --- | --- | --- | --- |
| `_name` | Name of the API operation; it must reflect the exact name of the file | String | required |
| `topic` | Name of the Kafka topic to send records to | String | required |
| `size` | How many messages are batched and sent to Kafka together | Number | optional, defaults to `10000` |
| `compression` | Type of compression to use on records sent to the topic; may be set to `none`, `gzip`, `snappy`, `lz4` or `inherit` | String | optional, defaults to `gzip` |
| `wait` | How long to wait for `size` messages to become available on the producer, in milliseconds | String/Duration/Number | optional, defaults to `500` |
| `connection` | Name of the Kafka connection to use when sending data | String | optional, defaults to the `default` connection in the Kafka terafoundation connector config |
| `metadata_refresh` | How often the producer polls the broker for metadata information; set to `-1` to disable polling | String/Duration/Number | optional, defaults to `"5 minutes"` |
| `_encoding` | Data encoding type used with `DataEntity.fromBuffer`; may be set to `json` or `raw` | String | optional, defaults to `json` |
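
To illustrate `_encoding`, here is a minimal sketch of decoding a buffered record with `DataEntity.fromBuffer`, assuming `DataEntity` and `DataEncoding` are importable from `@terascope/utils`; the `_op` name is only illustrative:

```typescript
import { DataEntity, DataEncoding } from '@terascope/utils';

// json encoding (the default): the buffer is parsed as a JSON document
const record = DataEntity.fromBuffer(
    Buffer.from('{"field":3}'),
    { _op: 'kafka_dead_letter', _encoding: DataEncoding.JSON }
);
// record.field === 3

// raw encoding skips parsing and keeps the original bytes on the entity
```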