Types of Operations
Operations within a Teraslice Job are designed to read, transform, write, or monitor data.
Readers
A Reader is the first operation specified on the job and has two main components, a Slicer and a Fetcher. The purpose of a Reader is to distribute and read partitions of data across one or more Workers.
To develop a reader, see the docs.
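To make the Slicer/Fetcher split concrete, here is a minimal standalone sketch that does not use `@terascope/job-components`. It assumes a hypothetical in-memory data source: the slicer side (`createSliceRequests`) divides a dataset of `total` records into `{ offset, size }` requests, and the fetcher side (`fetchSlice`) reads only the window a single request describes. The function names and the request shape are illustrative assumptions, not Teraslice's API. In a real job each request would be handed to an available Worker; here everything runs in one process.

```typescript
// Hypothetical slice request shape: a window into the data source.
interface RangeRequest {
    offset: number;
    size: number;
}

// Slicer side: split `total` records into requests of at most `sliceSize` records.
function createSliceRequests(total: number, sliceSize: number): RangeRequest[] {
    const requests: RangeRequest[] = [];
    for (let offset = 0; offset < total; offset += sliceSize) {
        requests.push({ offset, size: Math.min(sliceSize, total - offset) });
    }
    return requests;
}

// Fetcher side: read only the window described by a single request.
function fetchSlice<T>(source: readonly T[], request: RangeRequest): T[] {
    return source.slice(request.offset, request.offset + request.size);
}

// Usage: 10 records split into slices of 4 gives windows of 4, 4 and 2 records.
const source = Array.from({ length: 10 }, (_, i) => ({ id: i }));
const requests = createSliceRequests(source.length, 4);
const fetched = requests.map((req) => fetchSlice(source, req));
```

Because each request is small and self-describing, the slicer never touches the data itself; only the fetcher reads records, which is what lets the reads be spread across Workers.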
Slicers
A Slicer runs on the Execution Controller and its primary function is to create Slices.
Slicer
The simplest variant of a Slicer, which handles only a single running slicer.
import { v4 as uuid } from 'uuid';
import { Slicer } from '@terascope/job-components';

export default class ExampleSlicer extends Slicer {
    // If undefined or null is returned here, Teraslice will
    // consider this slicer to be done and the execution
    // will finish. This example never returns null, so it
    // keeps producing slices until the execution is stopped.
    async slice() {
        return {
            id: uuid(),
            foo: 'bar',
        };
    }
}
Check out the API docs for more details.
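The completion rule in the comment above (return `null` or `undefined` and the slicer is done) can be illustrated with a standalone model. This sketch does not extend Teraslice's `Slicer`; `BoundedSlicer`, `ExampleSliceRequest`, and `drainSlicer` are hypothetical names, and `drainSlicer` is only a simplified stand-in for the Execution Controller, which in practice also dispatches slices to Workers.

```typescript
// Hypothetical slice request shape; a real slicer may return any serializable object.
interface ExampleSliceRequest {
    id: string;
    index: number;
}

// Standalone model of a finite slicer. It mirrors the async `slice()` contract
// described above but does NOT extend Teraslice's `Slicer` class.
class BoundedSlicer {
    private produced = 0;
    private readonly limit: number;

    constructor(limit: number) {
        this.limit = limit;
    }

    async slice(): Promise<ExampleSliceRequest | null> {
        if (this.produced >= this.limit) {
            // null tells the caller that this slicer is done
            return null;
        }
        const index = this.produced++;
        return { id: `slice-${index}`, index };
    }
}

// Hypothetical driver standing in for the Execution Controller:
// it keeps requesting slices until the slicer reports that it is done.
async function drainSlicer(slicer: BoundedSlicer): Promise<ExampleSliceRequest[]> {
    const slices: ExampleSliceRequest[] = [];
    let request = await slicer.slice();
    while (request != null) {
        slices.push(request);
        request = await slicer.slice();
    }
    return slices;
}
```

For example, `drainSlicer(new BoundedSlicer(3))` resolves to three requests with ids `slice-0`, `slice-1`, and `slice-2`, after which the slicer keeps returning `null`.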
Parallel Slicer
A variant of a Slicer for running multiple slicers in parallel. The number of slicers can be configured via the "slicers" setting in the Job Configuration.
import { ParallelSlicer, pDelay, times } from '@terascope/job-components';

export default class ExampleSlicer extends ParallelSlicer {
    // return true from isRecoverable() to enable support for recovering an "Execution"
    isRecoverable() {
        return true;
    }

    // you can configure the number of "in-flight" slices by setting the maxQueueLength;
    // in this case we set it to 2 times the number of connected workers.
    maxQueueLength() {
        return this.stats.workers.connected * 2;
    }

    // `newSlicer` creates a new context for a "slice function", similar to "Slicer->slice()".
    // Returning `undefined` drops that slicer, which is useful for limiting the number of supported slicers.
    async newSlicer(id: number) {
        const { countPerSlicer } = this.opConfig;
        const records = times(countPerSlicer, (i) => ({ id: `slicer-${id}-${i}` }));
        return async () => {
            await pDelay(0);
            return records.shift();
        };
    }
}
Check out the API docs for more details.
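A simplified standalone model of how several slice functions from `newSlicer` might be consumed is sketched below. It is not Teraslice's scheduler: `runParallel` polls streams in plain round-robin order and ignores `maxQueueLength`, and `maxSlicers` is a hypothetical parameter added only to show that returning `undefined` drops a stream. `countPerSlicer` and the `slicer-${id}-${i}` ids follow the example above.

```typescript
// A slice function yields the next request, or undefined once its stream is exhausted.
type SliceFn = () => Promise<{ id: string } | undefined>;

// Modeled on the example's `newSlicer`; `maxSlicers` is hypothetical.
// Returning undefined for an id drops that stream entirely.
async function newSlicer(id: number, countPerSlicer: number, maxSlicers: number): Promise<SliceFn | undefined> {
    if (id >= maxSlicers) return undefined;
    const records = Array.from({ length: countPerSlicer }, (_, i) => ({ id: `slicer-${id}-${i}` }));
    return async () => records.shift();
}

// Hypothetical driver: create `slicers` streams, skip the dropped ones,
// then poll the remaining streams round-robin until all are exhausted.
async function runParallel(slicers: number, countPerSlicer: number, maxSlicers: number): Promise<string[]> {
    const active: SliceFn[] = [];
    for (let id = 0; id < slicers; id++) {
        const fn = await newSlicer(id, countPerSlicer, maxSlicers);
        if (fn) active.push(fn);
    }

    const ids: string[] = [];
    while (active.length > 0) {
        // iterate over a snapshot so exhausted streams can be removed safely
        for (const fn of [...active]) {
            const request = await fn();
            if (request === undefined) {
                active.splice(active.indexOf(fn), 1);
            } else {
                ids.push(request.id);
            }
        }
    }
    return ids;
}
```

With `runParallel(3, 2, 2)`, stream 2 is dropped and the remaining two streams interleave: `slicer-0-0`, `slicer-1-0`, `slicer-0-1`, `slicer-1-1`.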
Fetchers
A Fetcher runs on a Worker and its primary function is to process Slices. When processing a Slice, the Worker uses the Slice Request to read a set of data from its data source. The Fetcher then returns that data so it can pass through the rest of the pipeline.
import { Fetcher } from '@terascope/job-components';

export default class ExampleFetcher extends Fetcher {
    // This is where you fetch the data from a particular data source.
    // You don't need to call DataEntity.makeArray because
    // the framework will do this for you.
    async fetch(sliceRequest: object) {
        const result = [];
        for (let i = 0; i < 10; i++) {
            result.push({
                id: i,
                data: [Math.random(), Math.random(), Math.random()],
            });
        }
        return result;
    }
}
Processors
A Job is required to contain at least one Processor. The duty of a Processor is to transform data or write it to an external service.
To develop a processor, see the docs.
Batch Processor
A variation of "Processor" that deals with a batch of data at a time.
Example:
import { BatchProcessor, DataEntity } from '@terascope/job-components';

export default class ExampleBatchProcessor extends BatchProcessor {
    batchedKeys: (string | number)[][] = [];

    async onBatch(dataEntities: DataEntity[]): Promise<DataEntity[]> {
        // collect the key of every record in this batch
        const keys: (string | number)[] = [];
        for (const dataEntity of dataEntities) {
            keys.push(dataEntity.getKey());
        }
        this.batchedKeys.push(keys);
        return dataEntities;
    }
}
Check out the API docs for more details.
Each Processor
A variation of Processor that processes a single DataEntity at a time. This processor should limit side effects on the data; if you need to mutate the data, use the MapProcessor.
Example:
import { EachProcessor, DataEntity } from '@terascope/job-components';

export default class ExampleEachProcessor extends EachProcessor {
    count = 0;

    // NOTE: this is NOT an async function and should not return anything.
    forEach(dataEntity: DataEntity, index: number, array: DataEntity[]): void {
        this.count++;
        dataEntity.setMetadata('count', this.count);
    }
}
Check out the API docs for more details.
Map Processor
A variation of Processor that can process a single DataEntity at a time. This processor should return a modified DataEntity.
Example:
import { MapProcessor, DataEntity } from '@terascope/job-components';

export default class ExampleMapProcessor extends MapProcessor {
    // NOTE: this is NOT an async function
    map(dataEntity: DataEntity, index: number, array: DataEntity[]): DataEntity {
        // assign (not compare) so the returned record actually carries the change
        dataEntity.foo = 'bar';
        return dataEntity;
    }
}
Check out the API docs for more details.
Filter Processor
A variation of Processor that processes a single DataEntity at a time. This processor is used to remove data from the batch.
Example:
import { FilterProcessor, DataEntity } from '@terascope/job-components';

export default class ExampleFilterProcessor extends FilterProcessor {
    // NOTE: this is NOT an async function
    // keep only records whose event time falls within the last five minutes
    filter(dataEntity: DataEntity, index: number, array: DataEntity[]): boolean {
        const eventTime = dataEntity.getMetadata('_eventTime');
        const fiveMinutesAgo = Date.now() - (5 * 60 * 1000);
        return typeof eventTime === 'number' && eventTime > fiveMinutesAgo;
    }
}
Check out the API docs for more details.
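One way to see how the per-record variants relate to the Batch Processor is to express each as a function from one batch to the next. The sketch below is a simplified standalone model, not Teraslice's implementation: `Rec` is a hypothetical stand-in for `DataEntity`, and `eachOf`, `mapOf`, `filterOf`, and `runPipeline` are illustrative helpers built on plain `Array` methods.

```typescript
// Hypothetical stand-in for DataEntity: plain data plus a metadata map.
interface Rec {
    data: Record<string, unknown>;
    metadata: Record<string, unknown>;
}

type Batch = Rec[];
type BatchFn = (batch: Batch) => Batch;

// "Each": run a callback for its side effects and pass the same batch through.
function eachOf(fn: (rec: Rec, index: number) => void): BatchFn {
    return (batch) => {
        batch.forEach((rec, index) => fn(rec, index));
        return batch;
    };
}

// "Map": replace every record with the callback's return value.
function mapOf(fn: (rec: Rec, index: number) => Rec): BatchFn {
    return (batch) => batch.map((rec, index) => fn(rec, index));
}

// "Filter": keep only the records for which the callback returns true.
function filterOf(fn: (rec: Rec, index: number) => boolean): BatchFn {
    return (batch) => batch.filter((rec, index) => fn(rec, index));
}

// Operations run in job order; each receives the previous operation's output.
function runPipeline(ops: BatchFn[], batch: Batch): Batch {
    return ops.reduce((current, op) => op(current), batch);
}

// Usage: count every record, tag it, then drop the record with id 1.
let seen = 0;
const output = runPipeline(
    [
        eachOf(() => {
            seen++;
        }),
        mapOf((rec) => ({ ...rec, data: { ...rec.data, foo: 'bar' } })),
        filterOf((rec) => rec.data.id !== 1),
    ],
    [0, 1, 2].map((id) => ({ data: { id }, metadata: {} })),
);
```

The batch form sees the whole array at once, which is why it suits cross-record work such as collecting keys, while the per-record forms only ever see one record plus its index.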
APIs
A Job can specify an Operation API, which can expose a utility API or a Dead Letter Queue, or can be used to monitor/track data going through the pipeline. APIs are configured separately and are attached to the Operation Lifecycle on startup.
To develop an API, see the docs.
Operation API
This type of API exposes functionality to other processors within a job. A Dead Letter Queue is a type of Operation API.
Example:
import { OperationAPI } from '@terascope/job-components';

export default class ExampleOperationAPI extends OperationAPI {
    value: string = 'foo';

    // this function should resolve an OpAPI;
    // an OpAPI can be a function, an object, or an instance of a class
    async createAPI() {
        return {
            get: () => {
                return this.value;
            },
            update: (value: string) => {
                this.value = value;
            },
        };
    }
}
Check out the API docs for more details.
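Since a Dead Letter Queue is described as a kind of Operation API, the sketch below shows the general idea in standalone form: an API object (built the way `createAPI()` builds one above) collects records that a processor could not handle, instead of failing the whole slice. `DeadLetterAPI`, `createDeadLetterAPI`, and `parseRecords` are hypothetical names; this is not Teraslice's actual DLQ interface.

```typescript
// Hypothetical shape of a dead-letter-queue API; not Teraslice's actual DLQ interface.
interface DeadLetterAPI {
    reject(input: string, reason: string): void;
    rejected(): ReadonlyArray<{ input: string; reason: string }>;
}

// Mirrors the `createAPI()` idea above: build and return the API object.
function createDeadLetterAPI(): DeadLetterAPI {
    const queue: { input: string; reason: string }[] = [];
    return {
        reject: (input, reason) => {
            queue.push({ input, reason });
        },
        rejected: () => queue,
    };
}

// A processor-like step that parses JSON strings and hands failures to the API
// instead of throwing and failing the whole slice.
function parseRecords(inputs: string[], dlq: DeadLetterAPI): unknown[] {
    const parsed: unknown[] = [];
    for (const input of inputs) {
        try {
            parsed.push(JSON.parse(input));
        } catch (err) {
            dlq.reject(input, err instanceof Error ? err.message : String(err));
        }
    }
    return parsed;
}

// Usage: one of the three inputs is not valid JSON and ends up in the queue.
const dlq = createDeadLetterAPI();
const records = parseRecords(['{"a":1}', 'not json', '{"b":2}'], dlq);
```

Keeping the rejected records behind an API, rather than inside the processor, lets several operations in the same job share one place to send bad data.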
Observer
This type of API only monitors/tracks data and processors; check out the Worker Lifecycle for all of the events that can be subscribed to.
Example:
import { Observer } from '@terascope/job-components';

export default class ExampleObserver extends Observer {
    // these hooks only observe, so they return nothing
    onOperationStart(sliceId: string, index: number): void {
        const opName = this.executionConfig.operations[index]._op;
        this.logger.trace(`operation ${opName} is starting slice ${sliceId}`);
    }

    onOperationEnd(sliceId: string, index: number, processed: number): void {
        const opName = this.executionConfig.operations[index]._op;
        this.logger.trace(`operation ${opName} processed ${processed} records for slice ${sliceId}`);
    }
}
Check out the API docs for more details.
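Observers are a natural place for timing. The standalone sketch below reuses the hook names from the example above (`onOperationStart`, `onOperationEnd`) but does not extend Teraslice's `Observer`; `TimingObserver` and its injectable `now` clock are hypothetical, and the clock is injectable only so the timing can be checked deterministically.

```typescript
// Standalone model of an observer that records how long each operation
// took on each slice. It does NOT extend Teraslice's `Observer`.
class TimingObserver {
    private readonly starts = new Map<string, number>();
    private readonly now: () => number;
    readonly durations: { sliceId: string; index: number; ms: number; processed: number }[] = [];

    constructor(now: () => number = Date.now) {
        this.now = now;
    }

    onOperationStart(sliceId: string, index: number): void {
        this.starts.set(`${sliceId}:${index}`, this.now());
    }

    onOperationEnd(sliceId: string, index: number, processed: number): void {
        const key = `${sliceId}:${index}`;
        const start = this.starts.get(key);
        // ignore an end event without a matching start
        if (start === undefined) return;
        this.starts.delete(key);
        this.durations.push({ sliceId, index, ms: this.now() - start, processed });
    }
}

// Usage with a fake clock: operation 0 takes 25 ms on slice-1 and processes 100 records.
let clock = 0;
const observer = new TimingObserver(() => clock);
observer.onOperationStart('slice-1', 0);
clock = 25;
observer.onOperationEnd('slice-1', 0, 100);
```

Keying start times by slice id and operation index keeps concurrent slices from overwriting each other's measurements.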