龙空技术网

Python进行摄像头采集和图像客户端服务器之间传送demo

南站往南 229

前言:

此时大家对“python 读取摄像头数据”都比较着重,姐妹们都需要剖析一些“python 读取摄像头数据”的相关内容。那么小编也在网上汇集了一些对于“python 读取摄像头数据””的相关文章,希望咱们能喜欢,同学们一起来学习一下吧!

最近有朋友咨询流媒体的一个场景,大概如下,需要将30fps的摄像头图像抓取,不需要传递音频,每帧图像压缩到512*384像素,不考虑网络时延的情况下,15ms内传递到服务器端

我们知道MAC Pro M1的摄像头获取的就是1080P,30fsp,我没有windows电脑,对C++也不熟悉,就全Python来涉及一个整体流程,看看性能如何。

整体架构和流程设计

整体架构如下,客户端分采采集和发送,采集采用opencv模块, resize到目标尺寸,并压缩成jpeg。这部分处理耗时主要在压缩,我实测大概2~3ms左右,如果不压缩,在网络上直接传输,那可能会导致传输时间过长,具体压缩和传输的时间损耗,这个需要进行性能测试优化。

压缩了之后图片大概在50K左右,传输基于TCP,我自定义了一个协议,第一个文件发送请求,携带了文件名称和图像生成时间T1,这个T1是图像刚捕获的时间,另外携带文件大小信息size,这部分信息用于服务端评估文件是否接受完成。

服务端接收到请求,响应客户端可以发送了。

客户端分片按序发送到服务端,服务端接收到分片,因为是按序接收的,在内存中拼接分片,所有分片接收完成,这个时刻记为T2,服务端则发送文件接收成功消息到客户端。同时服务端将内存中文件写入磁盘,记录T1和T2时间到文件名中用于性能分析。T2-T1的时间大致就是从摄像头捕获到服务端接收到图像的时间。

文件名记录的时间大致如下

对部分数据进行了近似统计,平均值在12ms左右

cost max: 14, min: 8, avg:12

客户端收到消息,此时记录T3时间,表明图像传递完成, T3-T1则大于需求中要求的从客户端到服务端的传递时间,因为从T2到T3这段还有网络时延,但基本可以比较准确的评估整体时延了。针对T3-T1的时间,进行了分析统计,大致满足需求,平均在8.2ms左右。

WARNING:root:camera size: (1280, 720)WARNING:root:cost max: 14.4ms, avg:  8.2msWARNING:root:total:10.04075288772583, fps: 29.0

比较有意思的时按照理论上来看,T3-T1 > T2-T1,但实际情况是前者在客户端统计是8.2ms,后者在服务端统计是12ms,可能有些人不太能理解。

这个原因很简单,因为客户端和服务端ms级时间不同步,我已经在服务端建立了NTP服务,客户端去跟服务端同步,但精度无法达到ms级以下。

可优化的重点方向

参照mp4和rtsp视频流这些策略, 比较简单的思路是摄像头的两个帧之间画面比较相似,像素级可以传递差异化部分压缩,这样数据量就会少很多,但这样处理需要前后帧关联处理,在服务端需要按序每个帧都解码。当然还可以采用流推送的方式来节省流量,减少网络传输时延。

所以具体场景下需要分析到底是CPU资源受限,还是网络带宽受限,不同的资源瓶颈优化策略和重点都不一样。

客户端图像捕获和处理

我们直接使用opencv来连接摄像头,获取图像,合格比较简单如下。

class Cap:    def __init__(self):        self.connect_camera()    def connect_camera(self):        self.cap = cv2.VideoCapture(0)        self.size = (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),                      int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))    def yield_img(self):        logging.warning(f"camera size: {self.size}")        while True:            try:                ret, frame = self.cap.read()                # 读取之前获取时间错                timestamp = time.time()                yield timestamp, frame            except Exception as e:                logging.error(f"get camera error: {e}")

获取到图像,按照目标尺寸resize,然后压缩成jpeg,采用自定义的协议,基于tcp单线程实时发送。

# sender = SendUDP(server_ip="127.0.0.1", server_port=udp_recv_port)sender = SendTCP(server_ip="192.168.3.22", server_port=udp_recv_port)c = Cap()width, height = 512, 384if width != c.size[0] or height != c.size[1]:    resize = Trueelse:    resize = Falsecost_list = []start = time.time()for ts, img in c.yield_img():    if resize:        img_dst = cv2.resize(img, (width, height))    else:        img_dst = img    shape = img_dst.shape    _, content = cv2.imencode(".jpeg", img_dst)    # content = img_dst    # logging.warning(f'convert cost:{time.time() - ts}')    sender.send("test", content.tobytes(), timestamp=int(ts*1000))    # 发送到服务器    cost_list.append(time.time() - ts)    if len(cost_list) > 300:        logging.warning(f"cost max: {max(cost_list)*1000:.1f}ms, avg: {sum(cost_list)*1000/len(cost_list): .1f}ms")        cost_total = time.time() - start        logging.warning(f"total:{cost_total}, fps: {len(cost_list)//cost_total}")        start = time.time()        cost_list = []        break
客户端TCP发送

客户端发送时纯文件发送,代码如下。

    def send(self, device, content, timestamp: int, width: int=0, height: int=0) -> bool:        """        :param device:        :param content:        :param timestamp: 时间戳,单位ms        :return:        """        block_total = math.ceil(len(content)/block_size)        header = FileUdpHeader()        header.block_total = block_total        header.timestamp = timestamp        header.width = width        header.height = height        name = f"{device}.{timestamp}.jpeg"        header.name = name.encode(udp_encoding)        header.size = len(content)        # 文件发送请求        self.client_socket.send(header.encode())        rsp = self.client_socket.recv(1000)        # 服务端是否接收到文件发送请求并等待接收        ack = self.parse(rsp)        if ack != TCPRspType.start:            logging.error(f"文件发送失败:{name},关闭tcp连接")            self.connect()            return False        self.client_socket.send(content)        # 发送完成等待服务端响应        try:            rsp = self.client_socket.recv(1000)        except:            logging.error(f"文件接收响应超时:{name},关闭tcp连接")            self.connect()            return False        ack = self.parse(rsp)        if ack != TCPRspType.finish:            logging.error(f"文件发送失败:{name},关闭tcp连接")            self.connect()            return False        return True
服务端TCP接收

服务端接收可以使用Python的TCP服务端的两个socket库,本demo主要是文件接收,所以比较简单,代码如下。

import osimport shutilimport timefrom socketserver import BaseRequestHandler, ThreadingTCPServerimport loggingfrom file_define import FileUdpHeader, udp_encoding, header_size, block_size, SendRsp, TCPRspTypeimport hashlibcache_dir = "/home/data/cache"dst_dir = "/home/data/dst"os.makedirs(cache_dir, exist_ok=True)os.makedirs(dst_dir, exist_ok=True)udp_recv_port = 37090class FileRec(BaseRequestHandler):    def write(self, size: int, timestamp: int, name: str, content: bytes=b'', md5="") -> bool:        file_path = os.path.join(cache_dir, name)        with open(file_path, "ab") as f:            f.write(content)            self.file_size += len(content)        if self.file_size < size:            return False        cur_time = int(time.time() * 1000)        file_name = os.path.splitext(name)[0]        file_dst = os.path.join(dst_dir, f"{file_name}_{cur_time}.jpeg")        # 移动        shutil.move(file_path, file_dst)        cur_time = int(time.time() * 1000)        # logging.warning(f"文件{name}传输完毕@{self.peer_name},总大小约:{size},"        #                 f"文件创建时间:{timestamp}, 当前时间:{cur_time}, 时间差:{cur_time-timestamp}")        return True    def handle(self) -> None:        """        单个请求处理        :return:        """        self.peer_name = self.request.getpeername()        while True:            msg = self.request.recv(block_size)            try:                header = FileUdpHeader()                header.decode(msg)                self.file_size = 0                name = header.name.decode(udp_encoding)                self.write(header.size, header.timestamp, name)                start = SendRsp()                start.name = header.name                start.state = TCPRspType.start                self.request.send(start.encode())                # 循环接收                while True:                    try:                        # logging.warning(f'{time.time()}:start recv {name} block')                        msg = self.request.recv(block_size)                        # logging.warning(f'{time.time()}:recv success {name} block')                        if len(msg) == 0:                            return                        if self.write(header.size, header.timestamp, name, msg):                            break                    except Exception as e:                        logging.error(f"recv {name} error: {e}")                        self.request.close()                        return                finish = SendRsp()                finish.name = header.name                finish.state = TCPRspType.finish                self.request.send(finish.encode())            except Exception as e:                logging.error(f"parse msg error:{e}, reset connect")                self.request.close()                returnif __name__ == '__main__':    # 多线程    logging.warning(f"开始监听{udp_recv_port}, 缓存目录:{cache_dir}, 目标目录:{dst_dir}")    serv = ThreadingTCPServer(('', udp_recv_port), FileRec)    serv.timeout = 5    serv.serve_forever()

标签: #python 读取摄像头数据