# -*- coding: utf-8 -*- from kafka import KafkaProducer #from kafka import KafkaConsumer #from kafka.errors import KafkaError import time def main(): ##生产模块 producer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092','192.168.xx.xx:9092','192.168.xx.xx:9092','192.168.xx.xx:9092']) #这里配置的kafka的地址,一般会将kafka所有节点都列出来,>不同地址之间用,号分隔。 with open('/home/qjzh/miniconda/envs/water_meter2/downloads/ml-100k/u.user','r',encoding='UTF-8') as f: for line in f.readlines(): time.sleep(1) producer.send("100kuser",bytes(line,'utf-8'))#kafka发送实时数据修似乎必须先转换成bytes格式,否则会出错。 print(line) #producer.flush() if __name__ == '__main__': main()