kafka 安装及基础概念介绍可以参考:kafka 安装、配置、启动_王安的博客-CSDN博客_kafka安装启动
本文主要介绍confluent-kafka-go的使用方法。confluent-kafka-go,简单易用,并且表现稳定,是kafka官网推荐的golang package。
https://github.com/confluentinc/confluent-kafka-go
一. 下载go client
go get -v github.com/confluentinc/confluent-kafka-go
二 example
2.1 创建topic
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"os/signal"
"strconv"
"syscall"
"time"
)
const Topic = "testTopic"
const Broker = "127.0.0.1:9192"
const NumPartition = 2
const ReplicationFactor = 1
const ConsumerGroup1 = "consumerTest1"
func main() {
fmt.Println("Kafka Demo RUN:")
//创建topic
KafkaCreateTopic()
//创建生产者
go KafkaProducer()
//创建消费者
go KafkaConsumer("group1")
go KafkaConsumer("group1")
go KafkaConsumer("group2")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
run := true
for run {
select {
case sig := {
Topic: Topic,
NumPartitions: NumPartition,
ReplicationFactor: ReplicationFactor}},
// Admin options
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
fmt.Printf("Failed to create topic: %vn", err)
os.Exit(1)
}
// Print results
for _, result := range results {
fmt.Printf("%sn", result)
}
a.Close()
}
2.2 Producer
/*
消息生产者
*/
func KafkaProducer() {
topic := Topic
broker := Broker
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
if err != nil {
fmt.Printf("Failed to create producer: %sn", err)
os.Exit(1)
}
fmt.Printf("Created Producer %vn", p)
// Optional delivery channel, if not specified the Producer object's
// .Events channel is used.
deliveryChan := make(chan kafka.Event)
//每5s向kafka发送一条消息
n := 0
for {
n++
value := strconv.Itoa(n) + " Hello Go!"
err = p.Produce(kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)
e := <- deliveryChan
m := e.(*kafka.Message)
fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
if m.TopicPartition.Error != nil {
fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("生产者:Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
time.Sleep(5 * time.Second)
}
close(deliveryChan)
}
2.3 Consumer
func KafkaConsumer(consumerGroup string) {
broker := Broker
group := consumerGroup
topics := Topic
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
// Avoid connecting to IPv6 brokers:
// This is needed for the ErrAllBrokersDown show-case below
// when using localhost brokers on OSX, since the OSX resolver
// will return the IPv6 addresses first.
// You typically don't need to specify this configuration property.
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %sn", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %vn", c)
err = c.SubscribeTopics([]string{topics}, nil)
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%%消费者consumerGroup%s Message on %s:\n%s\n", group, e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
// automatically recover.
// But in this example we choose to terminate
// the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
文章来源于互联网:golang使用kafka