golang使用kafka

kafka 安装及基础概念介绍可以参考:kafka 安装、配置、启动_王安的博客-CSDN博客_kafka安装启动

本文主要介绍confluent-kafka-go的使用方法。confluent-kafka-go,简单易用,并且表现稳定,是kafka官网推荐的golang package。
https://github.com/confluentinc/confluent-kafka-go  

一. 下载go client

go get -v github.com/confluentinc/confluent-kafka-go

二 example

2.1 创建topic 


import (
	"context"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
)

const Topic = "testTopic"
const Broker = "127.0.0.1:9192"
const NumPartition = 2
const ReplicationFactor = 1
const ConsumerGroup1 = "consumerTest1"

func main() {

	fmt.Println("Kafka Demo RUN:")
	//创建topic
	KafkaCreateTopic()
	//创建生产者
	go KafkaProducer()
	//创建消费者
	go KafkaConsumer("group1")
	go KafkaConsumer("group1")
	go KafkaConsumer("group2")

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	run := true
	for run {
		select {
		case sig := {
			Topic:             Topic,
			NumPartitions:     NumPartition,
			ReplicationFactor: ReplicationFactor}},
		// Admin options
		kafka.SetAdminOperationTimeout(maxDur))
	if err != nil {
		fmt.Printf("Failed to create topic: %vn", err)
		os.Exit(1)
	}

	// Print results
	for _, result := range results {
		fmt.Printf("%sn", result)
	}

	a.Close()
}

2.2 Producer


/*
消息生产者
*/
func KafkaProducer() {
	topic := Topic
	broker := Broker
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

	if err != nil {
		fmt.Printf("Failed to create producer: %sn", err)
		os.Exit(1)
	}

	fmt.Printf("Created Producer %vn", p)

	// Optional delivery channel, if not specified the Producer object's
	// .Events channel is used.
	deliveryChan := make(chan kafka.Event)

	//每5s向kafka发送一条消息
	n := 0
	for {
		n++
		value := strconv.Itoa(n) + " Hello Go!"
		err = p.Produce(kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
			Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
		}, deliveryChan)

		e := <- deliveryChan
		m := e.(*kafka.Message)
		fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
		if m.TopicPartition.Error != nil {
			fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
		} else {
			fmt.Printf("生产者:Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
		time.Sleep(5 * time.Second)
	}
	close(deliveryChan)
}

2.3 Consumer


func KafkaConsumer(consumerGroup string) {
	broker := Broker
	group := consumerGroup
	topics := Topic
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		// Avoid connecting to IPv6 brokers:
		// This is needed for the ErrAllBrokersDown show-case below
		// when using localhost brokers on OSX, since the OSX resolver
		// will return the IPv6 addresses first.
		// You typically don't need to specify this configuration property.
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %sn", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %vn", c)

	err = c.SubscribeTopics([]string{topics}, nil)

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%%消费者consumerGroup%s Message on %s:\n%s\n", group, e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				// Errors should generally be considered
				// informational, the client will try to
				// automatically recover.
				// But in this example we choose to terminate
				// the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}
	fmt.Printf("Closing consumer\n")
	c.Close()
}

 更多example

文章来源于互联网:golang使用kafka

DevOpskubernetes后端技术

【云原生 | Kubernetes篇】Kubernetes基础入门

2022-5-31 13:05:21

技术服务器开发

前端 CDNJS 库及 Google Fonts、Ajax 和 Gravatar 国内加速服务

2022-6-1 2:05:46

搜索