
window

This processor gathers data within a time frame and returns a DataWindow representing that time window. It is similar to the accumulate processor, except that it does not wait for empty slices before releasing data. Instead, it accumulates and releases data based on a time window.

The processor can gather data into either tumbling windows or sliding windows. Window boundaries can be tracked either by server time (clock) or by the value of a date field in the data (event), as selected by window_time_setting.
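As a rough illustration of the two window_time_setting modes, the helper below picks the timestamp a record would be windowed by. windowTimeOf is a hypothetical name, not part of the processor; it assumes the time_field value is something Date can parse, such as the ISO 8601 strings used in the examples on this page.

```javascript
// Hypothetical helper: returns the timestamp (ms since epoch) a record is
// windowed by, following the window_time_setting semantics described above.
// Not part of the window processor's actual API.
function windowTimeOf(record, config = {}) {
  const {
    window_time_setting = 'event', // documented default
    time_field = '@timestamp',     // documented default
  } = config;

  if (window_time_setting === 'clock') {
    // clock: the server's current time when the record is processed
    return Date.now();
  }

  // event: the date stored in the record's time_field
  const t = new Date(record[time_field]).getTime();
  if (Number.isNaN(t)) {
    throw new Error(`record has no valid date in field "${time_field}"`);
  }
  return t;
}
```

In event mode two records with the same time_field value always land at the same point in time, regardless of when they are processed; in clock mode the same record gets a different timestamp on every run.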

Usage

Tumbling window

Example of a job that uses the window processor with a tumbling window of 7 seconds (window_length: 7000). It also sets event_window_expiration to 10 seconds: once 10 seconds pass with no new incoming data, the processor returns a window containing whatever documents have been accumulated so far.

{
    "name": "testing",
    "workers": 1,
    "slicers": 1,
    "lifecycle": "once",
    "assets": [
        "standard"
    ],
    "operations": [
        {
            "_op": "test-reader"
        },
        {
            "_op": "window",
            "window_length": 7000,
            "window_type": "tumbling",
            "time_field": "time",
            "window_time_setting": "event",
            "event_window_expiration": 10000
        }
    ]
}

Output of the example job

// `process` stands for the window processor under test and pDelay(ms)
// resolves after ms milliseconds; `results === ...` lines show the
// expected result rather than literal comparisons.
let results;

// each record is one second apart
const data = [
    { time: '2020-08-18T21:04:00.000Z' },
    { time: '2020-08-18T21:04:01.000Z' },
    { time: '2020-08-18T21:04:02.000Z' },
    { time: '2020-08-18T21:04:03.000Z' },
    { time: '2020-08-18T21:04:04.000Z' },
    { time: '2020-08-18T21:04:05.000Z' },
    { time: '2020-08-18T21:04:06.000Z' },
    { time: '2020-08-18T21:04:07.000Z' },
    { time: '2020-08-18T21:04:08.000Z' }
];

// the first three records span only 2 seconds
results = await process.run(data.slice(0, 3));

// no return data expected
results === [];

// with the next three records, the data spans only 5 seconds
results = await process.run(data.slice(3, 6));

// no results expected
results === [];

// the last three records extend the span to 8 seconds from the first record
results = await process.run(data.slice(6, 9));

// the records within 7 seconds of the first one are returned,
// the rest are kept for the next window
results === {
    dataArray: [
        { time: '2020-08-18T21:04:00.000Z' },
        { time: '2020-08-18T21:04:01.000Z' },
        { time: '2020-08-18T21:04:02.000Z' },
        { time: '2020-08-18T21:04:03.000Z' },
        { time: '2020-08-18T21:04:04.000Z' },
        { time: '2020-08-18T21:04:05.000Z' },
        { time: '2020-08-18T21:04:06.000Z' },
        { time: '2020-08-18T21:04:07.000Z' }
    ]
};

// no new incoming data
results = await process.run([]);

results === [];

// wait 10 seconds for the event_window_expiration timer to expire
await pDelay(10000);

results = await process.run([]);

// the rest of the data is returned
results === {
    dataArray: [
        { time: '2020-08-18T21:04:08.000Z' }
    ]
};
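The tumbling behavior in the example above can be reproduced with a small in-memory sketch. This is not the processor's implementation: the class name TumblingWindowSketch, its constructor options, and the injectable now clock are hypothetical. It assumes that a window opens at the time of its first record, includes records up to window_length ms later (inclusive, which matches the 21:04:07 record landing in the first window), and closes when a later record falls outside it.

```javascript
// Hypothetical sketch of event-time tumbling windows with an expiration
// timer; not the window processor's actual implementation.
class TumblingWindowSketch {
  constructor({ windowLength, expiration = 0, timeField = '@timestamp', now = Date.now }) {
    this.windowLength = windowLength; // window_length, in ms
    this.expiration = expiration;     // event_window_expiration, in ms (0 = never)
    this.timeField = timeField;       // time_field
    this.now = now;                   // injectable clock, handy for tests
    this.current = [];                // records in the open window
    this.windowStart = null;          // event time (ms) of the open window's first record
    this.lastDataAt = now();          // clock time of the last non-empty slice
  }

  // Processes one slice and returns the windows released by it,
  // each as an array of records.
  run(records) {
    const released = [];
    if (records.length > 0) this.lastDataAt = this.now();

    for (const record of records) {
      const t = new Date(record[this.timeField]).getTime();
      if (this.windowStart === null) this.windowStart = t;

      // A record more than windowLength ms after the window start closes
      // the open window and becomes the first record of the next one.
      if (t - this.windowStart > this.windowLength) {
        released.push(this.current);
        this.current = [];
        this.windowStart = t;
      }
      this.current.push(record);
    }

    // After `expiration` ms without new data, flush the open window.
    const idleFor = this.now() - this.lastDataAt;
    if (records.length === 0 && this.expiration > 0 && idleFor >= this.expiration
        && this.current.length > 0) {
      released.push(this.current);
      this.current = [];
      this.windowStart = null;
    }
    return released;
  }
}
```

Fed the nine example records in three slices of three, with windowLength: 7000, expiration: 10000 and timeField: 'time', run returns nothing for the first two slices, the 21:04:00 through 21:04:07 records for the third, and the 21:04:08 record only after the clock has advanced 10000 ms and an empty slice is processed.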

Sliding window

Example of a job with a sliding window: each window is 3 seconds long (window_length: 3000), and a new window starts every 2 seconds (sliding_window_interval: 2000). As in the previous example, event_window_expiration is 10 seconds: once 10 seconds pass with no new incoming data, the processor returns a window containing whatever documents have been accumulated.

{
    "name": "testing",
    "workers": 1,
    "slicers": 1,
    "lifecycle": "once",
    "assets": [
        "standard"
    ],
    "operations": [
        {
            "_op": "test-reader"
        },
        {
            "_op": "window",
            "window_length": 3000,
            "window_type": "sliding",
            "sliding_window_interval": 2000,
            "time_field": "time",
            "window_time_setting": "event",
            "event_window_expiration": 10000
        }
    ]
}

Output of the example job

// `process` stands for the window processor under test and pDelay(ms)
// resolves after ms milliseconds; `results === ...` lines show the
// expected result rather than literal comparisons.
let results;

// each record is one second apart
const data = [
    { time: '2020-08-18T21:04:00.000Z' },
    { time: '2020-08-18T21:04:01.000Z' },
    { time: '2020-08-18T21:04:02.000Z' },
    { time: '2020-08-18T21:04:03.000Z' },
    { time: '2020-08-18T21:04:04.000Z' },
    { time: '2020-08-18T21:04:05.000Z' },
    { time: '2020-08-18T21:04:06.000Z' },
    { time: '2020-08-18T21:04:07.000Z' },
    { time: '2020-08-18T21:04:08.000Z' },
    { time: '2020-08-18T21:04:09.000Z' }
];

results = await process.run(data);

// each DataWindow spans 3 seconds and starts 2 seconds after the
// previous one; the window starting at 21:04:06 is not released yet
results === [
    {
        dataArray: [
            { time: '2020-08-18T21:04:00.000Z' },
            { time: '2020-08-18T21:04:01.000Z' },
            { time: '2020-08-18T21:04:02.000Z' },
            { time: '2020-08-18T21:04:03.000Z' }
        ]
    },
    {
        dataArray: [
            { time: '2020-08-18T21:04:02.000Z' },
            { time: '2020-08-18T21:04:03.000Z' },
            { time: '2020-08-18T21:04:04.000Z' },
            { time: '2020-08-18T21:04:05.000Z' }
        ]
    },
    {
        dataArray: [
            { time: '2020-08-18T21:04:04.000Z' },
            { time: '2020-08-18T21:04:05.000Z' },
            { time: '2020-08-18T21:04:06.000Z' },
            { time: '2020-08-18T21:04:07.000Z' }
        ]
    }
];

// no new data
results = await process.run([]);

results === [];

// wait for the event_window_expiration timer to expire
await pDelay(10000);

results = await process.run([]);

// once event_window_expiration passes, the processor pushes out
// whatever the current window has collected
results === {
    dataArray: [
        { time: '2020-08-18T21:04:06.000Z' },
        { time: '2020-08-18T21:04:07.000Z' },
        { time: '2020-08-18T21:04:08.000Z' },
        { time: '2020-08-18T21:04:09.000Z' }
    ]
};
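The window ranges in the sliding example can be checked with a short helper. slidingWindows is hypothetical and only computes which records each window contains. It assumes windows open every interval ms starting at the earliest record, treats each window's end as inclusive (so 21:04:03 belongs to the first window, as in the output above), and marks a window complete once a later record has passed its end. It does not model the expiration timer, and it requires a positive interval, so it does not cover the sliding_window_interval: 0 case.

```javascript
// Hypothetical helper: computes sliding-window membership for a batch of
// records; not the window processor's actual implementation.
function slidingWindows(records, { windowLength, interval, timeField = '@timestamp' }) {
  if (!(interval > 0)) {
    throw new Error('this sketch requires a positive interval');
  }
  const times = records.map((r) => new Date(r[timeField]).getTime());
  const first = Math.min(...times);
  const last = Math.max(...times);

  const windows = [];
  // A new window opens every `interval` ms, starting at the earliest record.
  for (let start = first; start <= last; start += interval) {
    const end = start + windowLength; // inclusive end
    windows.push({
      start,
      end,
      records: records.filter((_, i) => times[i] >= start && times[i] <= end),
      // complete once some record is later than the window's end
      complete: end < last,
    });
  }
  return windows;
}
```

With the ten example records, windowLength: 3000, interval: 2000 and timeField: 'time', it yields five windows. The three marked complete hold the same records as the three DataWindows in the first result, and the fourth holds 21:04:06 through 21:04:09, the window released after the expiration timer.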

Parameters

| Configuration | Description | Type | Notes |
| --- | --- | --- | --- |
| _op | Name of the operation; it must match the exact name of the file | String | required |
| time_field | Field name that holds the time value | String | optional, defaults to @timestamp |
| window_time_setting | Either clock (server time) or event (the time_field value) | String | optional, defaults to event |
| window_length | Length of time for each window, in milliseconds | Number | optional, defaults to 30000 |
| window_type | Type of window, tumbling or sliding | String | optional, defaults to tumbling |
| sliding_window_interval | How often to start a new sliding window, in milliseconds | Number | optional, defaults to 0, which means a new window starts on every slice |
| event_window_expiration | How long to hold event-based windows with no new incoming data, in milliseconds; 0 means no expiration | Number | optional, defaults to 0 |
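The defaults in the table above can be applied with a small helper before a window is built. normalizeWindowConfig and WINDOW_DEFAULTS are hypothetical names, and the checks are only those implied by the table; the processor's own schema validation may accept or reject other values.

```javascript
// Hypothetical helper: fills in the documented defaults for the window
// processor's options and rejects values the table does not allow.
const WINDOW_DEFAULTS = {
  time_field: '@timestamp',
  window_time_setting: 'event',
  window_length: 30000,
  window_type: 'tumbling',
  sliding_window_interval: 0,
  event_window_expiration: 0,
};

function normalizeWindowConfig(opConfig) {
  // user-supplied values override the defaults
  const config = { ...WINDOW_DEFAULTS, ...opConfig };

  if (config._op !== 'window') {
    throw new Error('_op is required and must be "window"');
  }
  if (!['clock', 'event'].includes(config.window_time_setting)) {
    throw new Error('window_time_setting must be "clock" or "event"');
  }
  if (!['tumbling', 'sliding'].includes(config.window_type)) {
    throw new Error('window_type must be "tumbling" or "sliding"');
  }
  // all durations are non-negative millisecond counts
  for (const key of ['window_length', 'sliding_window_interval', 'event_window_expiration']) {
    if (!Number.isFinite(config[key]) || config[key] < 0) {
      throw new Error(`${key} must be a non-negative number of milliseconds`);
    }
  }
  return config;
}
```

Applied to the tumbling job's window operation, this keeps window_length 7000, time_field time and event_window_expiration 10000, and fills in only sliding_window_interval: 0.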