龙空技术网

Python使用Redis和Queue模块实现消息队列

312每天一个短知识 187

前言:

而今你们对“pythonredisqueue”可能比较讲究,姐妹们都想要学习一些“pythonredisqueue”的相关内容。那么小编同时在网摘上收集了一些关于“pythonredisqueue””的相关内容,希望你们能喜欢,小伙伴们快快来了解一下吧!

首先给大家讲些理论知识。

什么是队列?队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。【百度百科】

消息队列主要用来暂存生产者生产的消息,供后续其他消费者来消费。消息队列对数据的消费没有顺序要求。消息队列主要解决应用异步解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

今天为大家介绍使用Python的Queue模块和Redis模块分别实现的消息队列。

一、Redis模块实现的消息队列

1、Redis消息队列类

#主要实现了消息队列的核心功能#入队列、出队列、查看队列长度import redis"""@author 312每天一个短知识@desc Redis队列类@date 2022-04-28 23:48:12"""class RedisQueue:		"""	@author 312每天一个短知识	@desc 初始化构造函数 	@date 2022-04-28 23:48:12	"""	def __init__(self):		self._conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)	"""	@author 312每天一个短知识	@desc push队列数据 	@date 2022-04-28 23:48:12	"""	def lpush(self, key, value):		return self._conn.lpush(key, value)		"""	@author 312每天一个短知识	@desc pop队列数据	@date 2022-04-28 23:48:12	"""	def rpop(self, key):		return self._conn.rpop(key)	"""	@author 312每天一个短知识	@desc 队列长度 	@date 2022-04-28 23:48:12	"""	def llen(self, key):		return self._conn.llen(key)

2、Redis消息队列生产数据(测试)

#类构造函数push数据import timeimport jsonfrom redis_queue import RedisQueueclass RedisQueueConsume:	REDIS_QUEUE_KEY = 't21' 	def __init__(self):			self._rqueue = RedisQueue()		data = {"a":1,"b":2}		self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data))		data = {"a":2,"b":3}		self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data))		data = {"a":3,"b":4}		self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data))		data = {"a":4,"b":5}		self._rqueue.lpush(self.REDIS_QUEUE_KEY, json.dumps(data))if __name__=="__main__":	rconsume = RedisQueueConsume()	rconsume.run()

3、Redis消息队列消费数据(测试)【先进先出】

#增加数据处理阈值防止内存溢出和执行超时import timeimport jsonfrom redis_queue import RedisQueueclass RedisQueueConsume:	REDIS_QUEUE_KEY = 't21'	MAX_HANDLE_NUM = 100	def __init__(self):			self._rqueue = RedisQueue()	def run(self):		num = 0 		while True:			res = self._rqueue.rpop(self.REDIS_QUEUE_KEY)			if res is None or self.MAX_HANDLE_NUM < num:				break			data = json.loads(res)			print(data)			#业务处理			self._handle(data)			time.sleep(1)			num = num + 1	def _handle(self, data):		passif __name__=="__main__":	rconsume = RedisQueueConsume()	rconsume.run() 执行结果(先进先出):{'a': 1, 'b': 2}{'a': 2, 'b': 3}{'a': 3, 'b': 4}{'a': 4, 'b': 5}

二、Queue模块实现的消息队列

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构即队列,用来在生产者和消费者线程之间的信息传递。队列机制包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。

1、Queue官方文档

2、Queue消息队列类

#queue模块本身实现了队列的很多功能所以只是简单的进行代码封装import queue"""@author 312每天一个短知识@desc Python队列类@date 2022-04-28 23:48:12"""class PythonQueue:		"""	@author 312每天一个短知识	@desc 初始化构造函数 	@date 2022-04-28 23:48:12	"""	def __init__(self):		self._queue = queue.Queue()	"""	@author 312每天一个短知识	@desc put队列数据 	@date 2022-04-28 23:48:12	"""	def put(self, value):		return self._queue.put(value)		"""	@author 312每天一个短知识	@desc get队列数据	@date 2022-04-28 23:48:12	"""	def get(self):		if self._queue.empty():			return None		return self._queue.get()	"""	@author 312每天一个短知识	@desc 队列长度 	@date 2022-04-28 23:48:12	"""	def qsize(self):		return self._queue.qsize()

3、Queue消息队列生产数据(测试)

import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume:	def __init__(self):			self._queue = PythonQueue()		data = {"a":1,"b":2}		self._queue.put(json.dumps(data))		data = {"a":2,"b":3}		self._queue.put(json.dumps(data))		data = {"a":3,"b":4}		self._queue.put(json.dumps(data))		data = {"a":4,"b":5}		self._queue.put(json.dumps(data))if __name__=="__main__":	mconsume = PythonQueueConsume()	mconsume.run()

4、Queue消息队列消费数据(测试)【先进先出】

import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume:	MAX_HANDLE_NUM = 100	def __init__(self):			self._queue = PythonQueue()	def run(self):		num = 0 		while True:			res = self._queue.get()			if res is None or self.MAX_HANDLE_NUM < num:				break			data = json.loads(res)			print(data)			#业务处理			self._handle(data)			time.sleep(1)			num = num + 1	def _handle(self, data):		passif __name__=="__main__":	mconsume = PythonQueueConsume()	mconsume.run()执行结果:{'a': 1, 'b': 2}{'a': 2, 'b': 3}{'a': 3, 'b': 4}{'a': 4, 'b': 5}

5、Queue消息队列长度(测试)

import timeimport jsonfrom module_queue import PythonQueueclass PythonQueueConsume:    	def __init__(self):			self._queue = PythonQueue()	def run(self):		print(self._queue.qsize())if __name__=="__main__":	mconsume = PythonQueueConsume()	mconsume.run()

感谢大家的评论、点赞、分享、关注。。。

标签: #pythonredisqueue