Manage Flows
Each flow
is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, update, and delete a flow.
Create or update a flow
The grammar to create a flow is:
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
flow-name
is an unique identifier in the catalog level.sink-table-name
is the table name where the materialized aggregated data is stored. It can be an existing table or a new one.flow
will create the sink table if it doesn't exist.EXPIRE AFTER
is an optional interval to expire the data from the Flow engine. For more details, please refer to theEXPIRE AFTER
part.COMMENT
is the description of the flow.SQL
part defines the continuous aggregation query. It defines the source tables provide data for the flow. Each flow can have multiple source tables. Please Refer to Write a Query for the details.
A simple example to create a flow:
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes', '2024-05-20 00:00:00');
The created flow will compute count(item)
for every 5 minutes and store the result in my_sink_table
. All data comes within 1 hour will be used in the flow. For the tumble()
function, refer to define time window part.
EXPIRE AFTER
clause
The Flow engine operates with two concepts of time: data timestamp and processing time.
The data timestamp is the time stored in the time index column of the source table, while the processing time refers to the moment when the Flow engine executes the aggregation operation.
The EXPIRE AFTER
clause specifies the interval after which the data will expire.
Any data with a timestamp older than the processing time minus the interval time will be expired.
For example, if the Flow engine processes the aggregation operation at 10:00:00 and the INTERVAL '1 hour'
is set,
any data older than 1 hour from the processing time (data before 09:00:00) will be expired.
Only data timestamped from 09:00:00 onwards will be used in the aggregation.
The EXPIRE
operation only expire data from the Flow engine, it does not affect the data in the source table.
Delete a flow
To delete a flow, use the following DROP FLOW
clause:
DROP FLOW [IF EXISTS] <name>
For example:
DROP FLOW IF EXISTS my_flow;