Integrate Kafka into .NET Core Applications - A Step-by-Step Guide

Apache Kafka is an open-source distributed event streaming platform for processing real-time data events and pipelines. Kafka is popular at the enterprise level because it can handle millions of messages per second, making it a suitable option for large-scale systems.

  • The producer publishes messages to Kafka topics. Internally, the messages are stored as records, and each record has a unique offset.
  • Kafka topics are divided into partitions, and any time a message is sent to a topic, the message is assigned to a partition. Every partition is an ordered, immutable sequence of records.
  • Partitioning is important in Kafka because it plays a direct role in scalability and in distributing data across the Kafka brokers.
  • If a partition key is not provided, Kafka uses a partitioning strategy (such as round robin) to distribute messages across the partitions. But if a partition key is provided, every message with that key goes to the same partition, thereby ensuring that those messages are ordered within that partition. As long as the messages are in the same partition, the ordering is guaranteed; see the conceptual sketch after this list.
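
Conceptually, keyed partition selection can be sketched as follows. This is illustrative only: real Kafka clients use dedicated hash functions (murmur2 in the Java client, for example), not .NET's GetHashCode().

    // Conceptual sketch only; real clients use murmur2-style hashing,
    // not GetHashCode(). Same key => same hash => same partition.
    static int roundRobinCounter = 0;

    static int ChoosePartition(string? key, int partitionCount)
    {
        if (key is null)
        {
            // No key: spread messages across partitions, e.g. round robin.
            return roundRobinCounter++ % partitionCount;
        }

        // Keyed: hash the key so every message with this key lands on
        // the same partition, preserving per-key ordering.
        return (key.GetHashCode() & int.MaxValue) % partitionCount;
    }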

Let’s explore how to create a .NET Core Web API application that will place an order and stream it to Kafka. First up, include the Confluent.Kafka package in your .NET Core Web API project.

    <PackageReference Include="Confluent.Kafka" Version="2.6.1" />
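
Alternatively, you can add the package from the command line:

    dotnet add package Confluent.Kafka --version 2.6.1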

In order to produce a Kafka message, you need to configure the Kafka producer in appsettings.json with a few key parameters: BootstrapServers, SaslUsername, SaslPassword, and TopicName.

  • BootstrapServers is a list of Kafka broker addresses used to connect to the cluster.
  • SASL stands for Simple Authentication and Security Layer. The SASL username and password are used to authenticate the producer.
  • TopicName specifies where the messages will be sent.
    "KafkaOptions": {
        "BootstrapServers": "localhost:9092",
        "SaslUsername": "your-username",
        "SaslPassword": "your-password",
        "TopicName" :  "OrderTopic"
    },
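
For this section to bind, you also need a matching options class registered with the options pattern. A minimal sketch, with the class shape inferred from the JSON above:

    public class KafkaOptions
    {
        public string BootstrapServers { get; set; } = string.Empty;
        public string SaslUsername { get; set; } = string.Empty;
        public string SaslPassword { get; set; } = string.Empty;
        public string TopicName { get; set; } = string.Empty;
    }

    // Bind the "KafkaOptions" section during startup:
    services.Configure<KafkaOptions>(configuration.GetSection("KafkaOptions"));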

Now register the Kafka producer with the above configuration. Producers are thread-safe and relatively expensive to create, so a single shared instance registered as a singleton is appropriate. Build() creates an instance of a Kafka producer that takes a string partition key and a string value.

    services.AddSingleton(serviceProvider =>
    {
        var kafkaOptions = serviceProvider.GetRequiredService<IOptions<KafkaOptions>>();
        var config = new ProducerConfig
        {
            BootstrapServers = kafkaOptions.Value.BootstrapServers,
            // Authenticate against the cluster over SASL/SSL.
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = kafkaOptions.Value.SaslUsername,
            SaslPassword = kafkaOptions.Value.SaslPassword,
        };
        // Build() returns the IProducer<string, string> registered as a singleton.
        return new ProducerBuilder<string, string>(config).Build();
    });

Now inject the IProducer<string, string> into a service that the controller will use. Next up, produce the Kafka message using _producer.ProduceAsync with the key and value as parameters.

    public class KafkaEventProducer
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventProducer> _logger;
        private readonly KafkaOptions _kafkaOptions;

        public KafkaEventProducer(IProducer<string, string> producer, ILogger<KafkaEventProducer> logger, IOptionsSnapshot<KafkaOptions> kafkaOptions)
        {
            _producer = producer;
            _logger = logger;
            _kafkaOptions = kafkaOptions.Value;
        }

        public async Task PublishKafkaEvent(string key, string value)
        {
            _logger.LogInformation("Starting publish event");

            try
            {
                var message = new Message<string, string>
                {
                    Key = key,
                    Value = value
                };

                var deliveryResult = await _producer.ProduceAsync(_kafkaOptions.TopicName, message);
                _logger.LogInformation("Delivered message to {TopicPartitionOffset}", deliveryResult.TopicPartitionOffset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError("Failed to deliver the message: {Reason}", ex.Error.Reason);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error has occurred");
            }
        }
    }
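
The producer service itself also needs to be registered so the controller can receive it. The exact registration is assumed here; the lifetime is your choice, but note that IOptionsSnapshot<T> cannot be resolved from a singleton, so scoped is a safe default:

    // Assumed registration; IOptionsSnapshot<KafkaOptions> requires a
    // scoped or transient consumer, so the service is registered as scoped.
    services.AddScoped<KafkaEventProducer>();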

The final step is to incorporate the producer service into the endpoint that places an order. The order request is serialized as the message value, and the partition key will be the product ID.

    [HttpPost(Name = "CreateOrder")]
    public async Task Post([FromBody] CreateOrderRequest request)
    {
        var message = JsonSerializer.Serialize(request);

        await _producer.PublishKafkaEvent(request.ProductId, message);
    }
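
Only ProductId is referenced in this walkthrough, so a minimal request model could look like the following (the extra field is an assumption for illustration):

    // ProductId doubles as the Kafka partition key; Quantity is illustrative.
    public record CreateOrderRequest(string ProductId, int Quantity);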

Every time an order is placed, the product ID is used as the key and the order details as the message value, and the event is placed on the Kafka stream. Orders with the same product ID land on the same partition, thereby ensuring that ordering is maintained for any given product ID.

Written by

Deepti

I'm Deepti Velusamy. I’m a software developer with a strong focus on backend API development (.NET) and extensive experience in AWS, Terraform, microservices, and containerization.