otelkafka

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: Apache-2.0 Imports: 15 Imported by: 2

README

otelkafka

Open Telemetry instrumentation for confluent-kafka-go.

Installation

go get -u github.com/jurabek/otelkafka

Usage

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/otelkafka/otelkafka"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func main() {
    // Create a new Kafka producer
    producer, err := otelkafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
    })
    if err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()

    // Produce a message
    topic := "my-topic"
    message := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("Hello, World!"),
    }

    // make sure the context is propagated before produce the message
	otel.GetTextMapPropagator().Inject(ctx, otelkafka.NewMessageCarrier(message))
    if err := producer.Produce(message, nil); err != nil {
        log.Fatalf("Failed to produce message: %v", err)
    }

    // Wait for the message to be delivered
    producer.Flush(15 * 1000)

    // Create a new Kafka consumer
    consumer, err := otelkafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatalf("Failed to create consumer: %v", err)
    }
    defer consumer.Close()

    // Subscribe to the topic
    if err := consumer.Subscribe(topic, nil); err != nil {
        log.Fatalf("Failed to subscribe to topic: %v", err)
    }

    // Consume messages
    for {
        message, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Fatalf("Failed to read message: %v", err)
        }
        fmt.Printf("Message: %s\n", message.Value)
    }
}

Metrics list

Table below lists the metrics that are collected by the instrumentation, and can exported using Otel Metric Exporter

Name Description Type Attributes
messaging.client.sent.messages The total number of messages sent by the producer. Counter more attributes
messaging.client.consumed.messages The total number of messages received by the consumer. Counter more attributes
messaging.client.operation.duration The duration of the messaging operation initiated by a producer or consumer client. filtered by messaging.system.operation.name Histogram more attributes

Documentation

Overview

Package otelkafka instruments the github.com/confluentinc/confluent-kafka-go/v2 package and supports function based Producer and Consumer

The consumer's span will be created as a child of the producer's span.

Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers. (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)

Based on: https://github.com/DataDog/dd-trace-go/tree/main/contrib/confluentinc/confluent-kafka-go/kafka.v2

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Version

func Version() string

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) (*Consumer, error)

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event kafka.Event)

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. Message will be traced.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) *MessageCarrier

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

func (MessageCarrier) Set

func (c MessageCarrier) Set(key string, value string)

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option interface used for setting optional config properties.

func WithCustomAttributeInjector

func WithCustomAttributeInjector(fn func(msg *kafka.Message) []attribute.KeyValue) Option

func WithMeterProvider

func WithMeterProvider(meterProvider metric.MeterProvider) Option

func WithPropagators

func WithPropagators(propagators propagation.TextMapPropagator) Option

WithPropagators specifies propagators to use for extracting information from the HTTP requests. If none are specified, global ones will be used.

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider) Option

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

Producer supports only tracing mechanism for Produce method over deprecated ProduceChannel method

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
OSZAR »