Problems and Solutions

Broker keeps dropping out of ISR

If availability is important, monitor the ISR for every partition.

A broker (follower replica) drops out of the ISR when it falls more than replica.lag.max.messages (default 4000) behind the leader. This can happen when producers write to Kafka in large batches: the follower fails to replicate fast enough from the leader and falls behind.

If that is the cause, increase replica.lag.max.messages.

If replica.fetch.max.bytes < message.max.bytes, followers will encounter messages they cannot fetch, replication will stall, and they will eventually drop out of the ISR (replica fetchers behave like consumers).
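
For example (hypothetical values): if the broker's message.max.bytes is 2 MB, replica.fetch.max.bytes must also be at least 2 MB; otherwise any 2 MB message accepted by the leader can never be fetched by the followers, and those followers eventually fall out of the ISR.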

Consumer is not consuming data

Check that the next message is not larger than fetch.message.max.bytes.

The consumer's fetch.message.max.bytes must be at least as large as the broker's maximum message size; if it is smaller, large messages can never be consumed.

When you configure the broker, note its maximum message size (message.max.bytes).

Validate consumer progress using the ConsumerOffsetChecker tool:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect …. --group ……

http://kafka.apache.org/documentation.html#newconsumerconfigs
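
A minimal sketch, assuming the newer Java consumer (where the per-partition limit is max.partition.fetch.bytes) and hypothetical broker address, group id, topic name, and fetch size; with the older consumer the equivalent setting is fetch.message.max.bytes:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LargeMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker address
        props.put("group.id", "t1-readers");                // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Keep this at least as large as the broker's message.max.bytes; on older clients,
        // messages larger than the fetch size cannot be fetched and the consumer stalls.
        props.put("max.partition.fetch.bytes", "2097152");  // 2 MB, assuming message.max.bytes <= 2 MB

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("t1"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}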

Data is not distributed evenly across the partitions.

Say one partition has 100 messages and another partition only 40.

Check that the hash of the key spreads messages evenly across partitions.

  •  If the key is null, the producer sends messages to one randomly chosen partition and sticks with it for about 10 minutes (the metadata refresh interval) before moving on to the next partition.

Generate a random key, or write a custom random partitioner; a sketch follows.
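
A minimal sketch of such a partitioner, assuming the Java producer's org.apache.kafka.clients.producer.Partitioner interface; the class name and hash choice are illustrative only:

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical partitioner: spreads null-keyed records uniformly at random on every send,
// instead of sticking to one partition for a long interval.
public class RandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Keyed records: hash so the same key always lands on the same partition.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

Register it on the producer with props.put("partitioner.class", RandomPartitioner.class.getName()).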


How do we ensure data is not lost even when replicas go down?

Say Replica-3 goes down at t=t1 (so it has the longest lag), Replica-2 at t=t4, and Replica-1 (the last leader) at t=t8.

If controlled.shutdown.enable = true, then upon restart the partition waits until the last leader (Replica-1, which has all the data) is back in action before resuming, so no data is lost at the cost of some downtime. This also assumes unclean leader election is disabled (unclean.leader.election.enable = false), so an out-of-sync replica is never promoted to leader.

Out of Memory Error

First, work out how much memory Kafka needs:

Broker memory ≈ replica.fetch.max.bytes * number of partitions the broker replicates

Consumer memory ≈ fetch.message.max.bytes * number of partitions the consumer reads
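
A worked example with hypothetical numbers: a consumer with fetch.message.max.bytes = 1 MB reading 200 partitions should budget roughly 1 MB * 200 = 200 MB of heap just for in-flight fetch buffers, and a broker with replica.fetch.max.bytes = 1 MB replicating 1000 partitions needs on the order of 1 GB for replication fetch buffers alone.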

Producer throws: Error while fetching metadata [{TopicMetadata for topic t1 -> No partition metadata for topic t1 due to kafka.common.LeaderNotAvailableException

  • Check if you created the topic
  • Is auto.create.topics.enable = true?
  • Do retries solve the problem? (see the producer sketch after this list)

default.replication.factor must not exceed the number of brokers

  • Use kafka-topics.sh --describe to validate that a leader has actually been elected
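
A minimal sketch with retries enabled, assuming the newer Java producer and a hypothetical broker address; with the older Scala producer the analogous settings are message.send.max.retries and retry.backoff.ms:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Retry transient errors such as LeaderNotAvailableException while a newly
        // auto-created topic is still electing its first leader.
        props.put("retries", "5");
        props.put("retry.backoff.ms", "500");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("t1", "key", "value"));
        }
    }
}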

Producer throws: kafka.common.KafkaException: fetching topic metadata for topics [Set(t1)] from broker [ArrayBuffer(id:0, host: localhost, port:9093)] failed

    > caused by: java.net.ConnectException: Connection refused

  • Validate the host / port passed via --broker-list (metadata.broker.list)
  • Check connectivity using telnet
  • The broker may have registered with the wrong host. Check the advertised.host.name configuration (advertised.listeners in newer versions)
    • Check which type of IP is being used (public / private)

Topic does not get deleted

  • make sure delete.topic.enable = true
  • Remember that if auto.create.topics.enable = true and a client requests metadata for the deleted topic, it will be re-created automatically

We need to verify this behavior in Kafka versions newer than 0.8.x.

Which configuration in the Flink Kafka connector ensures that data is read from the latest offset when Flink restarts?

To read only new data, use a new "group.id" and set "auto.offset.reset" to "latest"; conversely, to re-read everything available in a Kafka topic, use a new "group.id" and set "auto.offset.reset" to "earliest".
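
A minimal sketch, assuming a recent flink-connector-kafka (class names differ across connector versions) and a hypothetical topic, broker address, and group id; setStartFromLatest() makes the intent explicit when the job starts without checkpoint state:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LatestOffsetJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.setProperty("group.id", "fresh-group");               // a new group id, as suggested above
        props.setProperty("auto.offset.reset", "latest");           // fallback when no committed offsets exist

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("t1", new SimpleStringSchema(), props);
        // Ignore committed offsets and start from the latest records when the job starts
        // without checkpoint state; use setStartFromEarliest() to re-read everything instead.
        source.setStartFromLatest();

        env.addSource(source).print();
        env.execute("read-from-latest");
    }
}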

Reference: https://cwiki.apache.org/confluence/display/KAFKA/FAQ

Important Notes:

How does Flink try to guarantee 'Exactly Once' processing?

Flink keeps the last-read Kafka offsets in its internal state. If the Flink job fails, it restores from that state, which allows downstream operations to be performed with exactly-once semantics. Flink uses a distributed snapshot mechanism to back up this state periodically. Once a distributed snapshot has been confirmed by all operators, the Kafka source also "commits" the offsets to ZooKeeper, so users can restart the job from the offsets stored in ZK.
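
A minimal sketch of enabling that snapshot mechanism, with a placeholder source and a hypothetical checkpoint interval:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a distributed snapshot every 10 seconds; the Kafka source's offsets are part of
        // this snapshot and are committed back to ZooKeeper/Kafka only after it completes.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder source; in a real job this would be the Kafka source described above.
        env.fromElements("a", "b", "c").print();
        env.execute("exactly-once-sketch");
    }
}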

Maintain partition-locality 

If we want to measure users' average time-on-site, partition by user-id; that way all events for a single user's site activity land in the same partition. If a later phase needs a different grouping (say, by viewed-page), re-partition for that next phase of processing. A producer sketch follows.
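
A minimal sketch, assuming the Java producer with a hypothetical topic name, broker address, user id, and payload; keying each record by user-id is what keeps one user's events in one partition:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TimeOnSiteProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String userId = "user-42";                                   // hypothetical user id
            String event = "{\"page\":\"/home\",\"dwellMs\":5300}";      // hypothetical event payload
            // The default partitioner hashes the key, so every event keyed by this
            // user id lands in the same partition of the site-activity topic.
            producer.send(new ProducerRecord<>("site-activity", userId, event));
        }
    }
}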

Cost of partitions

  • A partition is basically a directory of log files.
  • Each partition must fit entirely on one machine. So if you have only one partition in your topic you cannot scale your write rate or retention beyond the capability of a single machine. If you have 1000 partitions you could potentially use 1000 machines.
  • Each partition is totally ordered. If you want a total order over all writes you probably want to have just one partition.
  • Each partition is not consumed by more than one consumer thread/process in each consumer group. This allows each process to consume in a single-threaded fashion and guarantees ordering to the consumer within the partition (if we split up a partition of ordered messages and handed them out to multiple consumers, the messages would at times be processed out of order even though they were stored in order).
  • Many partitions can be consumed by a single process, though. So you can have 1000 partitions all consumed by a single process.
  • Another way to say the above is that the partition count is a bound on the maximum consumer parallelism.
  • More partitions will mean more files and hence can lead to smaller writes if you don’t have enough memory to properly buffer the writes and coalesce them into larger writes
  • Each partition corresponds to several znodes in zookeeper. Zookeeper keeps everything in memory so this can eventually get out of hand.
  • More partitions means longer leader fail-over time. Each partition can be handled quickly (milliseconds) but with thousands of partitions this can add up.
  • When we checkpoint the consumer position we store one offset per partition so the more partitions the more expensive the position checkpoint is.
  • It is possible to later expand the number of partitions BUT when we do so we do not attempt to reorganize the data in the topic. So if you are depending on key-based semantic partitioning in your processing you will have to manually copy data from the old low partition topic to a new higher partition topic if you later need to expand.

Rules for using High-level Consumer

  • if you provide more threads than there are partitions on the topic, some threads will never see a message (see the sketch after this list)
  • if you have more partitions than you have threads, some threads will receive data from multiple partitions
  • if you have multiple partitions per thread there is NO guarantee about the order you receive messages, other than that within the partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10 even if partition 11 has data available.
  • adding more processes/threads will cause Kafka to re-balance, possibly changing the assignment of a Partition to a Thread.
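
A minimal sketch using the old (0.8.x) high-level consumer API, with a hypothetical ZooKeeper address, group id, topic, and a thread count of 3:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // hypothetical ZooKeeper address
        props.put("group.id", "t1-group");                 // hypothetical group id
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for 3 streams (threads). If the topic has fewer than 3 partitions, some
        // streams never receive a message; if it has more, some streams carry several partitions.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("t1", 3));

        for (KafkaStream<byte[], byte[]> stream : streams.get("t1")) {
            new Thread(() ->
                    // Each stream is consumed by exactly one thread; iteration blocks for new messages.
                    stream.iterator().forEachRemaining(msg ->
                            System.out.println("partition=" + msg.partition() + " offset=" + msg.offset()))
            ).start();
        }
    }
}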

Clean Shutdown and Error Handling

Kafka does not update ZooKeeper with the last-read message offset after every read; instead it waits a short period of time. Because of this delay, it is possible that your logic has consumed a message but that fact has not yet been synced to ZooKeeper. So if your client exits or crashes, you may find messages being replayed the next time it starts.

Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be 'kill -9'd. A shutdown-hook sketch follows.
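
A minimal sketch, continuing the old high-level consumer API from the previous sketch; CleanShutdown.register is a hypothetical helper, while commitOffsets() and shutdown() are the connector's own methods:

import kafka.javaapi.consumer.ConsumerConnector;

public final class CleanShutdown {
    // Register a shutdown hook so a normal SIGTERM (not 'kill -9') commits offsets before exit.
    public static void register(ConsumerConnector consumer) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.commitOffsets();  // sync the last consumed offsets to ZooKeeper
            consumer.shutdown();       // stop fetcher threads and leave the group cleanly
        }));
    }
}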

Performance Blogs

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Tuning Kafka for Spark Streaming

https://www.linkedin.com/pulse/spark-streaming-performance-tuning-mesos-kafka-chandan-prakash

The direct Kafka approach should be preferred over the receiver-based approach for better efficiency, parallelism, and exactly-once semantics.

https://www.linkedin.com/pulse/upgrading-running-spark-streaming-application-code-changes-prakash


Excerpts from "You Cannot Have Exactly-Once Delivery"

We’re left with a few options, all equally tenuous. When a message is delivered, it’s acknowledged immediately before processing. The sender receives the ack and calls it a day. However, if the receiver crashes before or during its processing, that data is lost forever. Customer transaction? Sorry, looks like you’re not getting your order. This is the worldview of at-most-once delivery. To be honest, implementing at-most-once semantics is more complicated than this depending on the situation. If there are multiple workers processing tasks or the work queues are replicated, the broker must be strongly consistent (or CP in CAP theorem parlance) so as to ensure a task is not delivered to any other workers once it’s been acked. Apache Kafka uses ZooKeeper to handle this coordination.

On the other hand, we can acknowledge messages after they are processed. If the process crashes after handling a message but before acking (or the ack isn’t delivered), the sender will redeliver. Hello, at-least-once delivery. Furthermore, if you want to deliver messages in order to more than one site, you need an atomic broadcast which is a huge burden on throughput. Fast or consistent. Welcome to the world of distributed systems.
