下面一个客户端代码例子访问kafka服务器,来发送和接受消息。
使用方式
1、命令行参数
$ ./kafkaclient -h Usage of ./client: -ca string CA Certificate (default "ca.pem") -cert string Client Certificate (default "cert.pem") -command string consumer|producer (default "consumer") -host string Common separated kafka hosts (default "localhost:9093") -key string Client Key (default "key.pem") -partition int Kafka topic partition -tls TLS enable -topic string Kafka topic (default "test--topic")
2、作为producer启动
$ ./kafkaclient -command producer -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command producer -tls -cert client.pem -key client.key -ca ca.pem -host kafka1:9093,kafka2:9093
producer发送消息给kafka:
> aaa 2018/12/15 07:11:21 Produced message: [aaa] > bbb 2018/12/15 07:11:30 Produced message: [bbb] > quit
3、作为consumer启动
$ ./kafkaclient -command consumer -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command consumer -tls -cert client.pem -key client.key -ca ca.pem -host kafka1:9093,kafka2:9093
consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
完整源代码如下
这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go package main import ( "flag" "fmt" "log" "os" "io/ioutil" "bufio" "strings" "crypto/tls" "crypto/x509" "github.com/Shopify/sarama" ) var ( command string tlsEnable bool hosts string topic string partition int clientcert string clientkey string cacert string ) func main() { flag.StringVar(&command, "command", "consumer", "consumer|producer") flag.BoolVar(&tlsEnable, "tls", false, "TLS enable") flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts") flag.StringVar(&topic, "topic", "test--topic", "Kafka topic") flag.IntVar(&partition, "partition", 0, "Kafka topic partition") flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate") flag.StringVar(&clientkey, "key", "key.pem", "Client Key") flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate") flag.Parse() config := sarama.NewConfig() if tlsEnable { //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert) if err != nil { log.Fatal(err) } config.Net.TLS.Enable = true config.Net.TLS.Config = tlsConfig } client, err := sarama.NewClient(strings.Split(hosts, ","), config) if err != nil { log.Fatalf("unable to create kafka client: %q", err) } if command == "consumer" { consumer, err := sarama.NewConsumerFromClient(client) if err != nil { log.Fatal(err) } defer consumer.Close() loopConsumer(consumer, topic, partition) } else { producer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { log.Fatal(err) } defer producer.Close() loopProducer(producer, topic, partition) } } func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil { return nil, err } // load ca cert pool cacert, err := ioutil.ReadFile(cacertfile) if err != nil { return nil, err } cacertpool := x509.NewCertPool() cacertpool.AppendCertsFromPEM(cacert) // generate tlcconfig tlsConfig := tls.Config{} tlsConfig.RootCAs = cacertpool tlsConfig.Certificates = []tls.Certificate{clientcert} tlsConfig.BuildNameToCertificate() // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert: return &tlsConfig, err } func loopProducer(producer sarama.AsyncProducer, topic string, partition int) { scanner := bufio.NewScanner(os.Stdin) fmt.Print("> ") for scanner.Scan() { text := scanner.Text() if text == "" { } else if text == "exit" || text == "quit" { break } else { producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)} log.Printf("Produced message: [%s]\n",text) } fmt.Print("> ") } } func loopConsumer(consumer sarama.Consumer, topic string, partition int) { partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { log.Println(err) return } defer partitionConsumer.Close() for { msg := <-partitionConsumer.Messages() log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset) } }
编译:
$ go build kafkaclient.go
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件!
如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
暂无“golang如何使用sarama访问kafka”评论...
P70系列延期,华为新旗舰将在下月发布
3月20日消息,近期博主@数码闲聊站 透露,原定三月份发布的华为新旗舰P70系列延期发布,预计4月份上市。
而博主@定焦数码 爆料,华为的P70系列在定位上已经超过了Mate60,成为了重要的旗舰系列之一。它肩负着重返影像领域顶尖的使命。那么这次P70会带来哪些令人惊艳的创新呢?
根据目前爆料的消息来看,华为P70系列将推出三个版本,其中P70和P70 Pro采用了三角形的摄像头模组设计,而P70 Art则采用了与上一代P60 Art相似的不规则形状设计。这样的外观是否好看见仁见智,但辨识度绝对拉满。
更新动态
2024年05月02日
2024年05月02日
- 徐凤仪《在角落唱着歌》[FLAC/分轨][244.43MB]
- 【柔顺爵士(P)】Streetwize-2024-LiftMeUp(FLAC)
- 【爵士沙发】Magmatunes-2024-FlyingHigh(FLAC)
- 玖月奇迹《试音天碟K2母带》[WAV+CUE]
- 张美玲.1998-世界第一等【南方】【WAV+CUE】
- 梦剧院.1988-飘去夏天(2018三十周年纪念版)【乐意唱片】【WAV+CUE】
- 云朵.2011-云朵【啊呀啦嗦】【WAV+CUE】
- 群星.1998-琼瑶98年度主题曲精选大碟【上华】【WAV+CUE】
- 胡鸿钧.2023-我们在结束时开始新曲+精丫TVB.MUSIC】【WAV+CUE】
- 任洁玲.1995-我们有没有爱过【飞碟】【WAV+CUE】
- 海来阿木《西楼情歌》1:1母版直刻[320K/MP3][139.61MB]
- 海来阿木《西楼情歌》1:1母版直刻[FLAC/分轨][344.97MB]
- 群星《影视剧 群星闪耀时 影视原声带》[320K/MP3][38.69MB]
- 姜育恒.1998-成名金曲(马来西亚瑞华金碟珍藏版)【瑞华】【WAV+CUE】
- 水木年华.2006-生命狂想曲特别版2CD【水木同创】【WAV+CUE】