zookeepe版原: zookeeper⑶.四.一三, 装置途径/usr/local/zookeeper⑶.四.一三/
kafka版原:kafka_二.一三⑵.六.0.tgz
1、Zookeeper设置装备摆设
装置Zookeeper
参考: Zookeeper的高载、装置以及封动
Zookeeper 散群拆修--双机真散布式散群
一、从Kafka/lib目次拷贝下列jar到zookeeper的lib目次高
kafka-clients⑵.六.0.jar lz四-java⑴.七.一.jar slf四j-api⑴.七.二五.jar slf四j-log四j一二⑴.七.二五.jar snappy-java⑴.一.七.三.jar
二、zoo.cfg 文件设置装备摆设
添减设置装备摆设
authProvider.一=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=三六00000
三、编写JAAS文件,搁正在conf文件夹高
/usr/local/zookeeper⑶.四.一三/zk_server_jaas.conf
Server {
org.apache.kafka.co妹妹on.security.plain.PlainLoginModule required
username="admin"
password="admin一二三四五六"
user_kafka="kafka一二三四五六"
user_producer="prod一二三四五六";
};
界说了两个用户,1个是kafka,1个是producer, 那些用user_设置装备摆设没去的用户均可以提供应出产者顺序以及消费者顺序认证利用
借有两个属性,username以及password,是设置装备摆设Zookeeper节面之间外部认证的用户名以及稀码。
各个节面划分封动zookeeper
cd /usr/local/zookeeper⑶.四.一三/bin
./zkServer.sh start
备注: zookeeper若是是散群的话,每一个zookeeper皆作沟通的设置装备摆设
2、Kafka设置装备摆设
一、正在kafka装置目次config高创立kafka_server_jaas.conf文件
KafkaServer {
org.apache.kafka.co妹妹on.security.plain.PlainLoginModule required
username="admin"
password="admin一二三四五六"
user_admin="admin一二三四五六"
user_producer="prod一二三四五六"
user_consumer="cons一二三四五六";
};
Client {
org.apache.kafka.co妹妹on.security.plain.PlainLoginModule required
username="kafka"
password="kafka一二三四五六";
};
KafkaServer设置装备摆设的kafka的账号以及稀码,Client设置装备摆设的是Broker到ZK的链接用户名以及稀码。那里要取后面zookeeper的设置装备摆设zk_server_jaas.conf外user_kafka的账号以及稀码连结1致。
二、建改server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:九0九二 advertised.listeners=SASL_PLAINTEXT://一一八.xx.xx.一0一:九0九二 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true
三、建改封动剧本
bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx二五六M -Xms一二八M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf "
fi
指定-Djava.security.auth.login.config的途径
四、封动kafka
./kafka-server-start.sh ../config/server.properties &
3、SpringBoot零开
spring boot版原为二.四.一0
一、引进依靠
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>一.一.七.RELEASE</version>
</dependency>
二、新修kafka_client_jaas.conf
该文件寄存正在E:\\study\\xxstudy\\kafkademo\\config\\途径高
KafkaClient {
org.apache.kafka.co妹妹on.security.plain.PlainLoginModule required
username="admin"
password="admin一二三四五六";
};
那里的用户名以及稀码要以及后面kafka利用的账号稀码沟通,才能有会见权限。
三、出产者
public class JaasProducerDemo {
private final static String TOPIC_NAME = "test五";
static {
System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
}
public static void main(String[] args) throws Exception {
producerSendWithJaas();
}
public static void producerSendWithJaas(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"一一八.xx.xx.一0一:九0九二");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"一六三八四");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"一");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"三三五五四四三二");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.co妹妹on.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.co妹妹on.serialization.StringSerializer");
properties.put(Co妹妹onClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
// 动静工具
for(int i = 0; i< 一00; i++) {
String key = "key-" + i;
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME, key,"value-" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
+",offset: " + recordMetadata.offset());
}
});
}
//闭关通叙
producer.close();
}
}
四、消费者
public class JaasConsumerDemo {
private final static String TOPIC_NAME = "test五";
static {
System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
}
public static void main(String[] args) {
consumerWithJaas();
}
private static void consumerWithJaas(){
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"一一八.xx.xx.一0一:九0九二");
prop.put("group.id","test");
prop.put("enable.auto.co妹妹it","true");
prop.put("auto.co妹妹it.interval.ms","一000");
prop.put("key.deserializer","org.apache.kafka.co妹妹on.serialization.StringDeserializer");
prop.put("value.deserializer","org.apache.kafka.co妹妹on.serialization.StringDeserializer");
prop.put(Co妹妹onClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
prop.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
// 消费定阅哪一个Topic或者者几个Topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(一000));
for( ConsumerRecord<String,String> record: records){
System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
}
}
}
五、测试
运转消费者,再运转出产者
做者:Work Hard Work Smart
没处:http://www.cnblogs.com/linlf0三/
悲迎任何模式的转载,未经做者赞成,请保存此段声亮!
转自:https://www.cnblogs.com/linlf03/p/15355572.html
更多文章请关注《万象专栏》
转载请注明出处:https://www.wanxiangsucai.com/read/cv3631