Apache Kafka is the leading data landing platform. It is the de-facto standard for collecting data and then streaming it to different systems. It is also used as a filter system in many cases, where messages are read from one topic and put on a different topic after processing, much like Unix pipes. It is a messaging/queuing system with at-least-once delivery semantics.
Kafka differs from other messaging systems in that it delegates offset management to consumers. Generally, a messaging system maintains a counter of where each consumer was while reading from a topic. By moving offset management to each client, Kafka simplifies processing and reduces complexity (compared to systems with exactly-once delivery semantics). More details on the philosophy behind Kafka can be found on its website.
Kafka works with topics, and each topic is divided into one or more partitions. Messages within each partition are ordered. When a consumer reads a topic, it actually reads data from all of the partitions. Kafka also provides the means to read data from a single partition if the client wants to. As a consumer reads data from a partition, it advances its offset; this offset, when stored, becomes an acknowledgement. Kafka does not have traditional message acknowledgements.
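To make the offset model concrete, here is a toy in-memory sketch in Python. It is not the Kafka client API: ToyTopic and ToyConsumer are hypothetical names used only for illustration. A partition is modelled as an append-only list, an offset as an index into that list, and a "commit" as remembering the next offset to read.

```python
from collections import defaultdict


class ToyTopic:
    """Hypothetical stand-in for a topic: a fixed set of append-only partitions."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        self.partitions[partition].append(message)
        return len(self.partitions[partition]) - 1  # offset of the new message


class ToyConsumer:
    """Hypothetical consumer that tracks its own position per partition."""

    def __init__(self, topic):
        self.topic = topic
        self.position = defaultdict(int)  # next offset to read, per partition
        self.committed = {}               # stored offsets act as acknowledgements

    def poll(self, partition):
        log = self.topic.partitions[partition]
        offset = self.position[partition]
        if offset >= len(log):
            return None                   # nothing new in this partition
        self.position[partition] = offset + 1
        return offset, log[offset]

    def commit(self, partition):
        # Storing the position is the only "acknowledgement" there is.
        self.committed[partition] = self.position[partition]


topic = ToyTopic(num_partitions=2)
for i in range(3):
    topic.append(0, f"a{i}")
topic.append(1, "b0")

consumer = ToyConsumer(topic)
first = consumer.poll(0)
second = consumer.poll(0)
consumer.commit(0)
```

Ordering holds only within a partition: the two polls on partition 0 return its messages in order, while partition 1 has its own independent offset sequence.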
Consumer Offset Management
With Kafka, consumers have different options to manage topic offsets:
- With Kafka < 0.9.0:
  - Outsource offset management to Zookeeper
  - Manage offsets in a Kafka topic (Kafka > 0.8.1)
  - Store offsets in any other system: local disk, S3, a database, etc.
- With Kafka 0.9.0, a new consumer API allows a much cleaner way to manage offsets in Kafka.
As we can see, offset management gets a lot of attention in Kafka. This is because a consumer seldom just reads messages from a topic. Most of the time we have:
- Huge amounts of data to replay
- Situations where we want to consume messages as fast as possible
- Data processing/transformation to be done on messages
- Restarts after a previous failure
These requirements do not sit well with Kafka's definition of 'consumed'. Kafka considers a message consumed as soon as it has been given to the consumer. In most situations, the application would define 'consumed' differently, for example only after the message has been processed and stored. That is why applications seek greater control over offset management.
To complicate matters further, Kafka (< 0.9.0) has two consumer clients: a Highlevel Consumer and a Simple Consumer. The Simple Consumer is anything but simple. With Kafka 0.9.0, though, there is a new consumer API that replaces both of the previous APIs. Depending on the client you use, certain configuration parameters behave differently.
Before we move ahead and look at each type of offset management, there are a few configuration switches that behave peculiarly, and some others that are important to understand.
auto.offset.reset: This can have two possible values: smallest and largest. If it is set to smallest, the client starts consuming from the smallest available offset; with largest, it starts from the end of the topic. It is important to note that the smallest available offset may not be the offset you initially started consuming from, let’s say 0. This is because Kafka has a log retention policy. Log retention is governed by retention.ms, which defaults to 7 days: Kafka stores messages for 7 days by default for each topic before discarding them. So, if you kept pushing data to Kafka for 14 days and use smallest as the value of auto.offset.reset, your clients would start from the oldest message that survived the retention policy, not from offset 0.
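The 14-day example can be worked through with a small Python sketch. It is a simplification under stated assumptions: one message per day, and retention applied per message (real brokers discard whole log segments, so the actual cut-off is coarser). The function name smallest_available_offset is hypothetical.

```python
DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 7 * DAY_MS  # the 7-day default mentioned above


def smallest_available_offset(timestamps_ms, now_ms, retention_ms=RETENTION_MS):
    """Return the offset of the oldest message still inside the retention window.

    Offsets are list indexes. If every message has expired, the result is
    len(timestamps_ms), i.e. the position where the next message will land.
    """
    for offset, written_at in enumerate(timestamps_ms):
        if now_ms - written_at <= retention_ms:
            return offset
    return len(timestamps_ms)


# Assumption: one message per day for 14 days, giving offsets 0..13.
timestamps = [day * DAY_MS for day in range(14)]
now = 13 * DAY_MS + 1  # just after the last message was written

start = smallest_available_offset(timestamps, now)
```

Under these assumptions `start` is 7: offsets 0 to 6 have aged out, so a consumer using smallest begins at offset 7 even though the topic originally started at 0.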
offsets.storage: This determines where offsets are stored, either zookeeper or kafka (only >= 0.8.2). This is a setting for the Highlevel consumer only.
auto.commit.enable: Determines whether offsets are committed automatically at a regular interval (auto.commit.interval.ms).
The auto.offset.reset parameter behaves differently for the Highlevel and Simple consumers.
A. If you are working with the Highlevel Consumer, reading from a topic foo, and you have:
- A consumer in a new consumer group: since this is a new consumer, there are no offsets stored anywhere. The auto.offset.reset setting decides where to start reading: the beginning of the topic or the end of the topic.
- A restarted, previously failed consumer in an existing consumer group: since this consumer consumed some messages and stored offsets in the storage (kafka or zookeeper), the restarted consumer just reads the offsets from the storage and completely ignores auto.offset.reset.
B. If you are working with the Simple Consumer, reading from a topic foo, then on every run the consumer checks the auto.offset.reset value and starts reading messages accordingly. The consumer has to manage offsets itself and can ask Kafka to read from any available position.
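Cases A and B can be summarised as a decision rule. The Python sketch below only models that rule; it does not call any Kafka API, and the function names (reset_position, high_level_start, simple_start) are hypothetical. Here `largest` stands for the end-of-log position.

```python
def reset_position(auto_offset_reset, smallest, largest):
    """Translate an auto.offset.reset value into a concrete offset."""
    if auto_offset_reset == "smallest":
        return smallest
    if auto_offset_reset == "largest":
        return largest
    raise ValueError(f"unsupported auto.offset.reset: {auto_offset_reset!r}")


def high_level_start(stored_offset, auto_offset_reset, smallest, largest):
    """Case A: a stored offset wins; the reset policy is only a fallback."""
    if stored_offset is not None:
        return stored_offset
    return reset_position(auto_offset_reset, smallest, largest)


def simple_start(auto_offset_reset, smallest, largest, requested=None):
    """Case B: nothing is stored on the consumer's behalf, so each run uses
    either an offset the application supplies or the reset policy."""
    if requested is not None:
        return requested
    return reset_position(auto_offset_reset, smallest, largest)


# New consumer group: no stored offset, so the reset policy decides.
new_group = high_level_start(None, "smallest", smallest=120, largest=500)
# Restarted consumer: the stored offset is used and the policy is ignored.
restarted = high_level_start(310, "smallest", smallest=120, largest=500)
# Simple consumer: the policy is consulted on every run.
simple_run = simple_start("largest", smallest=120, largest=500)
```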
Offsets in Zookeeper
Storing offsets in Zookeeper was the default behaviour in Kafka < 0.8.1 for the Highlevel consumer. Zookeeper may become a bottleneck under heavy load, so storing offsets there is not advisable.
With auto.commit.enable set to false, one can control manually when offsets are committed, and thereby define what 'consumed' means.
The commitOffsets API allows a Highlevel consumer (HLC) to commit offsets at a time of its choosing, BUT it commits offsets for all partitions. It does not allow control over which partitions' offsets to commit. That may be fine for straightforward applications, but for applications that run multiple threads processing messages, this single offset commit may not work.
Changing offsets requires manually changing them in Zookeeper.
With Kafka 0.9.0, the new consumer takes a map with the specific offsets to be committed. The API methods are called commitSync and commitAsync. This release also adds a commitOffsets method to the old Highlevel consumer API.
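The difference between an all-partitions commit and a map-based commit can be sketched with a toy offset store in Python. This models the idea only and is not the Kafka API; ToyOffsetStore and next_offsets are hypothetical names. The sketch assumes the common convention that the committed value is the next offset to read, that is, the last processed offset plus one.

```python
class ToyOffsetStore:
    """Hypothetical offset store keyed by partition number."""

    def __init__(self):
        self.committed = {}

    def commit_all(self, positions):
        """All-partitions commit: every partition's offset is overwritten."""
        self.committed = dict(positions)

    def commit(self, offsets):
        """Map-based commit: only the listed partitions are updated."""
        self.committed.update(offsets)


def next_offsets(last_processed):
    """Convert 'last processed offset' into 'next offset to read'."""
    return {partition: offset + 1 for partition, offset in last_processed.items()}


store = ToyOffsetStore()
store.commit({0: 10, 1: 20})

# The worker for partition 0 has finished offset 14; the worker for
# partition 1 is still busy, so its offset must not move yet.
store.commit(next_offsets({0: 14}))
```

With the map-based commit, partition 1 stays at 20 while partition 0 advances to 15, which is the control a multi-threaded application needs.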
Offsets in Kafka
With 0.8.2, Kafka allowed offsets to be stored inside a Kafka topic named __consumer_offsets. This is one of the most recommended methods of storing offsets. It relies on the Offset Management API, which stores offsets in Kafka by default. A Highlevel consumer that wants to store offsets in Zookeeper but still use the Offset Management API should change the offsets.storage setting to zookeeper.
A Simple consumer that wishes to store offsets in Zookeeper but use this API needs to store the offsets in Zookeeper manually.
Storing offsets anywhere else simply means using the Simple Consumer for Kafka < 0.9.0. This means writing more code and managing Kafka internals. But by storing offsets outside Kafka and Zookeeper, the application need not use the Offset Management API. One can store offsets on S3, in local files, at FTP locations, etc.
Changing offsets just requires using a different offset than the one in the offset store.
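As an illustration of an external offset store, here is a minimal Python sketch that keeps offsets in a local JSON file. The class name FileOffsetStore, the file name offsets.json, and the JSON layout are all assumptions made for this example. The write goes to a temporary file first and is then renamed, so a crash mid-write does not leave a half-written store.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical offset store: {partition: next_offset} in a JSON file."""

    def __init__(self, path):
        self.path = path

    def load(self):
        if not os.path.exists(self.path):
            return {}  # first run: nothing stored yet
        with open(self.path) as f:
            return {int(p): o for p, o in json.load(f).items()}

    def save(self, offsets):
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump({str(p): o for p, o in offsets.items()}, f)
        os.replace(tmp, self.path)  # atomic rename over the old file


def demo():
    with tempfile.TemporaryDirectory() as directory:
        store = FileOffsetStore(os.path.join(directory, "offsets.json"))
        before = store.load()
        store.save({0: 42, 1: 7})
        saved = store.load()
        # "Changing offsets": rewind partition 0 to replay from offset 10.
        store.save({**saved, 0: 10})
        return before, saved, store.load()
```

Rewinding is just a matter of writing a smaller number into the store before the consumer starts; the consumer then asks Kafka to read from that position.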
Offsets in Kafka 0.9.0
With 0.9.0, Kafka has done away with the two different consumers. There is only one consumer API, which simplifies offset management greatly. Detailed Java code example here.
This new API still allows offsets to be committed automatically, but it also allows manual commits with control over what to commit for each partition.
Changing offsets with the new API is easy with the consumer's seek method.
What is the bottom line?
Going with the new consumer API would be an easy choice. The amount of code one writes is smaller, and the code is clean and simple.
Regardless of which method an application goes with, the fundamental question of when a message is considered 'consumed' remains, and it forces the application towards at-least-once delivery. The application has to account for two separate events: the moment a message counts as consumed (stored in a database, in Hadoop, or in a local file) and the moment its offset is stored. A failure may occur between the two, leaving stale offsets in our store of choice. On the next start, the application resumes from that stale state and reprocesses messages it has already stored, which introduces duplicates. Hence at-least-once delivery.
Making message and offset storage atomic, in any combination of the techniques above, would give us exactly-once delivery. If the application uses something like HBase and makes sure that there is a unique key for each message, we get each message 'exactly once', though technically that is not the same as exactly-once delivery.
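The failure window can be reproduced with a few lines of Python. This is a simulation only: the log, the sink, and the offset store are plain in-memory objects, and run_consumer and its crash_after_store_at parameter are hypothetical names chosen for the example.

```python
def run_consumer(log, sink, offset_store, crash_after_store_at=None):
    """Process messages from the stored offset onwards.

    Step 1 stores the message, step 2 stores the offset. A simulated crash
    can be injected between the two steps. Returns True if the run finished.
    """
    offset = offset_store.get("offset", 0)
    while offset < len(log):
        sink.append(log[offset])             # step 1: message is stored
        if offset == crash_after_store_at:
            return False                     # crash before the offset is stored
        offset += 1
        offset_store["offset"] = offset      # step 2: offset is stored
    return True


log = ["m0", "m1", "m2", "m3"]
sink = []
offset_store = {}

# First run crashes right after storing m2, before recording its offset.
run_consumer(log, sink, offset_store, crash_after_store_at=2)
stale_offset = offset_store["offset"]

# The restart resumes from the stale offset and stores m2 a second time.
run_consumer(log, sink, offset_store)
```

After the restart the sink holds m2 twice: nothing was lost, but one message was delivered more than once.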
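Both ideas, atomic storage and unique keys, can be sketched with SQLite from the Python standard library standing in for the real message store. The table layout and function names are assumptions for illustration only. The message and its offset are written in one transaction, and the primary key on (partition, offset) makes a replayed message a no-op.

```python
import sqlite3


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE messages (partition_id INTEGER, msg_offset INTEGER, "
        "body TEXT, PRIMARY KEY (partition_id, msg_offset))"
    )
    db.execute(
        "CREATE TABLE offsets (partition_id INTEGER PRIMARY KEY, next_offset INTEGER)"
    )
    db.commit()
    return db


def store(db, partition, offset, body, crash=False):
    """Write the message and its offset in a single transaction."""
    try:
        with db:  # commits on success, rolls back if an exception escapes
            db.execute(
                "INSERT OR IGNORE INTO messages VALUES (?, ?, ?)",
                (partition, offset, body),
            )
            if crash:
                raise RuntimeError("simulated crash between the two writes")
            db.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?)",
                (partition, offset + 1),
            )
        return True
    except RuntimeError:
        return False


def message_count(db):
    return db.execute("SELECT COUNT(*) FROM messages").fetchone()[0]


def next_offset(db, partition):
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE partition_id = ?", (partition,)
    ).fetchone()
    return row[0] if row else None


db = make_db()
store(db, 0, 5, "hello", crash=True)
after_crash = (message_count(db), next_offset(db, 0))

store(db, 0, 5, "hello")
store(db, 0, 5, "hello")  # replay of the same message is ignored
after_replay = (message_count(db), next_offset(db, 0))
```

The crashed attempt leaves neither the message nor the offset behind, and the replay leaves exactly one copy. This only works because both writes live in the same store; with the message in one system and the offset in another, a single transaction like this is not available.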
Why do I get ERROR OOME with size error?
This error has nothing to do with offset management. Kafka uses internal byte buffers to store messages (message chunks, to be precise) for each consumer. These buffers are governed by the configurations below:
- fetch.size or fetch.message.max.bytes and
- queuedchunks.max or queued.max.message.chunks
These settings determine how much memory a consumer is going to take. For older clients, it is:
(number of consumer threads) * (queuedchunks.max) * (fetch.size) or
(number of consumer threads) * (queued.max.message.chunks) * (fetch.message.max.bytes)
For the new consumer API it is:
(number of partitions) * (max.partition.fetch.bytes)
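The formulas above are easy to turn into a quick estimate. The Python sketch below does only the arithmetic; the input values (8 threads, 10 queued chunks, 50 partitions, 1 MiB fetch sizes) are illustrative assumptions, not defaults or recommendations.

```python
MIB = 1024 * 1024


def old_consumer_buffer_bytes(consumer_threads, queued_max_message_chunks,
                              fetch_message_max_bytes):
    """Old clients: threads * queued chunks * fetch size."""
    return consumer_threads * queued_max_message_chunks * fetch_message_max_bytes


def new_consumer_buffer_bytes(partitions, max_partition_fetch_bytes):
    """New consumer API: partitions * per-partition fetch size."""
    return partitions * max_partition_fetch_bytes


# Illustrative inputs only.
old_estimate = old_consumer_buffer_bytes(8, 10, 1 * MIB)
new_estimate = new_consumer_buffer_bytes(50, 1 * MIB)

old_estimate_mib = old_estimate // MIB   # 80
new_estimate_mib = new_estimate // MIB   # 50
```

Comparing such an estimate with the heap actually available to the application shows quickly whether the buffers alone can exhaust it.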
When consumers get busy with processing and are slow (due to RAM and/or CPU pressure) to consume and remove data from the internal buffers, an internal call to pre-fetch into these buffers ends up in the error above.
The solution to this error would be to:
- Increase the heap size for the application
- Reduce the number of consumer threads
- Consume some partitions from another machine
- Slow down
The last option above is handy if we have started a new consumer/consumer group knowing that:
- There is an initial huge data set to be consumed
- The daily data rate is not significant
- We want to use this same consumer for ongoing processing of messages
- We do not want multiple machines running for this one consumer
Slowing down would mean measuring message consumption rates in the application and putting threads to sleep :-)
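One way to do that is a small throttle that spaces messages out by sleeping. The Python sketch below is a minimal version under stated assumptions: Throttle and consume are hypothetical names, the rate limit is a fixed messages-per-second cap, and the clock and sleep functions are injectable so the behaviour can be checked without waiting.

```python
import time


class Throttle:
    """Caps the consumption rate by sleeping between messages."""

    def __init__(self, max_per_second, clock=time.monotonic, sleep=time.sleep):
        self.interval = 1.0 / max_per_second
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = None  # earliest time the next message may be handled

    def wait(self):
        """Call once before handling each message."""
        now = self.clock()
        if self.next_allowed is not None and now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.next_allowed
        self.next_allowed = now + self.interval


def consume(messages, handle, throttle):
    """Handle messages in order, never faster than the throttle allows."""
    for message in messages:
        throttle.wait()
        handle(message)
```

For example, `consume(batch, process, Throttle(max_per_second=500))` would cap a hypothetical `process` function at roughly 500 messages per second, giving the internal buffers time to drain during the initial backlog.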