账号密码登录
微信安全登录
微信扫描二维码登录

登录后绑定QQ、微信即可实现信息互通

手机验证码登录
找回密码返回
邮箱找回 手机找回
注册账号返回
其他登录方式
分享
  • 收藏
    X
    在springboot 中使用kafka 进行消费,启动失败报错
    32
    0

    consumer
    代码如下,其他还有一部分为mvc的代码

    package com.settlement.kafka;
    
    
    import org.springframework.kafka.annotation.KafkaListener;
    
    public class Consumer {
        @KafkaListener(topics = {"${kafka.topic}"})
        public void consumer(String msg){
            System.out.println(msg);
        }
    }
    
    package com.settlement.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
        @Value("${kafka.bootstrap-server}")
        private String bootstrapServers;
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
            ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(5000);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String,String> consumerFactory(){
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String,Object> consumerConfigs(){
            Map<String,Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//自行控制提交offset
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");//提交延迟毫秒数
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");//执行超时时间
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"dsafas");
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//开始消费位置
            return propsMap;
        }
    
        @Bean
        public Consumer listener(){
            return new Consumer();
        }
    }
    
    Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
    2018-02-26 20:47:26.256 ERROR 47792 --- [           main] o.s.b.SpringApplication                  : Application startup failed
    
    java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:463) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(MessagingMessageListenerAdapter.java:106) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.<init>(RecordMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListenerInstance(MethodKafkaListenerEndpoint.java:172) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:374) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:359) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:246) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:243) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at 
    settlement.Application.main(Application.java:12) [classes/:?]
    
    2018-02-26 20:47:26.261  INFO 47792 --- [           main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
    2018-02-26 20:47:26.263  WARN 47792 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception thrown from LifecycleProcessor on context close
    
    java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
        at org.springframework.context.support.AbstractApplicationContext.getLifecycleProcessor(AbstractApplicationContext.java:427) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:999) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:958) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
        at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:750) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
        at settlement.Application.main(Application.java:12) [classes/:?]
    
    2018-02-26 20:47:26.264  INFO 47792 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
    
    Process finished with exit code 1
    
    1
    打赏
    收藏
    点击回答
        全部回答
    • 0
    • 水木皿 普通会员 1楼
      502 Bad Gateway

      502 Bad Gateway


      nginx
    更多回答
    扫一扫访问手机版
    • 回到顶部
    • 回到顶部