Apache Kafka #

Preliminaries #

Reasons #

  • Decoupling of data streams
  • Distributed, resilient, fault-tolerant architecture
  • Horizontally scalable
  • High performance

Use cases #

  • Messaging System
  • Activity Tracking
  • Gather metrics from many different locations
  • Application log gathering
  • Stream processing
  • De-coupling of system dependencies
  • Integration with Spark, Flink etc.

Typical Architecture #

Topics and partitions #

  • Topics: Particular stream of data
  • Similar to a table in a database
  • Unlimited in number
  • Name = identifier
  • Split in partitions (i.e. a partition is a part of a topic)
  • Partitions are ordered
  • Each message within a partition gets an incremental unique id (offset)
  • Offsets are per partition
  • Order is guaranteed only within a partition
  • Data is kept only for a limited time (default: one week)
  • Data written to a partition can’t be changed
  • Data is assigned to a random partition unless a key is provided (see the partitioning sketch after this list)
  • No hard limit on the number of partitions per topic
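
A rough Java sketch of how a key is mapped to a partition, assuming the default partitioner's behaviour for keyed records (murmur2 hash of the serialized key modulo the partition count); the Utils helper ships with kafka-clients, and the key and partition count below are made up:

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyToPartition {

    // Approximates the default partitioner for keyed records:
    // murmur2 hash of the serialized key, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition,
        // as long as the partition count does not change.
        System.out.println(partitionFor("user-42", 3));
        System.out.println(partitionFor("user-42", 3)); // identical result
    }
}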

Important Topic Settings #

Partitions and Replication #

  • Partition count and replication factor are the two most important settings for performance (a topic-creation sketch in Java follows this list)
  • Ideally, the number of partitions and the replication factor are not changed after the topic is created (the partition count can only be increased afterwards)
  • Better to over-partition than to under-partition
  • Rules of thumb for partitions setup:
    • Roughly, each partition can handle a throughput of 10 MB/s
    • More partitions implies:
      • Better parallelism, better throughput
      • More open files on the system
      • If a broker fails (unclean shutdown), many concurrent leader elections
      • More latency for replication (in the order of ms)
    • Guidelines:
      • Number of partitions per broker: between 2000 and 4000
      • Number of partitions in the cluster: less than 20000
      • Partitions per topic = (1-2) x (# of brokers), max 10 partitions
  • Rules of thumb for replication setup:
    • At least 2, maximum of 3
    • More replication implies:
      • Better resilience (with a replication factor of N, N-1 brokers can fail)
      • Longer replication time (higher latency if acks=all)
      • More disk space needed
    • Guidelines:
      • Set it to 3 (implying there are at least 3 brokers available)
      • If replication performance is an issue, get a better broker instead of lowering the replication factor
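
A minimal sketch of creating a topic with an explicit partition count and replication factor using the Java AdminClient; the topic name first_topic and the values 3/3 are examples, not recommendations:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (Admin admin = Admin.create(props)) {
            // 3 partitions, replication factor 3 (requires at least 3 brokers)
            NewTopic topic = new NewTopic("first_topic", 3, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}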

Partitions and Segments #

  • Segments are the files that make up a partition
  • Segments are built by Kafka one after another (e.g. segment 0 covers offsets 0-983, segment 1 covers offsets 984-1803, etc.)
  • Active segment: always the last segment, the only one data is still written to
  • Two important segment settings (see the configuration sketch after this list):
    • segment.bytes: the max size of a single segment in bytes (default: 1GB). Smaller segments mean:
      • More segments per partition
      • Log compactions happens more often
      • Kafka has to keep more files open (error: Too many open files)
      • Guiding question: How fast will I have new segments based on throughput?
    • segment.ms: the time Kafka will wait before committing (i.e. closing) the segment if it is not full (default: 1 week). This setting is normally changed only if more frequent log compaction is desirable (a segment is closed => it becomes eligible for log compaction)
  • Segments come with two indexes (files):
    • An offset-to-position index: lets Kafka know where in the segment file to read to find a message at a given offset
    • A timestamp-to-offset index: lets Kafka find messages by timestamp
  • These indexes allow Kafka to find data in constant time
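
A hedged sketch of adjusting segment.bytes and segment.ms on an existing topic with the Java AdminClient's incrementalAlterConfigs; the topic name and the example values (256 MB segments, 1 day) are assumptions for illustration:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SegmentSettingsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "first_topic");
            List<AlterConfigOp> ops = List.of(
                // Roll a new segment every 256 MB ...
                new AlterConfigOp(new ConfigEntry("segment.bytes",
                    String.valueOf(256 * 1024 * 1024)), AlterConfigOp.OpType.SET),
                // ... or at the latest after one day, whichever comes first
                new AlterConfigOp(new ConfigEntry("segment.ms",
                    String.valueOf(24L * 60 * 60 * 1000)), AlterConfigOp.OpType.SET)
            );
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}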

Log Cleanup Policies #

  • Many Kafka clusters make data expire, according to a policy
  • That concept is called log cleanup
  • Policy I: cleanup.policy=delete (Default for all user topics)
    • Delete based on age of data (retention.hours, default: 1 week)
    • Delete based on max size of log (retention.bytes, default: -1, i.e. infinite)
  • Policy II: cleanup.policy=compact (default for the __consumer_offsets topic; see the compacted-topic sketch after this list)
    • Ensures that your log contains at least the last known value for a specific key within a partition
    • Delete based on keys of messages
    • Will delete old duplicate keys after the active segment is committed
    • Otherwise infinite time and space retention
    • Guarantees:
      • Any consumer that is reading from the head of a log will still see all the messages sent to the topic (remember: Old duplicates are only deleted after the active segment is committed)
      • Ordering of messages is kept
      • Offset of messages is immutable (offsets are just skipped if a message is missing)
      • Removed records can still be seen by consumers for a period of time (delete.retention.ms, default 1 day)
    • Very useful if just a snapshot instead of full history is required (such as for a current data table in a database without transaction logs)
    • Caveat: Log compaction can fail:
      • Compaction runs in a background optimization thread, which might crash
      • Make sure to assign enough memory to it and verify that compaction actually gets triggered
  • Deleting data from Kafka allows you to:
    • Control size of data on disk
    • Overall: Limit maintenance work on Kafka cluster
  • How often does log cleanup happen?
    • Log cleanup happens on partition segments
    • Smaller / more segments means that log cleanup will happen more often
    • Log cleanup shouldn’t happen too often (CPU, RAM)
    • Cleaner checks for work every 15 seconds (log.cleaner.backoff.ms)
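
A minimal sketch of creating a compacted topic from Java; the topic name, partition count, replication factor, and delete.retention.ms value are illustrative only:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("user-current-state", 3, (short) 3)
                .configs(Map.of(
                    "cleanup.policy", "compact",       // keep at least the latest value per key
                    "delete.retention.ms", "86400000"  // removed records stay visible for 1 day
                ));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}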

Log Compression #

  • Topics can be compressed using the compression.type setting (a producer-side sketch follows this list)
  • Options: gzip, snappy, lz4, uncompressed, producer (default: producer)
  • Option producer: Producer chooses the adequate compression method, the broker takes the data as is (saves CPU time)
  • If compression is enabled, make sure you’re sending batches of data
  • Data will be decompressed by the consumer
  • Compression pays off mainly for text-based payloads (JSON, XML, logs); already-compressed binary data gains little
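
A producer-side sketch of enabling compression with the Java client; snappy and the batching values are illustrative choices. With the topic-level default compression.type=producer the broker stores the batches unchanged:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CompressionConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Compress on the producer; the broker keeps the batches as-is (saves broker CPU)
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Compression pays off with larger batches, so allow some batching
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(32 * 1024));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual
        }
    }
}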

Brokers #

  • A Kafka cluster is composed of multiple servers (brokers)
  • Each broker is identified with its ID (integer)
  • Each broker contains certain topic partitions
  • After connecting to any broker (= bootstrap broker), you will be connected to the entire cluster (see the cluster-description sketch after this list)
  • 3 brokers is a good starting number
  • Topics should have a replication factor > 1 (usually between 2 and 3)
  • Only one broker can be a leader for a given partition
  • Only that leader can receive and serve data for a partition
  • The other brokers will synchronize the data (ISR: in-sync replica)
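
A small sketch showing that one bootstrap broker is enough to discover the whole cluster, using the Java AdminClient's describeCluster call (the broker address is an example):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

import java.util.Properties;

public class DescribeClusterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // One bootstrap broker is enough; the client discovers the rest of the cluster from it
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (Admin admin = Admin.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            for (Node node : cluster.nodes().get()) {
                System.out.printf("broker id=%d at %s:%d%n", node.id(), node.host(), node.port());
            }
            System.out.println("controller: " + cluster.controller().get().id());
        }
    }
}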

Producers #

  • Producers write data to topics
  • They only have to specify the topic name and one broker to connect to, and Kafka will automatically take care of routing the data to the right brokers
  • Producers can choose to receive acknowledgement of data writes:
    • acks=0: Producer won't wait for acknowledgement (possible data loss)
    • acks=1: Producer will wait for the leader's acknowledgement (limited data loss)
    • acks=all: Producer waits for acknowledgement from the leader and the in-sync replicas (no data loss)
  • Producers can optionally send a key with the data
  • With a key, all messages for that key are guaranteed to go to the same partition
  • This guarantees ordering for a specific key

Important Settings for Java Producer Client #

  • bootstrap.servers
  • key.serializer (e.g. StringSerializer.class.getName()): Serializer used for the key
  • value.serializer: Serializer used for the value
  • acks (0, 1, all)
  • retries
  • linger.ms: How long the producer waits before sending a batch, so that more messages can accumulate (a full producer sketch follows this list)
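
A minimal Java producer wired up with the settings above; the topic, key, and value are made up, and the blocking send is only for demonstration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");     // wait for leader and in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); // wait up to 20 ms to batch messages

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key "user-42" ensures all messages for that user land in the same partition
            ProducerRecord<String, String> record =
                new ProducerRecord<>("first_topic", "user-42", "hello kafka");
            RecordMetadata metadata = producer.send(record).get(); // synchronous only for the example
            System.out.printf("written to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
        }
    }
}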

Consumers #

  • Consumers read data from a topic
  • Required: Topic name and one broker
  • Data is read in order for each partition
  • Consumers read data in consumer groups
  • Each consumer within a group reads from exclusive partitions
  • Within one group, you cannot have more active consumers than partitions (extra consumers will be inactive)
  • Kafka stores the offsets at which a consumer group has been reading
  • The committed offsets live in a Kafka topic named __consumer_offsets
  • When a consumer has processed data received from Kafka, it should be committing the offsets
  • If a consumer process dies, it will be able to read back from where it left off thanks to consumer offsets.

Important Settings for Java Consumer Client #

  • bootstrap.servers: Comma-separated list of brokers
  • key.deserializer (e.g. StringDeserializer.class.getName()): Deserializer used for key
  • value.deserializer: Deserializer used for value
  • group.id: Consumer group name
  • enable.auto.commit: Auto-commit offsets (instead of manual committing)
  • auto.commit.interval.ms: Time interval for committing offsets
  • auto.offset.reset: Where to start when no committed offset is available (latest, earliest, or none, which throws an exception); a consumer sketch follows this list
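
A minimal Java consumer using the settings above (group id and topic are examples); it prints partition, offset, key, and value for every record:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the beginning if no offsets exist
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");    // let the client commit offsets periodically

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("first_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}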

Zookeeper #

  • Kafka does not run without Zookeeper
  • Zookeeper manages brokers (keeps a list of them)
  • Zookeeper helps elect leaders for partitions
  • Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies / comes up, deleted topics)
  • Zookeeper usually operates in an odd quorum (cluster) of servers (3, 5..)
  • Zookeeper has one leader and multiple followers

Kafka Guarantees #

  • Messages are appended to a topic-partition in the order they are sent
  • Consumers read messages in the order stored in a topic-partition
  • With a replication factor of N, producers and consumers can tolerate up to N-1 brokers being down
  • This is why a replication factor of 3 is a good idea:
    • Allows for one broker to be taken down for maintenance
    • Allows for another broker to be taken down unexpectedly
  • As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition

Delivery semantics for consumers #

  • Consumers decide when to commit offsets. There are several strategies:
  • At most once: Offsets are committed as soon as the message is received. If the processing goes wrong, the message will be lost
  • At least once: Offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing, so make sure your processing is idempotent (i.e. processing the same message again won't impact the system); see the sketch after this list
  • Exactly once: Very difficult to achieve / needs strong engineering
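
An at-least-once sketch with the Java consumer: auto-commit is disabled and offsets are committed only after the whole batch has been processed, so a crash before the commit means the batch is read again (group id and topic are examples):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually, after processing

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("first_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // processing must be idempotent: re-reading the record must be safe
                }
                // Offsets are committed only after the whole batch was processed.
                // If the consumer crashes before this line, the batch is read again.
                consumer.commitSync();
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}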

Commands #

kafka-topics #

Create topic:
kafka-topics --zookeeper 127.0.0.1:2181 \
  --create \
  --topic first_topic \
  --partitions 3 \
  --replication-factor 1

List topics:
kafka-topics --zookeeper 127.0.0.1:2181 \
  --list

Remove topic:
kafka-topics --zookeeper 127.0.0.1:2181 \
  --delete \
  --topic first_topic

Describe topic:
kafka-topics --zookeeper 127.0.0.1:2181 \
  --describe \
  --topic first_topic

Alter topic configuration:
kafka-topics --zookeeper 127.0.0.1:2181 \
  --alter \
  --topic first_topic \
  --config <key>=<value>

kafka-console-producer #

Publish data over console:
kafka-console-producer --broker-list 127.0.0.1:9092 \
  --topic first_topic

Publish data with key,value:
kafka-console-producer --broker-list 127.0.0.1:9092 \
  --topic first_topic \
  --property parse.key=true \
  --property key.separator=,

parse.key=true makes the console producer parse each input line for a key. With key.separator=, a valid input line then has the form key,value.

kafka-console-consumer #

Consume data:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 \
  --topic first_topic

Consume data from beginning:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 \
  --topic first_topic \
  --from-beginning

Consume data from beginning and from a specific partition:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 \
  --topic first_topic \
  --from-beginning \
  --partition 1

Consume from a specific group:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 \
  --topic first_topic \
  --consumer-property group.id=mygroupid

If a group id is designated, offsets are automatically committed to Kafka. A reconnected consumer will therefore resume from the committed offsets and will not see every message again, even with the --from-beginning flag.

Other useful properties (--property key=value):

  • print.key=true
  • key.deserializer=org.apache.kafka.common.serialization.{String,Long,...}Deserializer
  • value.deserializer=org.apache.kafka.common.serialization.{String,Long,...}Deserializer

Streams #

Kafka Streams

Deployment #

Third-Party Tools #