Kafka Topics

topics: a particular stream of data

Basics

  • no limit on the number of topics
  • a topic is identified by its name
  • any message format is supported: JSON, binary, text…
  • data is kept for one week by default (configurable retention)

Partitions and offsets

  • topics are split into partitions (see the topic-creation sketch after this list)
  • messages within each partition are ordered
  • each message gets an incremental id within its partition, called the offset
  • once written, a message is immutable
  • by default a message is sent to a random partition, unless a key is provided
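
A minimal sketch of creating a topic with several partitions via the Kafka AdminClient; the topic name demo.topic, the partition count, and the bootstrap address are placeholders, not part of the demo project.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // placeholder topic: 3 partitions, replication factor 1 (single local broker)
            NewTopic topic = new NewTopic("demo.topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}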

Producers

write data to topics

Producers: Message keys

  • without a key, messages are sent round robin (partition 0 -> 1 -> 2, cycling through the partitions)
  • with a key, messages sharing the same key are sent to the same partition (key hashing)
  • if related messages must stay ordered, give them the same key (e.g. an entity id), as in the sketch below
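
A minimal keyed-producer sketch, assuming a local broker and the placeholder topic demo.topic: records sent without a key are spread across partitions by the partitioner, while records sharing the same key always land in the same partition and therefore stay ordered.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // no key: the partitioner decides the partition
            producer.send(new ProducerRecord<>("demo.topic", "message without key"));
            // same key => same partition => ordering preserved for that key
            producer.send(new ProducerRecord<>("demo.topic", "user_42", "first event"));
            producer.send(new ProducerRecord<>("demo.topic", "user_42", "second event"));
            producer.flush();
        }
    }
}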

Message

Producer acks

  • acks=0: the producer does not wait for any acknowledgement (possible data loss)
  • acks=1: the producer waits for the leader's acknowledgement (limited data loss)
  • acks=all: the producer waits for the leader and all in-sync replicas to acknowledge (no data loss); see the config sketch below
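
A small sketch of a "safe" producer configuration built around acks=all; the class name is made up for illustration, and on Kafka 3.0+ these values are already the defaults.

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class SafeProducerConfig {
    // hypothetical helper: durability-oriented producer settings
    public static Properties safeProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");                // leader + in-sync replicas must acknowledge
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // retries do not create duplicates
        props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        return props;
    }
}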

Consumers

read data from a topic - pull model

Consumer Groups & Offsets

Consumers read data as part of a consumer group; within a group, each partition is consumed by exactly one consumer.

Multiple Consumers on One Topic

  • one topic can have multiple consumer groups
  • each consumer group receives all the messages of the topic (see the consumer sketch below)
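
A minimal consumer sketch, assuming a local broker and placeholder topic/group names: every consumer started with the same group.id joins one group and shares the partitions between its members, while an application using a different group.id receives the full stream again.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // consumers sharing this id split the partitions
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("demo.topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}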

Kafka Brokers

a kafka cluster is composed of multiple brokers (servers)

  • each broker is identified by an ID
  • each broker holds some of the topic partitions
  • data is distributed across the brokers
  • connecting to one (bootstrap) broker means connecting to the whole cluster, as the sketch below shows
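
A small sketch of the "connect to one broker = connect to the whole cluster" idea: the AdminClient is pointed at a single bootstrap broker (the address is a placeholder) and discovers every broker in the cluster.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;

public class DescribeClusterDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // one bootstrap broker is enough; the client fetches metadata for the rest
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            for (Node broker : admin.describeCluster().nodes().get()) {
                System.out.printf("broker id=%d host=%s:%d%n", broker.id(), broker.host(), broker.port());
            }
        }
    }
}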

Kafka Topic Replication

Replication means data is written to more than one broker, so one broker can go down without data being lost.

Leader for a Partition

  • only one broker can be the leader for a given partition
  • producers can only send data to the leader
  • consumers read from the leader by default
  • the other brokers replicate the leader's data
  • each partition has one leader and multiple ISRs (in-sync replicas); the sketch below prints them
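
A sketch that prints the leader and ISR of every partition via AdminClient.describeTopics; the topic name is a placeholder, and allTopicNames() assumes a recent Kafka client (older clients expose all() instead).

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class DescribeTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singleton("demo.topic"))
                    .allTopicNames().get()
                    .get("demo.topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                System.out.printf("partition=%d leader=%s isr=%s%n",
                        partition.partition(), partition.leader(), partition.isr());
            }
        }
    }
}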

Demo Code

Producer

package io.conduktor.demos.kafka.wikimedia;

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangesProducer {
    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "127.0.0.1:9092";
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // safe producer configs (defaults since Kafka 3.0; set explicitly for Kafka <= 2.8)
        // properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

        // set high throughput producer configs
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // 20ms
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); //32kb
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compression method

        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String topic = "wikimedia.recentchange";
        EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);

        String url = "https://stream.wikimedia.org/v2/stream/recentchange";
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
        EventSource eventSource = builder.build();

        // open the stream in another thread; the handler below sends each event to Kafka
        eventSource.start();

        // keep producing for 10 minutes, then exit
        TimeUnit.MINUTES.sleep(10);
    }
}

package io.conduktor.demos.kafka.wikimedia;

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

    KafkaProducer<String, String> kafkaProducer;
    String topic;
    private final Logger log = LoggerFactory.getLogger(WikimediaChangeHandler.class.getSimpleName());

    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {

    }

    @Override
    public void onClosed() {
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        log.info(messageEvent.getData());
        // receive and send message async
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {

    }

    @Override
    public void onError(Throwable t) {
        log.error("error", t);
    }
}

Consumer

package io.conduktor.demos.kafka.opensearch;

import com.google.gson.JsonParser;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class OpenSearchConsumer {
    public static RestHighLevelClient createOpenSearchClient() {
        // String connString = "http://localhost:9200"; // local docker setting
        // bonsai.io URL
        String connString = "https://fake_uri";

        // build a URI from the connection string
        RestHighLevelClient restHighLevelClient;
        URI connUri = URI.create(connString);
        // extract login information if it exists
        String userInfo = connUri.getUserInfo();

        if (userInfo == null) {
            // REST client without security
            restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), "http")));
        } else {
            // REST client with security
            String[] auth = userInfo.split(":");

            CredentialsProvider cp = new BasicCredentialsProvider();
            cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(auth[0], auth[1]));

            restHighLevelClient = new RestHighLevelClient(
                    RestClient.builder(new HttpHost(connUri.getHost(), connUri.getPort(), connUri.getScheme()))
                            .setHttpClientConfigCallback(
                                    httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(cp)
                                            .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())));
        }

        return restHighLevelClient;
    }

    private static KafkaConsumer<String, String> createKafkaConsumer() {
        // local kafka endpoint
        String bootstrapServer = "127.0.0.1:9092";
        String groupId = "consumer-opensearch-demo";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // commit offset manually instead of automatically
        // consumer.commitSync(); is required if it is false
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        return consumer;
    }

    public static void main(String[] args) throws IOException {
        Logger logger = LoggerFactory.getLogger(OpenSearchConsumer.class.getSimpleName());
        // first create opensearch client
        RestHighLevelClient openSearchClient = createOpenSearchClient();

        // create kafka client
        KafkaConsumer<String, String> consumer = createKafkaConsumer();

        // create index on opensearch if it does not exist
        try (openSearchClient; consumer) {
            boolean indexExist = openSearchClient.indices().exists(new GetIndexRequest("wikimedia"), RequestOptions.DEFAULT);
            if (!indexExist) {
                CreateIndexRequest createIndexRequest = new CreateIndexRequest("wikimedia");
                openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
            } else {
                logger.info("wikimedia exists");
            }

            // check on bonsai GET /wikimedia/_doc/hVsBJ4QBR2gbmWKcd1Th
            consumer.subscribe(Collections.singleton("wikimedia.recentchange"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
                int recordCount = records.count();
                logger.info("receive" + recordCount);

                // bulk request improve performance
                BulkRequest bulkRequest = new BulkRequest();

                for (ConsumerRecord<String, String> record : records) {
                    // id can be this if data does not have id
                    // String id = record.topic() + "_" + record.partition() + "_" + record.offset();
                    try {
                        String id = extractId(record.value());
                        // send record to opensearch
                        IndexRequest indexRequest = new IndexRequest("wikimedia")
                                .source(record.value(), XContentType.JSON)
                                .id(id); // at least once strategy

                        // single request - not good performance
                        // IndexResponse response = openSearchClient.index(indexRequest, RequestOptions.DEFAULT);

                        // bulk request - good performance
                        bulkRequest.add(indexRequest);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                // bulk request to improve performance
                if (bulkRequest.numberOfActions() > 0) {
                    BulkResponse bulkResponse = openSearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    logger.info("inserted" + bulkResponse.getItems().length);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    // this is required if ENABLE_AUTO_COMMIT_CONFIG is false
                    consumer.commitSync();
                    logger.info("offsets committed!");
                }
            }
        }
    }

    private static String extractId(String json) {
        return JsonParser.parseString(json).getAsJsonObject().get("meta").getAsJsonObject().get("id").getAsString();
    }
}

Advanced Topics

Kafka Connect - connector

Kafka Connect is all about reusing code and connectors: source connectors pull data from external systems (e.g. a database) into Kafka, and sink connectors push data from Kafka out to external systems.

Kafka Streams

Easy data processing and transformation library within Kafka
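
A minimal Kafka Streams sketch (not part of the demo project): it reads the wikimedia.recentchange topic, applies a trivial transformation, and writes to an assumed output topic.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> changes = builder.stream("wikimedia.recentchange");
        // trivial transformation: uppercase every value and write it to an output topic
        changes.mapValues(value -> value.toUpperCase())
               .to("wikimedia.recentchange.uppercase");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}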

Kafka Schema Registry

The Schema Registry stores the schemas that producers and consumers agree on, so it can reject bad data before it reaches a topic.
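
A hedged sketch of a producer using the Confluent Avro serializer with the Schema Registry; the registry URL, topic, and schema are placeholders, and the Confluent kafka-avro-serializer dependency is assumed to be on the classpath.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class AvroProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", "http://127.0.0.1:8081"); // assumed local registry

        // placeholder schema: a record with a single string field
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            // the serializer registers/validates the schema; incompatible data is rejected
            producer.send(new ProducerRecord<>("users.avro", user));
            producer.flush();
        }
    }
}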

Real World Practice

Partition Count

  • More partitions:
    • better parallelism - good
    • better throughput - good
    • can run more consumers in a group - good
    • can leverage more brokers - good
    • more load on Zookeeper (more leader elections) - bad
    • more open files on Kafka - bad
  • small cluster (< 6 brokers): use 3 * (number of brokers) partitions, e.g. 3 brokers -> 9 partitions per topic
  • big cluster (> 12 brokers): use 2 * (number of brokers) partitions
  • adjust based on expected throughput

Replication Factor

  • 2 <= replication factor <= 4 (3 is the usual choice)
  • Higher replication factor:
    • better durability - good
    • better availability - good
    • higher latency (with acks=all) - bad
    • more disk space - bad
  • start with 3
  • never use a replication factor of 1 in production

Topic name convention

...