账号密码登录
微信安全登录
微信扫描二维码登录

登录后绑定QQ、微信即可实现信息互通

手机验证码登录
找回密码返回
邮箱找回 手机找回
注册账号返回
其他登录方式
分享
  • 收藏
    X
    python 如何链接kafka 本机是否要安装kafka?
    23
    0

    我电脑没有装过kafka直接用下面的代码模拟python kafka消费数据报错“kafka.errors.NoBrokersAvailable: NoBrokersAvailable” 是不是由于没有安装kafka的原因。
    如果我想模拟kafka读一个excle 然后我去从kafka消费这个excel的数据,请问该如何实现呢?
    如果有同学愿意帮助,愿提供话费酬谢,谢谢各位~

    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import json
    
    class Kafka_producer():
        '''
        使用kafka的生产模块
        '''
    
        def __init__(self, kafkahost,kafkaport, kafkatopic):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
                ))
    
        def sendjsondata(self, params):
            try:
                parmas_message = json.dumps(params)
                producer = self.producer
                producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
                producer.flush()
            except KafkaError as e:
                print (e)
    
    
    class Kafka_consumer():
        '''
        使用Kafka—python的消费模块
        '''
    
        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                          bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort ))
    
        def consume_data(self):
            try:
                for message in self.consumer:
                    # print json.loads(message.value)
                    yield message
            except KeyboardInterrupt as  e:
                print (e)
    
    
    def main():
        '''
        测试consumer和producer
        :return:
        '''
        ##测试生产模块
        producer = Kafka_producer("127.0.0.1", 9092, "ranktest")
        for id in range(10):
            params = '{abetst}:{null}---'+str(id)
            producer.sendjsondata(params)
        ##测试消费模块
        #消费模块的返回格式为ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
        #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
        #\serialized_key_size=-1, serialized_value_size=21)
        consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
        message = consumer.consume_data()
        for i in message:
            print (i.value)
    
    
    if __name__ == '__main__':
        main()
    0
    打赏
    收藏
    点击回答
        全部回答
    • 0
    更多回答
    扫一扫访问手机版
    • 回到顶部
    • 回到顶部