You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

17 lines
819 B

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
from kafka import KafkaProducer
#from kafka import KafkaConsumer
#from kafka.errors import KafkaError
import time
def main():
##生产模块
producer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092','192.168.xx.xx:9092','192.168.xx.xx:9092','192.168.xx.xx:9092']) #这里配置的kafka的地址一般会将kafka所有节点都列出来>不同地址之间用,号分隔。
with open('/home/qjzh/miniconda/envs/water_meter2/downloads/ml-100k/u.user','r',encoding='UTF-8') as f:
for line in f.readlines():
time.sleep(1)
producer.send("100kuser",bytes(line,'utf-8'))#kafka发送实时数据修似乎必须先转换成bytes格式否则会出错。
print(line)
#producer.flush()
if __name__ == '__main__':
main()