Imagine that you have built an Apache Flink® job. It collects records from Apache Kafka®, performs a time-based aggregation on those records, and emits a new record to a different topic. With your excitement high, you run the job for the first time, and are disappointed to discover that nothing happens. You check the input topic and see the data flowing, but when you look at the output topic, it’s empty.
In many cases, this is an indication that there is a problem with watermarks. But what is a watermark?
Let’s start by outlining the problem a watermark is designed to solve.
Each topic in Kafka is divided into partitions. Messages are assigned to a partition based on the message key, or in a round-robin fashion if no key is provided. This allows topic consumers to scale out: as long as there are enough partitions, consumers can be added to process messages in parallel.
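The key-to-partition mapping can be sketched in a few lines. Note this is a simplification: Kafka's real default partitioner hashes the serialized key bytes with murmur2, while the sketch below uses `String.hashCode()` purely for illustration.

```java
import java.util.List;

public class PartitionerSketch {
    // Simplified stand-in for Kafka's default partitioner. The real client
    // hashes the serialized key with murmur2; String.hashCode() is used here
    // only to illustrate the idea.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // The same key always maps to the same partition, which is what gives
        // Kafka its per-key ordering guarantee.
        System.out.println(partitionFor("user-42", partitions) == partitionFor("user-42", partitions)); // true
        for (String key : List.of("user-1", "user-2", "user-3")) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```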
Within each partition, Kafka guarantees order. Messages in the partition will be consumed in the order they were received. This ensures that all records for a given partition key are consumed sequentially.
However, global ordering is impossible when parallelism is involved. In a Kafka topic, this means consuming messages from multiple partitions can result in out-of-order processing.
For many tasks, this may not matter. A job that filters records may not care about the order. It applies the appropriate filter to records as they arrive, no matter which order they appear in.
However, when time is involved, ordering may be critical.
Consider a job that looks for the maximum value of a field within a one-minute time window.
As each record passes through the job, the job compares the field against the current maximum. If it finds a larger value, it updates the stored maximum. But how does the job decide when to stop?
Remember, this job looks for the maximum value within one minute. As each message is processed, the job inspects the timestamp and decides whether the window has closed. In an ordered stream, this isn't a problem. However, if messages can arrive out of order, the job is faced with a difficult choice.
Should it close the window, emit the result, and discard any late messages caused by out-of-orderness? Or should it hold the result back in case out-of-order messages arrive later?
Just because one message has a timestamp greater than the end of the window does not mean later messages won't have an earlier timestamp, because messages may arrive out of order. These messages are considered "late" because they were processed after an event with a larger timestamp. The job must decide whether to ignore late messages and emit the result, or to wait for late messages, potentially creating delays.
Flink uses watermarks to make the decision. Watermarks define the maximum amount of time the job should wait for out-of-order messages. They are typically calculated by taking the maximum timestamp observed and subtracting the allowed out-of-orderness.
For example, suppose the out-of-orderness is set to one minute. When the job encounters a message timestamped 3:26:23, the watermark is calculated as 3:25:23. Flink then uses this watermark to decide how to handle subsequent messages. When a message timestamped 3:25:16 arrives, it is discarded because its timestamp is less than the current watermark. A timestamp exactly equal to the watermark would also be discarded; a record is kept only if its timestamp is strictly greater than the watermark. A message timestamped 3:25:46 would therefore be kept.
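The arithmetic above is simple enough to sketch directly. This is a minimal plain-Java simulation, not Flink itself; the timestamps are the ones from the example, and the one-minute allowance is the parameter being illustrated:

```java
import java.time.Duration;
import java.time.LocalTime;

public class WatermarkSketch {
    private LocalTime maxTimestampSeen = LocalTime.MIN;
    private final Duration allowedOutOfOrderness;

    WatermarkSketch(Duration allowedOutOfOrderness) {
        this.allowedOutOfOrderness = allowedOutOfOrderness;
    }

    // Watermark = maximum timestamp observed so far minus the allowed out-of-orderness.
    LocalTime currentWatermark() {
        return maxTimestampSeen.minus(allowedOutOfOrderness);
    }

    // A record is kept only if its timestamp is strictly greater than the watermark.
    boolean process(LocalTime eventTime) {
        if (eventTime.isAfter(maxTimestampSeen)) {
            maxTimestampSeen = eventTime;
        }
        return eventTime.isAfter(currentWatermark());
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(Duration.ofMinutes(1));
        System.out.println(sketch.process(LocalTime.of(3, 26, 23))); // kept; watermark is now 3:25:23
        System.out.println(sketch.process(LocalTime.of(3, 25, 16))); // dropped: 3:25:16 <= 3:25:23
        System.out.println(sketch.process(LocalTime.of(3, 25, 46))); // kept:    3:25:46 >  3:25:23
    }
}
```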
The window will only be closed once the watermark advances beyond the window threshold. The window from 3:25:00 to 3:25:59 will remain open until the watermark advances to at least 3:26:00.
Out-of-orderness in the stream introduces a few important consequences. The first is that late messages may be dropped. This can result in data loss. The second consequence is that it introduces latency as the job waits for late messages. Therefore, setting the watermark strategy becomes a balance between data loss and latency. Increasing the out-of-orderness threshold will reduce the data loss at the cost of additional latency.
Now that we understand how watermarks work, let’s return to the original problem: A new Flink job was deployed, but no records were being emitted. What causes this, and what does it have to do with watermarks?
In the Flink DataStream API, it's possible to define a job that requires watermarks without ever defining a watermark strategy. In that case, no watermarks are generated, which means the job doesn't know how long to wait for out-of-order messages. The default behavior is to wait forever. As a result, no messages get produced. The Table API and the SQL API have checks in place to prevent this type of error.
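One way to avoid this in the DataStream API is to attach an explicit watermark strategy to the source. This is a configuration sketch, not a complete job: it assumes Flink's DataStream API and Kafka connector are on the classpath, and the `Event` POJO, its `timestampMillis` field, and the `kafkaSource` variable are illustrative names, not anything from a real codebase.

```java
// Configuration sketch: Event, timestampMillis, and kafkaSource are
// illustrative assumptions; requires Flink on the classpath.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        // Wait up to 30 seconds for out-of-order events.
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        // Explicitly tell Flink which field holds the event timestamp.
        .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis);

DataStream<Event> stream = env.fromSource(kafkaSource, strategy, "events-with-watermarks");
```

With a bounded out-of-orderness strategy in place, the job knows exactly how long to wait before closing a window, instead of waiting forever.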
A similar issue can occur if the watermark strategy has been set up with the wrong timestamp field. When watermarks are defined, the field containing the timestamp must be explicitly declared. If it is declared incorrectly, the job can’t properly calculate a watermark and ends up waiting forever.
Sometimes, the out-of-orderness threshold is simply too large. Setting a threshold of 24 hours means the job will need to process a full 24 hours of data before it can emit a result. This can make the job seem stuck, but in reality, it will eventually produce results; it just might take a long time.
A more complicated reason involves idle partitions. Sometimes, Kafka partitions are not evenly balanced. This can happen if you have a poor choice of partition key, or if a partition becomes blocked for some reason. When Flink calculates the watermarks, it calculates them separately for each partition. Then it uses the minimum watermark from all partitions to determine the overall watermark. However, what happens if one of the partitions becomes idle and stops contributing to that calculation?
Here again, Flink is left with a difficult decision. It could ignore the idle partition and calculate the watermark using the active partitions. However, that could create many late messages once the partition becomes active again. The late messages would be dropped, resulting in significant data loss. On the other hand, Flink can wait for the partition to become active, but it can’t emit anything until the partition starts producing messages. So once again, it is faced with the choice between data loss and latency, and like before, the default behavior is to protect the data at the cost of latency. If the idle partition never becomes active, this can result in a job that doesn’t produce results.
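Because the overall watermark is the minimum across partitions, a single stalled partition pins it in place no matter how far the others advance. A minimal plain-Java sketch of that calculation (not Flink's actual implementation; the millisecond values are illustrative):

```java
import java.util.Arrays;

public class MinWatermarkSketch {
    // The job-wide watermark is the minimum of the per-partition watermarks.
    static long overallWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three partitions; partition 2 went idle and its watermark stopped advancing.
        long[] watermarks = {10_000L, 12_000L, 1_000L};
        System.out.println(overallWatermark(watermarks)); // 1000: the idle partition holds everything back

        // Even as the active partitions race ahead, the overall watermark is stuck.
        watermarks[0] = 60_000L;
        watermarks[1] = 65_000L;
        System.out.println(overallWatermark(watermarks)); // still 1000
    }
}
```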
It is possible to configure the watermark strategy to ignore idle partitions if that is the desired behavior. However, it is important to consider why the partition is idle. There might be an upstream issue that should be dealt with first.
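In the DataStream API, this option is exposed as `withIdleness` on the watermark strategy. A configuration sketch (the `Event` type, its fields, and the chosen durations are illustrative assumptions):

```java
// Configuration sketch: after one minute with no events, a source split is
// marked idle and stops holding back the overall watermark.
// Requires Flink on the classpath; Event and timestampMillis are illustrative.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis)
        .withIdleness(Duration.ofMinutes(1));
```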
Idle partitions are one thing. Idle streams are something else entirely. They often occur when testing a new stream, where it’s common to emit a small batch of events to ensure the stream works. This can also happen in production data streams that experience traffic in bursts.
Suppose the system were to emit 200 events within 20 seconds and then stop. Meanwhile, a window operation divides the stream into one-minute windows. Assuming no more events are emitted after the initial 200, the window may never emit any results.
The problem is that watermarks only advance when events are flowing. After just 20 seconds, all of the events stop flowing. Meanwhile, the window can only close once the watermark reaches the one-minute threshold. So, because the watermark isn’t being updated, the window never closes, and the output stream remains idle.
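The stuck window reduces to a single comparison: a window fires only when the watermark reaches its end, and the watermark only moves when new events arrive. A plain-Java sketch of the scenario above (the five-second out-of-orderness allowance is an illustrative assumption):

```java
public class IdleStreamSketch {
    static final long WINDOW_END_MS = 60_000L;      // one-minute window ending at the 60-second mark
    static final long OUT_OF_ORDERNESS_MS = 5_000L; // illustrative five-second allowance

    // The window closes only once the watermark reaches the window end.
    static boolean windowCloses(long maxEventTimeMs) {
        long watermark = maxEventTimeMs - OUT_OF_ORDERNESS_MS;
        return watermark >= WINDOW_END_MS;
    }

    public static void main(String[] args) {
        // 200 events arrive within the first 20 seconds, then the stream goes idle.
        long lastEventTime = 20_000L;
        System.out.println(windowCloses(lastEventTime)); // false: the watermark is stuck at 15 seconds

        // No amount of wall-clock time changes this: without new events the
        // watermark never advances, so the window never fires.
    }
}
```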
The Data Streaming Platform in Confluent Cloud uses a default watermark strategy to address issues created by out-of-order events.
When messages are produced in Confluent Cloud, they are automatically assigned a timestamp. This ensures that every message has a default timestamp for calculating watermarks.
It then uses a histogram of the observed timestamps to determine an appropriate watermark. The goal is to drop less than 5% of the messages due to out-of-orderness. There is some variability here: bursts of out-of-order messages might cause it to drop more than 5% until the histogram is recalculated, while a stream with stable out-of-orderness might drop significantly fewer messages.
There are some limitations in this calculation. It requires a minimum of 250 messages per partition before it will generate reasonable watermarks. It will also never set the out-of-orderness to less than 50 milliseconds, or more than seven days.
It also assumes that data will flow continuously. Idle streams can still prevent watermarks from advancing, which may cause issues, even with the default watermark strategy.
However, in an active data stream, users of Confluent Cloud can expect that when they run their jobs for the first time, there will be an effective watermark strategy to ensure messages start flowing relatively quickly.
There may be situations where the default strategy in Confluent Cloud is insufficient. These include:
If the job needs to use a different timestamp from the automatically assigned one
If the incoming topic has fewer than 250 messages per partition
If the job needs to operate outside the minimum (50 milliseconds) or maximum (seven days) out-of-orderness thresholds
If the job cannot tolerate dropping messages, or needs more control over when to drop messages
If these conditions apply, the default strategy should be overridden and replaced with something more suitable.
Dealing with watermarks in Apache Flink® is a common headache, especially for new users. The Data Streaming Platform in Confluent Cloud makes this easier by providing a robust default algorithm. If you want to know more about implementing a data streaming application using Confluent Cloud, check out our new course on Confluent Developer: Apache Flink® Table API: Processing Data Streams in Java.
Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by the use of these marks. All other trademarks are the property of their respective owners.