下面一个客户端代码例子访问kafka服务器,来发送和接受消息。
使用方式
1、命令行参数
$ ./kafkaclient -h Usage of ./client: -ca string CA Certificate (default "ca.pem") -cert string Client Certificate (default "cert.pem") -command string consumer|producer (default "consumer") -host string Common separated kafka hosts (default "localhost:9093") -key string Client Key (default "key.pem") -partition int Kafka topic partition -tls TLS enable -topic string Kafka topic (default "test--topic")
2、作为producer启动
$ ./kafkaclient -command producer -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command producer -tls -cert client.pem -key client.key -ca ca.pem -host kafka1:9093,kafka2:9093
producer发送消息给kafka:
> aaa 2018/12/15 07:11:21 Produced message: [aaa] > bbb 2018/12/15 07:11:30 Produced message: [bbb] > quit
3、作为consumer启动
$ ./kafkaclient -command consumer -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command consumer -tls -cert client.pem -key client.key -ca ca.pem -host kafka1:9093,kafka2:9093
consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
完整源代码如下
这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main
import (
"flag"
"fmt"
"log"
"os"
"io/ioutil"
"bufio"
"strings"
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
)
var (
command string
tlsEnable bool
hosts string
topic string
partition int
clientcert string
clientkey string
cacert string
)
func main() {
flag.StringVar(&command, "command", "consumer", "consumer|producer")
flag.BoolVar(&tlsEnable, "tls", false, "TLS enable")
flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts")
flag.StringVar(&topic, "topic", "test--topic", "Kafka topic")
flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate")
flag.StringVar(&clientkey, "key", "key.pem", "Client Key")
flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate")
flag.Parse()
config := sarama.NewConfig()
if tlsEnable {
//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
if err != nil {
log.Fatal(err)
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
if command == "consumer" {
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
loopConsumer(consumer, topic, partition)
} else {
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
loopProducer(producer, topic, partition)
}
}
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
// load client cert
clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
if err != nil {
return nil, err
}
// load ca cert pool
cacert, err := ioutil.ReadFile(cacertfile)
if err != nil {
return nil, err
}
cacertpool := x509.NewCertPool()
cacertpool.AppendCertsFromPEM(cacert)
// generate tlcconfig
tlsConfig := tls.Config{}
tlsConfig.RootCAs = cacertpool
tlsConfig.Certificates = []tls.Certificate{clientcert}
tlsConfig.BuildNameToCertificate()
// tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
return &tlsConfig, err
}
func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("> ")
for scanner.Scan() {
text := scanner.Text()
if text == "" {
} else if text == "exit" || text == "quit" {
break
} else {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
}
fmt.Print("> ")
}
}
func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
log.Println(err)
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
}
}
编译:
$ go build kafkaclient.go
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新动态
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]