Teraslice State Storage
State storage operation api for teraslice
Teraslice State Storage provides an LRU caching system, based on mnemonist's LRU map, for teraslice processors. The in-memory cache can be backed by a persistent storage system like Elasticsearch if a more robust cache is needed.
The advantage of having the LRU cache backed by a persistent storage system like Elasticsearch is that if the key is not in the cache, the processor will search an Elasticsearch index for the key and, if it is found, add it to the cache. This essentially expands the cache to the size of the underlying Elasticsearch index without requiring the same memory resources in Teraslice. The potential drawback is that on data sets with a large key set the processor will be continuously searching Elasticsearch for each key, which would render the caching mechanism pointless.
Usage
The state storage api must be added to the job file under the 'apis' property to be accessible by a processor.
Job Setup
{
"name": "state-storage-job",
"lifecycle": "persistent",
"workers": 20,
"assets": [
"asset1:0.1.0"
],
"apis": [
{
"_name": "cached_state_storage",
"cache_size": 10000
}
],
"operations": [
{
"_op": "reader"
},
{
"_op": "state_storage_processor",
"state_storage": "cached_state_storage"
},
{
"_op": "sender"
}
]
}
Add the state storage api to the processor with the createAPI function
Processor Example
const { BatchProcessor } = require('@terascope/job-components');
/**
 * Example processor that stores every record of a batch in the state
 * storage cache and then reads it back.
 */
class StateStorageProcessor extends BatchProcessor {
    /** Creates the state storage api named by the op's `state_storage` setting. */
    async initialize() {
        const { state_storage: apiName } = this.opConfig;
        this.state = await this.createAPI(apiName);
    }

    /**
     * @param dataArray - the batch of records handed to this processor
     */
    async onBatch(dataArray) {
        for (const entity of dataArray) {
            const cacheKey = entity.getMetadata('_key');

            // set(key, value) saves the record to the cache
            this.state.set(cacheKey, entity);

            // get(key) retrieves the record from the cache
            const cached = this.state.get(cacheKey);
        }
    }
}
module.exports = StateStorageProcessor;
State Storage Job Settings
cache_size
"cache_size": NUMBER - maximum number of keys held in the cache before evicting unused keys.
Cache Functions:
set
set(KEY, VALUE) - Sets a value for the given key in the cache. If the cache is already full, the least recently used key will be dropped from the cache and the evicted value will be logged by teraslice
this.state.set(1, { name: 'foo' });
this.state.set('abc123', { name: 'bar' });
get
get(KEY) - Retrieves the value associated to the given key in the cache or undefined if the key is not found. If the key is found, the key is moved to the front of the underlying list to be the most recently used item.
this.state.get(1); // { name: 'foo' }
this.state.get('abc123'); // { name: 'bar' }
this.state.get('456def'); // undefined
mset
mset([{ key: KEY1, data: VALUE1}, { key: KEY2, data: VALUE2 }, etc ...]) - Sets multiple key, value pairs. Requires an array of { key: key, data: value } objects
this.state.mset([{ key: 1, data: { name: 'foo' } }, { key: 'abc123', data: { name: 'bar' } }]);
mget
mget([KEY1, KEY2, KEY3, etc...]) - Returns an object of the found keys and values. Required input is an array of keys
this.state.mget([1, 'abc123', '456def']); // { 1: { name: 'foo' }, 'abc123': { name: 'bar' } };
values
values(function) - Processes cache values based on passed function.
// Collect the cached values by handing values() a callback that receives each one.
const results = [];
function processValues(data) {
results.push(data);
}
this.state.values(processValues); // results should now hold the cached values, e.g. [{ name: 'foo' }, { name: 'bar' }]
has
has(KEY) - Returns true if key is in the cache otherwise returns false.
this.state.has(1); // true
this.state.has('345def'); // false
clear
clear() - Completely clears the cache.
this.state.clear();
State Storage backed by Elasticsearch
Usage
Add the state storage api to the job with the elasticsearch settings
Job Setup
{
"name": "es-state-storage-job",
"lifecycle": "persistent",
"workers": 20,
"assets": [
"asset1:0.1.0"
],
"apis": [
{
"_name": "elasticsearch_state_storage",
"connection": "ELASTICSEARCH_CLUSTER_URL",
"index": "INDEX_NAME",
"type": "ELASTICSEARCH_TYPE",
"cache_size": 1000000
}
],
"operations": [
{
"_op": "reader"
},
{
"_op": "state_storage_processor",
"state_storage": "elasticsearch_state_storage"
},
{
"_op": "sender"
}
]
}
Add the elasticsearch state storage api to the processor with the createAPI function
Processor Example
const { BatchProcessor } = require('@terascope/job-components');
/**
 * Example processor that looks up the cached state for every record in a
 * batch using the elasticsearch backed state storage api.
 */
class StateStorageProcessor extends BatchProcessor {
    /** Creates the state storage api named by the op's `state_storage` setting. */
    async initialize() {
        this.state = await this.createAPI(this.opConfig.state_storage);
    }

    /**
     * @param dataArray - the batch of data entities handed to this processor
     */
    async onBatch(dataArray) {
        // mget is asynchronous: await it so cachedDocs is the object of
        // found keys and values rather than a pending promise
        const cachedDocs = await this.state.mget(dataArray);
        // process docs
    }
}
module.exports = StateStorageProcessor;
Elasticsearch State Storage Job Settings
cache_size
"cache_size": NUMBER - Maximum number of keys held in the cache before evicting unused keys, defaults to 2,147,483,647
index
"index": "STRING" - Name of elasticsearch index
type
"type":"STRING" - Elasticsearch type, defaults to _doc
concurrency
"concurrency": NUMBER - Number of concurrent elasticsearch mget requests, defaults to 10
chunk_size
"chunk_size": NUMBER - Number of documents in each elasticsearch mget request, defaults to 2,500
persist
"persist": BOOLEAN - Saves the record to elasticsearch upon caching the document, defaults to false
meta_key_field
"meta_key_field": "STRING" - Field in the metadata to use as the key for caching and searching in elasticsearch
connection
"connection": "STRING" - Terafoundation connection name for elasticsearch cluster
Elasticsearch State Storage API for processing data:
Elasticsearch State Storage operates under the assumption that all records being processed are data entities
const foo = DataEntity.make({ name: 'foo'}, { _key: 1 });
const bar = DataEntity.make({ name: 'bar'}, { _key: 2 });
set
set(DATAENTITY) - Adds the record to the cache. If the cache is already full, the least recently used key will be dropped from the cache and the evicted value will be logged by teraslice.
this.state.set(foo);
this.state.set(bar);
get
get(DATAENTITY) - Asynchronous function that returns the cached state of the input. If the record is not cached then it will search the elasticsearch index for the record. If the record is found, the key is moved to the front of the underlying list to be the most recently used item.
this.state.get(foo); // { name: 'foo' }
mset
mset([DATAENTITY1, DATAENTITY2, etc...]) - Asynchronous function that adds records to the cache. If persist is true it will also save the records in the elasticsearch index. Input is a data entity array.
this.state.mset([foo, bar]);
mget
mget([DATAENTITY1, DATAENTITY2, etc...]) - Asynchronous function that returns an object of the cached keys and values. For records not in the cache it will search elasticsearch and add found records to the cache. Input is a data entity array.
this.state.mget([foo, bar]); // { 1: { name: 'foo' }, 2: { name: 'bar' } };
isCached
isCached(DATA_ENTITY) - Returns true if the record's key is in the cache, otherwise returns false.
this.state.isCached(foo); // true
this.state.isCached(other); // false
isKeyCached
isKeyCached(KEY) - Returns true if key is in the cache otherwise returns false.
this.state.isKeyCached(1); // true
this.state.isKeyCached('other'); // false
count
count() - Returns the number of records in the cache.
this.state.count(); // 2