Zookeeper version: zookeeper-3.4.13, installed at /usr/local/zookeeper-3.4.13/

Kafka version: kafka_2.13-2.6.0.tgz
1. Zookeeper Configuration

Install Zookeeper

Reference: Downloading, Installing, and Starting Zookeeper

Zookeeper Cluster Setup: A Pseudo-Distributed Cluster on a Single Machine

1) Copy the following jars from the Kafka lib directory into the Zookeeper lib directory

kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar

2) Configure zoo.cfg

Add the following settings:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

3) Write a JAAS file and place it in the conf folder

/usr/local/zookeeper-3.4.13/conf/zk_server_jaas.conf

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456"
    user_kafka="kafka123456"
    user_producer="prod123456";
};

This defines two users, kafka and producer. Any user declared through a user_ entry can be used by producer and consumer programs for authentication.

There are also two other properties, username and password, which set the account used for internal authentication between Zookeeper nodes.
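The JAAS file above only takes effect if the Zookeeper JVM is told where it lives. One common way to do this (an assumption here, since this step is not in the original write-up) is a conf/java.env file, which zkEnv.sh sources at startup; the path matches the install location used in this article:

```shell
# /usr/local/zookeeper-3.4.13/conf/java.env
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper-3.4.13/conf/zk_server_jaas.conf"
```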

 

Start Zookeeper on each node:

cd /usr/local/zookeeper-3.4.13/bin

./zkServer.sh start

Note: if Zookeeper runs as a cluster, apply the same configuration on every Zookeeper node.

2. Kafka Configuration

1) Create a kafka_server_jaas.conf file under the Kafka config directory

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456"
    user_admin="admin123456"
    user_producer="prod123456"
    user_consumer="cons123456";
};



Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka123456";
};

The KafkaServer section configures Kafka's own accounts and passwords; the Client section configures the username and password the broker uses to connect to Zookeeper. These must match the user_kafka account and password configured earlier in Zookeeper's zk_server_jaas.conf.

2) Modify server.properties

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://118.xx.xx.101:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
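With SimpleAclAuthorizer enabled, allow.everyone.if.no.acl.found=true keeps topics without ACLs open to every authenticated user. If that flag is later set to false, access has to be granted per user with the stock kafka-acls.sh tool; the sketch below assumes a Zookeeper at localhost:2181 and uses the topic name test5 from the demos later in this article:

```shell
# Allow user "producer" to write to topic test5
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:producer --operation Write --topic test5

# Allow user "consumer" to read topic test5 from any consumer group
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:consumer --operation Read --topic test5 --group '*'
```

These commands need a running cluster, so treat them as an illustration of the syntax rather than a copy-paste script.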

  

3) Modify the start script

bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf"
fi

Point -Djava.security.auth.login.config at the path of the kafka_server_jaas.conf file.

4) Start Kafka

./kafka-server-start.sh ../config/server.properties &

3. Spring Boot Integration

The Spring Boot version is 2.4.10.

1) Add the dependency

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.7.RELEASE</version>
        </dependency>

2) Create kafka_client_jaas.conf

The file is placed under E:\study\xxstudy\kafkademo\config\

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456";
};

The username and password here must match an account configured for Kafka earlier; otherwise the client will be denied access.
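The demos below build clients by hand, but if the clients are instead auto-configured by Spring Boot, the same SASL settings can live in application.yml under Spring Boot's standard spring.kafka.* keys. The snippet below is a sketch, not from the original article; it reuses this article's broker address and admin credentials, and passes the JAAS entry inline via sasl.jaas.config instead of a separate file:

```yaml
spring:
  kafka:
    bootstrap-servers: 118.xx.xx.101:9092
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin" password="admin123456";
```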

 

3) Producer

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

public class JaasProducerDemo {

    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file before any Kafka client is created
        System.setProperty("java.security.auth.login.config",
                "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) throws Exception {
        producerSendWithJaas();
    }

    public static void producerSendWithJaas() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Send 100 keyed messages, logging each ack in the callback
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, key, "value-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
                            + ",offset: " + recordMetadata.offset());
                }
            });
        }

        // Close the producer, flushing any buffered records
        producer.close();
    }
}
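Setting java.security.auth.login.config as a JVM-wide system property is one option; Kafka clients (0.10.2 and later) also accept the JAAS entry inline through the sasl.jaas.config client property, which avoids the external file entirely. A minimal sketch of building such properties (plain JDK code, so it runs without the Kafka classes on the classpath); the address and credentials mirror this article's setup:

```java
import java.util.Properties;

public class InlineJaasConfig {

    /** Builds client properties that carry the JAAS login module inline. */
    public static Properties buildSaslProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Equivalent to the KafkaClient section of kafka_client_jaas.conf
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"admin123456\";");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildSaslProps().getProperty("sasl.jaas.config"));
    }
}
```

With these properties passed to the producer or consumer constructor, the static block that sets the system property in the demos is no longer needed.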

  

 

4) Consumer

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

public class JaasConsumerDemo {

    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file before any Kafka client is created
        System.setProperty("java.security.auth.login.config",
                "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        consumerWithJaas();
    }

    private static void consumerWithJaas() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        prop.put("group.id", "test");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        prop.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

  

 

5) Test

Run the consumer first, then run the producer.
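The same round trip can be checked with the console tools that ship with Kafka. The client properties file below is an assumption (any path works) and reuses the admin credentials from this article:

```properties
# client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin123456";
```

Then consume with `./kafka-console-consumer.sh --bootstrap-server 118.xx.xx.101:9092 --topic test5 --consumer.config client-sasl.properties` and produce with `./kafka-console-producer.sh --bootstrap-server 118.xx.xx.101:9092 --topic test5 --producer.config client-sasl.properties`.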

 

Author: Work Hard Work Smart
Source: http://www.cnblogs.com/linlf03/
Reposting in any form is welcome; if reposting without the author's consent, please keep this notice!

Reposted from: https://www.cnblogs.com/linlf03/p/15355572.html
