Apache Flink’s window triggers determine when a window’s computation is emitted (and optionally cleared) during stream processing.
Every window in Flink has an associated trigger that evaluates each incoming element and any relevant timer events to decide whether to FIRE
(emit results), PURGE
(drop window contents), or both.
In other words, the trigger monitors window conditions and tells Flink when to produce the aggregated output of that window. By default, Flink provides built-in triggers for common scenarios, but they have limitations and this is where custom triggers become useful.
Build-in Triggers
Flink comes with a few built-in triggers for standard behaviors:
EventTimeTrigger: fires when the event-time watermark passes the end of the window (i.e. window closes in event time). This is the default for event-time windows and ensures results are emitted once the window’s end timestamp is reached according to the stream’s watermarks.
ProcessingTimeTrigger: fires when the processing-time clock reaches the end of the window. This is the default for processing-time windows.
CountTrigger: fires once the number of elements in the window reaches a specified count threshold
PurgingTrigger: a wrapper that turns any other trigger into a purging trigger, meaning it will clear the window’s content whenever it fires.
These built-in triggers cover basic cases -- time-driven
or count-driven
firing, but they operate independently. If you override a window’s trigger with one of these, you replace the default trigger, not supplement it. For example, if you apply a CountTrigger to an event-time window, the window will only fire based on count and ignore the event-time watermark. This means built-in triggers can’t be combined out-of-the-box – you can’t directly get a window that fires on either a count or a time condition using only the provided triggers.
In practice, complex scenarios often require custom triggers. Custom triggers let you define arbitrary conditions (or combinations of conditions) for firing windows, such as “fire on count or time, whichever comes first” or “fire when a special event occurs.”
By writing a custom trigger, you can overcome the limitations of the built-ins and implement custom early results, session-specific logic, late firing policies, and more.
Creating Custom Triggers
To implement a custom trigger in Flink (DataStream API), you create a class that extends the abstract Trigger<T, W>
class, where T
is the type of elements in the window and W is the window type (e.g. TimeWindow).
In your subclass, you override several methods that Flink uses to drive the trigger’s behavior. You can find a full list and description with those methods here.
A TriggerContext
is provided to these methods, which offers utilities to register and delete timers (see more here) and to access partitioned state tied to the window. You can use Flink's state within triggers to track information like element counts or flags. For instance, if implementing a count-based trigger, you might keep a ValueState<Integer>
for the count of elements seen so far in the window.
Note: Triggers must be serializable as they are part of the job graph sent to workers. Also, remember that returning FIRE
does not clear the window state by default – the window remains and may accumulate more data, potentially firing again later. If you want the window to be cleared when it fires (emitting results only once), you should return FIRE_AND_PURGE
or use a PurgingTrigger
wrapper.
We’ll see examples of both behaviors below, so let's walk through 🚶two real-world examples using custom triggers in Flink.
The full source code can be found here and if you are looking for production options you can check the Ververica Cloud for free.
Example 1: Tumbling Window and Count-Based Early Firing
Scenario: Imagine an e-commerce application tracking user events – e.g. page views
, add-to-cart
, checkout initiations
– as a stream. We want to compute some analytics per user in a tumbling time window (say, 1 hour windows per user).
However, if a user is very active, we don’t want to wait until the end of the hour to get intermediate results. We decide that for each user’s window, the result should be emitted early after 5 events (we will use a small threshold for demonstration purposes) have been received in that window, rather than waiting the full hour.
We will implement a custom trigger that tracks the count of elements in the window. It will fire when the count reaches 5, and also ensure the window will fire at the end of the time window.
The important part is within the CustomCountTrigger class:
@Override
public TriggerResult onElement(UserEvent userEvent,
long timestamp,
TimeWindow timeWindow,
TriggerContext triggerContext) throws Exception {
// Get or initialize the current count
ValueState<Integer> countState = triggerContext.getPartitionedState(countStateDesc);
Integer count = countState.value();
if (count == null) {
count = 0;
}
// Increment count for every element
count += 1;
countState.update(count);
// If this is the first element, register an event-time timer for end-of-window
// (Timers are set at window end timestamp, so when watermark passes window.end, onEventTime will fire)
if (count == 1) {
long windowEnd = timeWindow.getEnd(); // end timestamp of this TimeWindow
triggerContext.registerEventTimeTimer(windowEnd);
}
// Check if we've reached 5 events in this window
if (count >= 5) {
// Fire (emit the window) *now*, but do NOT purge (we return FIRE, not FIRE_AND_PURGE).
// This means the window contents remain, and the window will possibly fire again at end-of-window.
return TriggerResult.FIRE;
} else {
// Not yet reached 5, so continue accumulating
return TriggerResult.CONTINUE;
} }
@Override
public TriggerResult onProcessingTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
// We don't use processing-time timers in this trigger.
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
// This is called when the event-time timer for the window fires (i.e., watermark reached window end).
if (timestamp == timeWindow.getEnd()) {
// Window end reached, so fire the window result.
// We return FIRE_AND_PURGE to emit the result and clear the window state.
return TriggerResult.FIRE_AND_PURGE;
}
// If it's not the window-end timer (e.g., some other timer), we ignore.
return TriggerResult.CONTINUE; }
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
// Clean up the count state when the window is purged/closed.
ValueState<Integer> countState = triggerContext.getPartitionedState(countStateDesc);
countState.clear();
// We don't need to manually delete the event-time timer for window end – Flink does that when window closes.
}
Let's break this down. On each element:
It registers an event-time timer for the end-of-window when the first element arrives (to ensure the window eventually fires at the end if the count trigger didn’t already fire) – similar to what Flink’s default EventTimeTrigger does internally.
If count reaches 5, it returns
TriggerResult.FIRE
. We chooseFIRE
(notFIRE_AND_PURGE
) so that the window’s contents are not thrown away; this means more elements can still accumulate for the final output. We could also decide to reset the count state to 0 here if we wanted to allow another early firing every 5 events, but in this example we only fire once early at 5 and then rely on the final timer.In
onEventTime()
, when the watermark hits the window end timestamp, we returnFIRE_AND_PURGE
. This emits the final window result and clears out the window state (theclear()
method will be called to drop the count state). We useFIRE_AND_PURGE
here because once the window end is reached, the window should close and not hold resources.The
clear()
method ensures we remove the count from state to avoid leaks. Flink will callclear()
after the window is purged (either via our final fire or if the window is disposed for any reason).
This custom trigger demonstrates combining a count condition with an event-time window trigger. Without a custom trigger, achieving “5 events or end-of-window” would not be possible with Flink’s built-ins alone
If you run the above example you should see the following output:
User user_1 had 5 events in window [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
User user_1 had 6 events in window [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
User user_2 had 5 events in window [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
Notice here that the window gets triggered for user_1
when it gets 5 events and also when the window ends.
Example 2: Session Inactivity With Event Signals
Scenario: Now let’s consider a session window example. In many applications, events are grouped by sessions (a period of user activity separated by inactivity). Flink’s session windows can close a window after a period of inactivity. However, sometimes a session might also end due to a specific event –for example, a user explicitly logs out
or completes a checkout
.
In an e-commerce or login-based app, you may define a session to end either after 30 minutes of inactivity or when a “logout” or “checkout complete” event occurs.
Flink’s built-in session windows don’t know about the semantics of a logout event, they’d only close based on the timeout. We can create a custom trigger to handle both: fire the window when a special session-end event is seen, or if the session times out with no activity.
The important part is within the SessionEndTrigger class:
@Override
public TriggerResult onElement(UserEvent userEvent, long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
// Register an event-time timer for the end-of-window (session timeout)
triggerContext.registerEventTimeTimer(timeWindow.getEnd());
// If this event is a session terminating event, fire and purge the window
if ("LOGOUT".equals(userEvent.getEventType()) || "CHECKOUT_COMPLETE".equals(userEvent.getEventType())) {
return TriggerResult.FIRE_AND_PURGE;
}
// Otherwise, continue
return TriggerResult.CONTINUE; }
@Override
public TriggerResult onProcessingTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
// not used in this trigger
return TriggerResult.CONTINUE; }
@Override
public TriggerResult onEventTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
if (timestamp == timeWindow.getEnd()) {
// Session inactivity timeout reached, fire and purge the window
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
// If windows merge, register a new timer for the new window end and let the old timers lapse.
// (Flink will call onEventTime for the exact timestamps that occur; by re-registering the new end we ensure final firing.)
ctx.registerEventTimeTimer(window.getEnd());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
}
Let's break this down. On each element:
We register a timer, which ensures an event-time timer is set for the current end-of-window. (If the session window extends or merges,
window.getEnd()
will be updated accordingly on each trigger invocation.)We then check the event’s type. If it is a designated end-of-session event (in this case,
"LOGOUT"
or"CHECKOUT_COMPLETE"
), we immediately returnFIRE_AND_PURGE
. This emits the window’s results and clears the window state right away, effectively closing the session as soon as that event is encountered.If it’s not an end signal, we return
CONTINUE
to keep collecting events.In
onEventTime()
, we check if the timer is at the window’s end timestamp. If yes, that means the watermark has reached the session’s end (i.e., no new events for 15 minutes), so we returnFIRE_AND_PURGE
to close out the session. (If the timer firing is for some other timestamp, we ignore it withCONTINUE
.)We don’t use
onProcessingTime()
here, so it just continues.The
clear()
method doesn’t have to do much because we didn’t use manual state (timers will be cleaned up by Flink after purging). If we had used any ValueState in the trigger, we’d clear it here.
If you run the code above you should see the following output:
Session for user user_1 [2025-02-01 00:20:00.0,2025-02-01 00:35:20.0) -> Events: [login, click, click, LOGOUT]
Session for user user_2 [2025-02-01 02:31:35.0,2025-02-01 02:48:19.0) -> Events: [login, click, CHECKOUT_COMPLETE]
Session for user user_1 [2025-02-01 01:20:00.0,2025-02-01 01:35:25.0) -> Events: [login, click, click]
Conclusion
Custom triggers in Apache Flink provide a powerful mechanism to control window evaluation beyond the standard time or count policies. In this post, we saw that triggers decide when a window’s results are emitted and can fire multiple times or clear the window state. Flink’s built-in triggers (event-time, processing-time, count, etc.) cover basic needs, but they cannot be combined or tailored to complex conditions. By implementing a custom trigger, you can overcome these limitations – for example, firing early based on element count while still observing event-time boundaries, or ending a window when a domain-specific event occurs.
Key takeaways and best practices:
✅ Understand the Window Lifecycle: A trigger that fires (returns FIRE
) does not close the window; the window can accumulate more data and fire again. To close it, use FIRE_AND_PURGE
or design the trigger to eventually purge (as we did at the end of time windows or session gaps).
✅ Override Required Methods: At minimum override onElement
, onEventTime
, onProcessingTime
, and clear
. Use onMerge for merging window scenarios (like sessions) to handle timers or state consistently
✅ Use TriggerContext for State and Timers: If your logic needs to count events or track flags, use partitioned state via TriggerContext
. Always clean up state in clear()
to avoid leaks.
✅ When to Use Custom Triggers: Use them when built-in triggers don’t suffice – e.g., combined conditions (time + count), irregular event-driven firing, early and incremental results, or avoiding default behaviors (one common use case is preventing late events from retriggering a window by writing a trigger that ignores late elements). If a simple built-in trigger meets the need, stick with it for simplicity. But as shown, custom triggers shine for implementing business rules directly in the streaming logic.
You made it and reached the end. I hope you enjoyed this 👋 and happy streaming 🌊
Top comments (0)