前言:
而今你们对“pythonredisqueue”可能比较讲究,姐妹们都想要学习一些“pythonredisqueue”的相关内容。那么小编同时在网摘上收集了一些关于“pythonredisqueue””的相关内容,希望你们能喜欢,小伙伴们快快来了解一下吧!首先给大家讲些理论知识。
什么是队列?队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。【百度百科】
消息队列主要用来暂存生产者生产的消息,供后续其他消费者来消费。消息队列对数据的消费没有顺序要求。消息队列主要解决应用异步解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
今天为大家介绍使用Python的Queue模块和Redis模块分别实现的消息队列。
一、Redis模块实现的消息队列
1、Redis消息队列类
#主要实现了消息队列的核心功能#入队列、出队列、查看队列长度import redis"""@author 312每天一个短知识@desc Redis队列类@date 2022-04-28 23:48:12"""class RedisQueue: """ @author 312每天一个短知识 @desc 初始化构造函数 @date 2022-04-28 23:48:12 """ def __init__(self): self._conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0) """ @author 312每天一个短知识 @desc push队列数据 @date 2022-04-28 23:48:12 """ def lpush(self, key, value): return self._conn.lpush(key, value) """ @author 312每天一个短知识 @desc pop队列数据 @date 2022-04-28 23:48:12 """ def rpop(self, key): return self._conn.rpop(key) """ @author 312每天一个短知识 @desc 队列长度 @date 2022-04-28 23:48:12 """ def llen(self, key): return self._conn.llen(key)
2、Redis消息队列生产数据(测试)
#类构造函数push数据import timeimport jsonfrom redis_queue import RedisQueueclass RedisQueueConsume: REDIS_QUEUE_KEY = 't21' def __init__(self): self._rqueue = RedisQueue() data = {"a":1,"b":2} self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data)) data = {"a":2,"b":3} self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data)) data = {"a":3,"b":4} self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data)) data = {"a":4,"b":5} self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data))if __name__=="__main__": rconsume = RedisQueueConsume() rconsume.run()
3、Redis消息队列消费数据(测试)【先进先出】
#增加数据处理阈值防止内存溢出和执行超时import timeimport jsonfrom redis_queue import RedisQueueclass RedisQueueConsume: REDIS_QUEUE_KEY = 't21' MAX_HANDLE_NUM = 100 def __init__(self): self._rqueue = RedisQueue() def run(self): num = 0 while True: res = self._rqueue.rpop(self.REDIS_QUEUE_KEY) if res is None or self.MAX_HANDLE_NUM < num: break data = json.loads(res) print(data) #业务处理 self._handle(data) time.sleep(1) num = num + 1 def _handle(self, data): passif __name__=="__main__": rconsume = RedisQueueConsume() rconsume.run() 执行结果(先进先出):{'a': 1, 'b': 2}{'a': 2, 'b': 3}{'a': 3, 'b': 4}{'a': 4, 'b': 5}
二、Queue模块实现的消息队列
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构即队列,用来在生产者和消费者线程之间的信息传递。队列机制包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。
1、Queue官方文档
2、Queue消息队列类
#queue模块本身实现了队列的很多功能所以只是简单的进行代码封装import queue"""@author 312每天一个短知识@desc Python队列类@date 2022-04-28 23:48:12"""class PythonQueue: """ @author 312每天一个短知识 @desc 初始化构造函数 @date 2022-04-28 23:48:12 """ def __init__(self): self._queue = queue.Queue() """ @author 312每天一个短知识 @desc put队列数据 @date 2022-04-28 23:48:12 """ def put(self, value): return self._queue.put(value) """ @author 312每天一个短知识 @desc get队列数据 @date 2022-04-28 23:48:12 """ def get(self): if self._queue.empty(): return None return self._queue.get() """ @author 312每天一个短知识 @desc 队列长度 @date 2022-04-28 23:48:12 """ def qsize(self): return self._queue.qsize()
3、Queue消息队列生产数据(测试)
import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume: def __init__(self): self._queue = PythonQueue() data = {"a":1,"b":2} self._queue.put(json.dumps(data)) data = {"a":2,"b":3} self._queue.put(json.dumps(data)) data = {"a":3,"b":4} self._queue.put(json.dumps(data)) data = {"a":4,"b":5} self._queue.put(json.dumps(data))if __name__=="__main__": mconsume = PythonQueueConsume() mconsume.run()
4、Queue消息队列消费数据(测试)【先进先出】
import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume: MAX_HANDLE_NUM = 100 def __init__(self): self._queue = PythonQueue() def run(self): num = 0 while True: res = self._queue.get() if res is None or self.MAX_HANDLE_NUM < num: break data = json.loads(res) print(data) #业务处理 self._handle(data) time.sleep(1) num = num + 1 def _handle(self, data): passif __name__=="__main__": mconsume = PythonQueueConsume() mconsume.run()执行结果:{'a': 1, 'b': 2}{'a': 2, 'b': 3}{'a': 3, 'b': 4}{'a': 4, 'b': 5}
5、Queue消息队列长度(测试)
import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume: def __init__(self): self._queue = PythonQueue() def run(self): print(self._queue.qsize())if __name__=="__main__": mconsume = PythonQueueConsume() mconsume.run()
感谢大家的评论、点赞、分享、关注。。。
标签: #pythonredisqueue