
Controlling Kafka Producers and Consumers

In Apache Kafka, you can fine-tune how producers and consumers interact with topics and partitions. This tutorial explains how to:

  1. Control which producer sends to which partition
  2. Control which consumer reads from which partition or topic

Part 1: Controlling Kafka Producers

By default, Kafka decides which partition to send a message to. However, Kafka allows you to override this behavior using the following techniques:

Option 1.1: Use a Message Key (Default Partitioning)

When a producer sends a message with a non-null key, the default partitioner chooses the partition with:

partition = hash(key) % number_of_partitions

(The built-in partitioner uses a murmur2 hash of the serialized key.)

This ensures that:

  • All messages with the same key go to the same partition.
  • Ordering is preserved per key, because Kafka guarantees ordering only within a partition.

ProducerRecord<String, String> record =
    new ProducerRecord<>("orders-topic", "user123", "OrderCreated");

producer.send(record);

Explanation:

  • user123 is the key.
  • Kafka hashes the key and picks a partition.
  • All messages with key "user123" will go to the same partition → ordering guaranteed for that user.
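
The record example above assumes a producer has already been created. A minimal setup could look like the following sketch; the broker address localhost:9092 and the use of StringSerializer are assumptions for illustration.

// Minimal producer configuration (broker address is an assumption)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);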

Option 1.2: Manually Specify the Partition

You can override the partition selection by explicitly passing the partition number.

ProducerRecord<String, String> record =
    new ProducerRecord<>("orders-topic", 2, null, "OrderCreated");

producer.send(record);

Explanation:

  • The message will be sent to partition 2, regardless of key or default logic.
  • Use this if you know exactly which partition should receive the message.

Use Case:

  • Static routing, for example sending all payment events to partition 0 and all order events to partition 1 (see the sketch below).
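
A minimal sketch of such static routing; the event types, the partition mapping, and the partitionFor helper are illustrative assumptions rather than part of a real API.

// Hypothetical mapping of event categories to fixed partitions
static int partitionFor(String eventType) {
    return switch (eventType) {
        case "payment" -> 0;
        case "order"   -> 1;
        default        -> 2;
    };
}

// Key is null because the explicit partition overrides key-based partitioning
producer.send(new ProducerRecord<>("orders-topic", partitionFor("payment"), null, "PaymentReceived"));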

Option 1.3: Implement a Custom Partitioner

Kafka allows you to implement your own logic for partition selection by providing a custom Partitioner class.

Step-by-step:

  1. Create your partitioner class:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class RegionBasedPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Route records to a partition based on the region carried in the key
        if (key instanceof String region) {
            return switch (region) {
                case "east" -> 0;
                case "west" -> 1;
                default -> 2;
            };
        }
        return 0;
    }

    @Override
    public void close() { }                              // nothing to clean up

    @Override
    public void configure(Map<String, ?> configs) { }    // no custom configuration
}

  2. Register the partitioner:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.myapp.RegionBasedPartitioner");

Explanation:

  • This gives full control over how Kafka chooses the partition.
  • Use case: region-based routing, custom hashing, etc.
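
Putting both steps together, a producer that uses the custom partitioner might be configured as in this sketch; the broker address and serializers are assumptions.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RegionBasedPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// With the key "east", RegionBasedPartitioner routes this record to partition 0
producer.send(new ProducerRecord<>("orders-topic", "east", "OrderCreated"));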

Part 2: Controlling Kafka Consumers

By default, Kafka consumers work in groups and are automatically assigned partitions. However, you can override this behavior.

Option 2.1: Use Consumer Groups (Default)

Kafka consumers in the same group divide the partitions among themselves.

Example:

props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders-topic"));

Explanation:

  • Kafka handles partition assignment automatically.
  • Each partition is read by only one consumer in the group.
  • Scaling: Add more consumers to the same group to increase parallelism.

Use Case:

  • Multiple instances of the same microservice processing messages concurrently.
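
A typical poll loop for one such group member might look like the sketch below; the broker address, the deserializers, and printing each record are assumptions standing in for real processing.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders-topic"));

while (true) {
    // Poll only the partitions assigned to this consumer by the group coordinator
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}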

Option 2.2: Manual Partition Assignment (Full Control)

If you need full control over which consumer reads from which partition, use assign() instead of subscribe().

Example:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition("orders-topic", 1)));

Explanation:

  • Consumer will only read from partition 1 of orders-topic.
  • No rebalancing, no automatic partition assignment.
  • You must manage partition allocation manually.

Use Case:

  • Fixed mapping of consumers to partitions (e.g., for ordering, latency tuning, or isolated processing).
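
Because there is no group coordination with assign(), you can also position the consumer yourself. A short sketch; starting from the earliest offset is an assumption chosen for illustration.

TopicPartition partition1 = new TopicPartition("orders-topic", 1);
consumer.assign(Arrays.asList(partition1));

// Optionally choose the starting position explicitly; no committed offsets are used here
consumer.seekToBeginning(Arrays.asList(partition1));

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));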

Option 2.3: Use Different Consumer Groups

Kafka allows each consumer group to receive its own copy of the data.

Example:

  • Group A: "order-processing"
  • Group B: "order-archiver"

Each group gets the full stream of messages from the topic independently.

props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-archiver");
consumer.subscribe(Arrays.asList("orders-topic"));

Use Case:

  • One group processes orders.
  • Another group logs them to a database.
  • Yet another group streams them to analytics.
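
A sketch of two such groups side by side; commonConsumerProps is a hypothetical Properties object holding the shared settings (bootstrap servers, deserializers), and only the group ids differ.

// Two consumers in different groups: each group independently receives every message
Properties processorProps = new Properties();
processorProps.putAll(commonConsumerProps);
processorProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing");
KafkaConsumer<String, String> processor = new KafkaConsumer<>(processorProps);
processor.subscribe(Arrays.asList("orders-topic"));

Properties archiverProps = new Properties();
archiverProps.putAll(commonConsumerProps);
archiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-archiver");
KafkaConsumer<String, String> archiver = new KafkaConsumer<>(archiverProps);
archiver.subscribe(Arrays.asList("orders-topic"));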

Option 2.4: Topic-based Routing

If you want different producers or consumers to handle different data categories, use separate topics.

Example:

  • orders-topic → order events
  • payments-topic → payment events
  • notifications-topic → email/SMS events

Each consumer can subscribe only to the relevant topic:

consumer.subscribe(Arrays.asList("payments-topic"));
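
On the producing side, topic-based routing is simply a matter of choosing the topic per record; a brief sketch (the event payloads are assumptions).

// Each event category is written to its own topic; no partition logic is needed here
producer.send(new ProducerRecord<>("orders-topic", "OrderCreated"));
producer.send(new ProducerRecord<>("payments-topic", "PaymentReceived"));
producer.send(new ProducerRecord<>("notifications-topic", "EmailQueued"));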