前言:
此时大家对“python 读取摄像头数据”都比较着重,姐妹们都需要剖析一些“python 读取摄像头数据”的相关内容。那么小编也在网上汇集了一些对于“python 读取摄像头数据””的相关文章,希望咱们能喜欢,同学们一起来学习一下吧!最近有朋友咨询流媒体的一个场景,大概如下,需要将30fps的摄像头图像抓取,不需要传递音频,每帧图像压缩到512*384像素,不考虑网络时延的情况下,15ms内传递到服务器端
我们知道MAC Pro M1的摄像头获取的就是1080P,30fsp,我没有windows电脑,对C++也不熟悉,就全Python来涉及一个整体流程,看看性能如何。
整体架构和流程设计
整体架构如下,客户端分采采集和发送,采集采用opencv模块, resize到目标尺寸,并压缩成jpeg。这部分处理耗时主要在压缩,我实测大概2~3ms左右,如果不压缩,在网络上直接传输,那可能会导致传输时间过长,具体压缩和传输的时间损耗,这个需要进行性能测试优化。
压缩了之后图片大概在50K左右,传输基于TCP,我自定义了一个协议,第一个文件发送请求,携带了文件名称和图像生成时间T1,这个T1是图像刚捕获的时间,另外携带文件大小信息size,这部分信息用于服务端评估文件是否接受完成。
服务端接收到请求,响应客户端可以发送了。
客户端分片按序发送到服务端,服务端接收到分片,因为是按序接收的,在内存中拼接分片,所有分片接收完成,这个时刻记为T2,服务端则发送文件接收成功消息到客户端。同时服务端将内存中文件写入磁盘,记录T1和T2时间到文件名中用于性能分析。T2-T1的时间大致就是从摄像头捕获到服务端接收到图像的时间。
文件名记录的时间大致如下
对部分数据进行了近似统计,平均值在12ms左右
cost max: 14, min: 8, avg:12
客户端收到消息,此时记录T3时间,表明图像传递完成, T3-T1则大于需求中要求的从客户端到服务端的传递时间,因为从T2到T3这段还有网络时延,但基本可以比较准确的评估整体时延了。针对T3-T1的时间,进行了分析统计,大致满足需求,平均在8.2ms左右。
WARNING:root:camera size: (1280, 720)WARNING:root:cost max: 14.4ms, avg: 8.2msWARNING:root:total:10.04075288772583, fps: 29.0
比较有意思的时按照理论上来看,T3-T1 > T2-T1,但实际情况是前者在客户端统计是8.2ms,后者在服务端统计是12ms,可能有些人不太能理解。
这个原因很简单,因为客户端和服务端ms级时间不同步,我已经在服务端建立了NTP服务,客户端去跟服务端同步,但精度无法达到ms级以下。
可优化的重点方向
参照mp4和rtsp视频流这些策略, 比较简单的思路是摄像头的两个帧之间画面比较相似,像素级可以传递差异化部分压缩,这样数据量就会少很多,但这样处理需要前后帧关联处理,在服务端需要按序每个帧都解码。当然还可以采用流推送的方式来节省流量,减少网络传输时延。
所以具体场景下需要分析到底是CPU资源受限,还是网络带宽受限,不同的资源瓶颈优化策略和重点都不一样。
客户端图像捕获和处理
我们直接使用opencv来连接摄像头,获取图像,合格比较简单如下。
class Cap: def __init__(self): self.connect_camera() def connect_camera(self): self.cap = cv2.VideoCapture(0) self.size = (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) self.fps = int(self.cap.get(cv2.CAP_PROP_FPS)) def yield_img(self): logging.warning(f"camera size: {self.size}") while True: try: ret, frame = self.cap.read() # 读取之前获取时间错 timestamp = time.time() yield timestamp, frame except Exception as e: logging.error(f"get camera error: {e}")
获取到图像,按照目标尺寸resize,然后压缩成jpeg,采用自定义的协议,基于tcp单线程实时发送。
# sender = SendUDP(server_ip="127.0.0.1", server_port=udp_recv_port)sender = SendTCP(server_ip="192.168.3.22", server_port=udp_recv_port)c = Cap()width, height = 512, 384if width != c.size[0] or height != c.size[1]: resize = Trueelse: resize = Falsecost_list = []start = time.time()for ts, img in c.yield_img(): if resize: img_dst = cv2.resize(img, (width, height)) else: img_dst = img shape = img_dst.shape _, content = cv2.imencode(".jpeg", img_dst) # content = img_dst # logging.warning(f'convert cost:{time.time() - ts}') sender.send("test", content.tobytes(), timestamp=int(ts*1000)) # 发送到服务器 cost_list.append(time.time() - ts) if len(cost_list) > 300: logging.warning(f"cost max: {max(cost_list)*1000:.1f}ms, avg: {sum(cost_list)*1000/len(cost_list): .1f}ms") cost_total = time.time() - start logging.warning(f"total:{cost_total}, fps: {len(cost_list)//cost_total}") start = time.time() cost_list = [] break客户端TCP发送
客户端发送时纯文件发送,代码如下。
def send(self, device, content, timestamp: int, width: int=0, height: int=0) -> bool: """ :param device: :param content: :param timestamp: 时间戳,单位ms :return: """ block_total = math.ceil(len(content)/block_size) header = FileUdpHeader() header.block_total = block_total header.timestamp = timestamp header.width = width header.height = height name = f"{device}.{timestamp}.jpeg" header.name = name.encode(udp_encoding) header.size = len(content) # 文件发送请求 self.client_socket.send(header.encode()) rsp = self.client_socket.recv(1000) # 服务端是否接收到文件发送请求并等待接收 ack = self.parse(rsp) if ack != TCPRspType.start: logging.error(f"文件发送失败:{name},关闭tcp连接") self.connect() return False self.client_socket.send(content) # 发送完成等待服务端响应 try: rsp = self.client_socket.recv(1000) except: logging.error(f"文件接收响应超时:{name},关闭tcp连接") self.connect() return False ack = self.parse(rsp) if ack != TCPRspType.finish: logging.error(f"文件发送失败:{name},关闭tcp连接") self.connect() return False return True服务端TCP接收
服务端接收可以使用Python的TCP服务端的两个socket库,本demo主要是文件接收,所以比较简单,代码如下。
import osimport shutilimport timefrom socketserver import BaseRequestHandler, ThreadingTCPServerimport loggingfrom file_define import FileUdpHeader, udp_encoding, header_size, block_size, SendRsp, TCPRspTypeimport hashlibcache_dir = "/home/data/cache"dst_dir = "/home/data/dst"os.makedirs(cache_dir, exist_ok=True)os.makedirs(dst_dir, exist_ok=True)udp_recv_port = 37090class FileRec(BaseRequestHandler): def write(self, size: int, timestamp: int, name: str, content: bytes=b'', md5="") -> bool: file_path = os.path.join(cache_dir, name) with open(file_path, "ab") as f: f.write(content) self.file_size += len(content) if self.file_size < size: return False cur_time = int(time.time() * 1000) file_name = os.path.splitext(name)[0] file_dst = os.path.join(dst_dir, f"{file_name}_{cur_time}.jpeg") # 移动 shutil.move(file_path, file_dst) cur_time = int(time.time() * 1000) # logging.warning(f"文件{name}传输完毕@{self.peer_name},总大小约:{size}," # f"文件创建时间:{timestamp}, 当前时间:{cur_time}, 时间差:{cur_time-timestamp}") return True def handle(self) -> None: """ 单个请求处理 :return: """ self.peer_name = self.request.getpeername() while True: msg = self.request.recv(block_size) try: header = FileUdpHeader() header.decode(msg) self.file_size = 0 name = header.name.decode(udp_encoding) self.write(header.size, header.timestamp, name) start = SendRsp() start.name = header.name start.state = TCPRspType.start self.request.send(start.encode()) # 循环接收 while True: try: # logging.warning(f'{time.time()}:start recv {name} block') msg = self.request.recv(block_size) # logging.warning(f'{time.time()}:recv success {name} block') if len(msg) == 0: return if self.write(header.size, header.timestamp, name, msg): break except Exception as e: logging.error(f"recv {name} error: {e}") self.request.close() return finish = SendRsp() finish.name = header.name finish.state = TCPRspType.finish self.request.send(finish.encode()) except Exception as e: logging.error(f"parse msg error:{e}, reset connect") self.request.close() returnif __name__ == '__main__': # 多线程 logging.warning(f"开始监听{udp_recv_port}, 缓存目录:{cache_dir}, 目标目录:{dst_dir}") serv = ThreadingTCPServer(('', udp_recv_port), FileRec) serv.timeout = 5 serv.serve_forever()
标签: #python 读取摄像头数据