前言:
而今咱们对“python 队列和多线程”都比较关心,各位老铁们都需要了解一些“python 队列和多线程”的相关内容。那么小编同时在网摘上汇集了一些关于“python 队列和多线程””的相关文章,希望小伙伴们能喜欢,咱们快快来学习一下吧!#!/usr/bin/python3import requestsimport loggingimport timeimport sysfrom queue import Queuefrom concurrent.futures import ThreadPoolExecutor, wait, as_completed, ALL_COMPLETEDimport threadingq = Queue(maxsize=100)def print_time(): return time.time()def post_influxdb(dbname,host='localhost',port='8086'): while True: # if q.empty(): # print(print_time(),"q is empty, exit !") # break data = q.get(timeout=1) q.task_done() if not data: print(print_time(),"q is empty, exit !") data_row = data[0] task_id = data[1] url = f"http://{host}:{port}/write?db={dbname}" try: r = requests.post(url=url,data=data_row) if r.status_code >= 400: print([print_time(),task_id,r.status_code, r.text]) if r.status_code == 404: raise InterruptedError return False # print([print_time(),task_id,r.status_code, r.text]) except Exception as e: print([print_time(),task_id,str(e)[0:100]]) # 读取文件放入队列def read_file(file_path): with open(file_path,'r') as file: counter = 0 for line in file: # time.sleep(5) # if q.full(): # print(f"{print_time()} 队列满,等待中.......{counter}") # time.sleep(1) q.put((line,counter,)) counter = counter + 1def main(file_path,dbname,max_workers=5): read_thread = threading.Thread(target=read_file,args=(file_path,)) read_thread.start() with ThreadPoolExecutor(max_workers=max_workers) as executor: task_list = [ executor.submit(post_influxdb,dbname) for i in range(max_workers)] q.join() wait(task_list,return_when=ALL_COMPLETED) read_thread.join()if __name__ == "__main__": # args = sys.argv # print(args) file_path = "./python_notes/influxdb/data.txt" dbname = "test" t1 = time.time() max_workers = 10 main(file_path=file_path,dbname=dbname,max_workers=max_workers) print("耗时(s):",time.time() - t1)
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #python 队列和多线程