
kafka_reader_api

This is a Teraslice API that reads messages from a Kafka topic and can be used by any processor, reader or slicer.

The kafka_reader_api provides an API factory, a singleton that can create, cache and manage multiple Kafka readers. These readers can be accessed in any operation through the operation's getAPI method.

This is a high-throughput operation. The reader handles all the complexity of balancing reads across partitions and managing ever-changing brokers.

This uses node-rdkafka under the hood.

For this reader to function properly, you will need a running Kafka cluster and must configure the job with the correct group, topic and partition management options.

This is a recoverable reader, meaning the job can be stopped and then pick back up where it left off.

Fetched records will already have metadata associated with them. Please reference the metadata section for more information.

Usage

Example Processor using a kafka reader API

This is an example of a custom fetcher using the kafka_reader_api to create its own Kafka consumers.

Example job

{
    "name": "testing",
    "workers": 1,
    "slicers": 1,
    "lifecycle": "once",
    "assets": ["kafka"],
    "apis": [
        {
            "_name": "kafka_reader_api",
            "topic": "kafka-test-fetcher",
            "group": "58b1bc77-d950-4e89-a3e1-4e93ad3e6cec",
            "size": 10000,
            "wait": 8000,
            "rollback_on_failure": true,
            "_dead_letter_action": "log"
        }
    ],
    "operations": [
        {
            "_op": "some_reader",
            "api_name": "kafka_reader_api"
        },
        {
            "_op": "noop"
        }
    ]
}

Here is a custom fetcher for the job described above

// found at /some_reader/fetcher.ts
export default class SomeReader extends Fetcher {
    api: any;

    async initialize() {
        await super.initialize();
        const apiName = this.opConfig.api_name;
        const apiManager = this.getAPI(apiName);
        // create (or fetch from the cache) a kafka consumer for this fetcher
        this.api = await apiManager.create(apiName, {});
    }

    async fetch(slice) {
        return this.api.consume({ size: slice.size, wait: this.opConfig.wait });
    }
}

Kafka Reader Factory API Methods

size

this will return how many separate reader apis are in the cache

get

parameters:

  • name: String

this will fetch any reader api that is associated with the name provided

getConfig

parameters:

  • name: String

this will fetch any reader api config that is associated with the name provided

create (async)

parameters:

  • name: String
  • configOverrides: Check options below, optional

This will create an instance of a reader api and cache it with the name provided. Any config provided in the second argument will override what is specified in the apiConfig. It will throw an error if you try to create another api with the same name.
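The create/cache semantics described above can be sketched with a plain Map. This is an illustrative mock, not the actual teraslice implementation; the class name MockFactory and its internals are made up:

```typescript
// Minimal sketch of the factory's create semantics: merge overrides onto the
// job's apiConfig, cache by name, and reject duplicate names.
type ApiConfig = Record<string, unknown>;

class MockFactory {
    private cache = new Map<string, ApiConfig>();

    constructor(private baseConfig: ApiConfig) {}

    async create(name: string, overrides: ApiConfig = {}): Promise<ApiConfig> {
        if (this.cache.has(name)) {
            throw new Error(`api with name ${name} already exists`);
        }
        // values in the second argument win over the job's apiConfig
        const instance = { ...this.baseConfig, ...overrides };
        this.cache.set(name, instance);
        return instance;
    }

    size(): number {
        return this.cache.size;
    }
}
```

In the real factory the cached value is a live Kafka consumer rather than a config object, but the name-keyed caching and duplicate-name error behave the same way.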

remove (async)

parameters:

  • name: String

This will remove an instance of a reader api from the cache and will follow any cleanup specified in the api code.

entries

This will allow you to iterate over the name/client pairs of the cache

keys

This will allow you to iterate over the names of the cache

values

This will allow you to iterate over the values of the cache
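The three iteration methods behave like their Map counterparts, with the cache keyed by api name. A sketch, using a plain Map with stand-in client objects in place of real Kafka consumers:

```typescript
// Sketch: the factory's cache iterates like a Map keyed by api name.
const cache = new Map<string, { topic: string }>([
    ['normalClient', { topic: 'kafka-test-fetcher' }],
    ['overrideClient', { topic: 'other_topic' }],
]);

// entries(): [name, client] pairs
for (const [name, client] of cache.entries()) {
    console.log(`${name} reads from ${client.topic}`);
}

// keys(): just the cached names
const names = [...cache.keys()]; // ['normalClient', 'overrideClient']

// values(): just the cached clients
const topics = [...cache.values()].map((c) => c.topic);
```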

Example of using the factory methods in a processor

// example of api configuration
const apiConfig = {
    _name: 'kafka_reader_api',
    topic: 'kafka-test-fetcher',
    group: '58b1bc77-d950-4e89-a3e1-4e93ad3e6cec',
    size: 10000,
    wait: 8000,
    rollback_on_failure: true,
    _dead_letter_action: 'log'
};

const apiManager = this.getAPI(apiName);

apiManager.size() === 0;

// this will return an api cached at "normalClient" and it will use the default api config
const normalClient = await apiManager.create('normalClient', {});

apiManager.size() === 1;

apiManager.get('normalClient') === normalClient;

// this will return an api cached at "overrideClient"; it will use the api config
// but override the topic to "other_topic" and the connection to "other" in the new instance
const overrideClient = await apiManager.create('overrideClient', { topic: 'other_topic', connection: 'other' });

apiManager.size() === 2;

// this will return the full configuration for this client
apiManager.getConfig('overrideClient') === {
    _name: 'kafka_reader_api',
    topic: 'other_topic',
    group: '58b1bc77-d950-4e89-a3e1-4e93ad3e6cec',
    size: 10000,
    wait: 8000,
    rollback_on_failure: true,
    _dead_letter_action: 'log',
    connection: 'other'
};

await apiManager.remove('normalClient');

apiManager.size() === 1;

apiManager.get('normalClient') === undefined;

Kafka Reader Instance

This is the reader class that is returned from the create method of the APIFactory. It is a Kafka consumer.

consume

(query: { size: number; wait: number }) => Promise<DataEntity[]>

parameters:

  • query: an object with size (number of records to fetch) and wait (time in milliseconds to wait for the slice to fill)

const query = {
    size: 10000,
    wait: 8000
};

const results = await api.consume(query);

Parameters

| Configuration | Description | Type | Notes |
| ------------- | ----------- | ---- | ----- |
| _op | Name of operation, it must reflect the exact name of the file | String | required |
| topic | Name of the Kafka topic to process | String | required |
| group | Name of the Kafka consumer group | String | required |
| size | How many records to read before a slice is considered complete | Number | optional, defaults to 10000 |
| connection | Name of the kafka connection to use when reading data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config |
| max_poll_interval | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member | String/Duration | optional, defaults to "5 minutes" |
| offset_reset | How offset resets should be handled when there are no valid offsets for the consumer group. May be set to smallest, earliest, beginning, largest, latest or error | String | optional, defaults to smallest |
| partition_assignment_strategy | Name of the partition assignment strategy to use when the elected group leader assigns partitions to group members. May be set to range, roundrobin, cooperative-sticky or "" | String | optional, defaults to "" |
| rollback_on_failure | Controls whether the consumer state is rolled back on failure. This will protect against data loss, however it can have the unintended side effect of blocking the job from moving forward if failures are minor and persistent. NOTE: This currently defaults to false due to the side effects of the behavior; at some point in the future it is expected to default to true | Boolean | optional, defaults to false |
| use_commit_sync | Use commit sync instead of async (usually not recommended) | Boolean | optional, defaults to false |
| wait | How long to wait for a full chunk of data to be available. Specified in milliseconds if you use a number | String/Duration/Number | optional, defaults to 30 seconds |
| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to json or raw | String | optional, defaults to json |
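Several of the optional parameters above tune consumer-group behavior. As an illustration (the group name and specific values here are made up, not recommendations), an apis entry exercising some of them might look like:

```json
{
    "_name": "kafka_reader_api",
    "topic": "kafka-test-fetcher",
    "group": "example-consumer-group",
    "offset_reset": "earliest",
    "partition_assignment_strategy": "roundrobin",
    "max_poll_interval": "5 minutes",
    "use_commit_sync": false,
    "wait": "30 seconds"
}
```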

Metadata

When the records are fetched from kafka, metadata will be attached to each record

  • _key is set to the kafka message key
  • _processTime is set to a number representing the milliseconds elapsed since the UNIX epoch of when it was first fetched
  • _ingestTime is set to a number representing the milliseconds elapsed since the UNIX epoch of the timestamp field of the kafka record, or of when it was first fetched
  • _eventTime is set to a number representing the milliseconds elapsed since the UNIX epoch of when it was first fetched
  • topic is set to the topic the record came from
  • partition is set to the partition the record came from
  • offset is set to the record's offset
  • size is the message size in bytes

Example of metadata from a fetched record

// example record in kafka
{
    "ip": "120.67.248.156",
    "url": "http://lucious.biz",
    "uuid": "a23a8550-0081-453f-9e80-93a90782a5bd",
    "created": "2019-04-26T08:00:23.225-07:00",
    "ipv6": "9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8",
    "location": "50.15003, -94.89355",
    "bytes": 124
}

// the fetched record is a DataEntity with the same fields
const expectedResults = {
    "ip": "120.67.248.156",
    "url": "http://lucious.biz",
    "uuid": "a23a8550-0081-453f-9e80-93a90782a5bd",
    "created": "2019-04-26T08:00:23.225-07:00",
    "ipv6": "9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8",
    "location": "50.15003, -94.89355",
    "bytes": 124
};

DataEntity.isDataEntity(expectedResults) === true;

expectedResults.getMetadata() === {
    _key: "ltyRQW4B8WLke7PkER8L", // the kafka message key
    topic: "kafka-test-fetcher",
    partition: 0,
    offset: 185,
    size: 193,
    _processTime: 1596663162372,
    _ingestTime: 1596663162372,
    _eventTime: 1596663162372,
}