龙空技术网

Python之socket模块使用详解(附带解决丢包、粘包问题)

牛鹭软件测试 160

前言:

此时大家对“java socket raw”大致比较讲究,大家都需要分析一些“java socket raw”的相关内容。那么小编也在网上收集了一些有关“java socket raw””的相关资讯,希望小伙伴们能喜欢,朋友们一起来学习一下吧!

一、Socket参数使用介绍

Python使用 socket 模块创建套接字,语法格式如下:

import socketsocket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None) # 默认参数

1. socket()参数

family:

socket.AF_INET - IPv4(默认)socket.AF_INET6 - IPv6socket.AF_UNIX - 只能够用于单一的Unix系统进程间通信

type:

socket.SOCK_STREAM - 流式socket, for TCP (默认)socket.SOCK_DGRAM - 数据报式socket, for UDPsocket.SOCK_RAW - 原始套接字socket.SOCK_RDM - 可靠UDP形式socket.SOCK_SEQPACKET - 可靠的连续数据包服务

2. socket对象内建方法

服务端套接字方法:

s.bind() - 绑定地址(host,port)到套接字,在AF_INET下,以元组(host,port)的形式表示地址。s.listen() - 开启TCP监听,操作系统可以挂起的最大连接数量,该值至少为1。s.accept() - 被动接受TCP客户端连接,(阻塞式)等待连接的到来。

客户端套接字方法:

s.connect() - 主动初始化TCP服务器连接,一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。s.connect_ex() - connect()函数的扩展版本,出错时返回出错码,而不是抛出异常。

公共套接字方法:

s.recv() - 接收TCP数据,数据以byte类型返回,bufsize指定要接收的最大数据量。s.send() - 发送TCP数据,将string中的数据转化为byte类型发送到连接的套接字,返回值是要发送的字节数量,该数量可能小于string的字节大小。s.sendall() - 发送完整TCP数据,将string中的数据转化为byte类型发送到连接的套接字,但在返回之前会尝试发送所有数据,成功返回None,失败则抛出异常。s.recvfrom() - 接收UDP数据,与recv()类似,但返回值是(data,address),其中data是包含接收数据的字符串,address是客户端的套接字地址。s.sendto() - 发送UDP数据,将数据发送到套接字,参数形式为(data,(address,port))的元组,address为远程服务端地址,返回值是发送的字节数。s.close() - 关闭套接字。s.getpeername() - 返回连接套接字的远程地址,返回值通常是元组(ipaddr,port)。s.getsockname() - 返回套接字自己的地址,通常是一个元组(ipaddr,port)。s.setsockopt(level,optname,value) - 设置给定套接字选项的值。s.getsockopt(level,optname[.buflen]) - 返回套接字选项的值。s.settimeout(timeout) - 设置套接字操作的超时时间,timeout是一个浮点数,单位是秒s.gettimeout() - 返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None。s.fileno() - 返回套接字的文件描述符。s.setblocking(flag) - 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。s.makefile() - 创建一个与该套接字相关连的文件。二、丢包、粘包问题解决思路和方法

问题:

Socket有一个缓冲区,缓冲区是一个流,先进先出,发送和取出都可以自定义大小,如果缓冲区的数据未取完,则可能会存在数据堆积。其中【recv(1024)】表示从缓冲区里取最大为1024个字节,但实际取值大小是不确定的,可能会导致丢包,socket发送两条连续数据时,也有可能最终会拼接成一条进行发送,所以也会导致粘包问题的产生。

解决的一些办法和思路:

在每条数据发送之间增加停顿时间,如【tiem.sleep(0.5) # 延时0.5s】每次发送后等待对方确认接收数据,确认完毕后再发送下一条(加验证),否则重传减少一次性发送和接收数据的大小,理论上buffer size越小丢包或粘包率就越低,建议在1024~10240之间

下面提供一个解决TCP recv丢包的方法:

def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):    """    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息    :param handle: socket句柄    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组    :param side: 默认server端    :param do_decode: 是否需要decode,默认True    :param do_print_info: 是否需要打印socket信息,默认True    :return:    """    while True:        if do_decode:            socket_data = handle.recv(BUFFER_SIZE).decode()        else:            socket_data = handle.recv(BUFFER_SIZE)        if do_print_info:            current_time = time.strftime('%Y-%m-%d %H:%M:%S')            if side == 'server':                print(f'Server received ==> {current_time} - {socket_data}')            else:                print(f'Client received ==> {current_time} - {socket_data}')        # 如果expected_msg为空,跳出循环        if not expected_msg:            break        if isinstance(expected_msg, (list, tuple)):            flag = False            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中                if expect in socket_data:  # 如果有任意一个存在,跳出循环                    flag = True                    break            if flag:                break        else:            if expected_msg in socket_data:                break        time.sleep(3)  # 每隔3秒接收一次socket    return socket_data

原理就是使用while循环不停地接收socket,直到指定的字符出现为止,再跳出循环,这样可以防止socket丢包,也可以保证socket接收的完整性。

三、构建Socket-TCP传输

1. 客户端配置

代码如下:

# -*- coding:utf-8 -*-import timeimport socket__author__ = 'Evan'REMOTE_IP = ('127.0.0.1', 6666)BUFFER_SIZE = 1024SOCKET_TIMEOUT_TIME = 60def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):    """    发送socket info,并根据side打印不同的前缀信息    :param handle: socket句柄    :param msg: 要发送的内容    :param side: 默认server端    :param do_encode: 是否需要encode,默认True    :param do_print_info: 是否需要打印socket信息,默认True    :return:    """    if do_encode:        handle.send(msg.encode())    else:        handle.send(msg)    if do_print_info:        current_time = time.strftime('%Y-%m-%d %H:%M:%S')        if side == 'server':            print(f'Server send --> {current_time} - {msg}')        else:            print(f'Client send --> {current_time} - {msg}')def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):    """    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息    :param handle: socket句柄    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组    :param side: 默认server端    :param do_decode: 是否需要decode,默认True    :param do_print_info: 是否需要打印socket信息,默认True    :return:    """    while True:        if do_decode:            socket_data = handle.recv(BUFFER_SIZE).decode()        else:            socket_data = handle.recv(BUFFER_SIZE)        if do_print_info:            current_time = time.strftime('%Y-%m-%d %H:%M:%S')            if side == 'server':                print(f'Server received ==> {current_time} - {socket_data}')            else:                print(f'Client received ==> {current_time} - {socket_data}')        # 如果expected_msg为空,跳出循环        if not expected_msg:            break        if isinstance(expected_msg, (list, tuple)):            flag = False            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中                if expect in socket_data:  # 如果有任意一个存在,跳出循环                    flag = True                    break            if flag:                break        else:            if expected_msg in socket_data:                break        time.sleep(3)  # 每隔3秒接收一次socket    return socket_datadef start_client_socket():    """    启动客户端TCP Socket    :return:    """    ip, port = REMOTE_IP    client = socket.socket()  # 使用TCP方式传输    print(f'开始连接服务端 {ip}:{port} ...')    client.connect((ip, port))  # 连接远程服务端    print(f'连接服务端 {ip}:{port} 成功')    client.settimeout(SOCKET_TIMEOUT_TIME)  # 设置客户端超时时间    # 与服务端握手,达成一致    send_socket_info(handle=client, side='client', msg='客户端已就绪')    receive_socket_info(handle=client, side='client', expected_msg='服务端已就绪')    # 与服务端交互    while True:        answer = input('请输入要发送给服务端的信息:')        send_socket_info(handle=client, side='client', msg=answer)        socket_data = receive_socket_info(handle=client, side='client', expected_msg='')        if 'quit' in socket_data:            send_socket_info(handle=client, side='client', msg='quit')            break    # 断开socket连接    client.close()    print(f'与服务端 {ip}:{port} 断开连接')if __name__ == '__main__':    start_client_socket()  # 启动客户端socket

2. 服务端配置(阻塞式TCP连接)

代码如下:

# -*- coding:utf-8 -*-"""阻塞式TCP连接"""import timeimport socket__author__ = 'Evan'SOCKET_IP = ('127.0.0.1', 6666)BUFFER_SIZE = 1024SOCKET_TIMEOUT_TIME = 60def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):    """    发送socket info,并根据side打印不同的前缀信息    :param handle: socket句柄    :param msg: 要发送的内容    :param side: 默认server端    :param do_encode: 是否需要encode,默认True    :param do_print_info: 是否需要打印socket信息,默认True    :return:    """    if do_encode:        handle.send(msg.encode())    else:        handle.send(msg)    if do_print_info:        current_time = time.strftime('%Y-%m-%d %H:%M:%S')        if side == 'server':            print(f'Server send --> {current_time} - {msg}')        else:            print(f'Client send --> {current_time} - {msg}')def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):    """    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息    :param handle: socket句柄    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组    :param side: 默认server端    :param do_decode: 是否需要decode,默认True    :param do_print_info: 是否需要打印socket信息,默认True    :return:    """    while True:        if do_decode:            socket_data = handle.recv(BUFFER_SIZE).decode()        else:            socket_data = handle.recv(BUFFER_SIZE)        if do_print_info:            current_time = time.strftime('%Y-%m-%d %H:%M:%S')            if side == 'server':                print(f'Server received ==> {current_time} - {socket_data}')            else:                print(f'Client received ==> {current_time} - {socket_data}')        # 如果expected_msg为空,跳出循环        if not expected_msg:            break        if isinstance(expected_msg, (list, tuple)):            flag = False            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中                if expect in socket_data:  # 如果有任意一个存在,跳出循环                    flag = True                    break            if flag:                break        else:            if expected_msg in socket_data:                break        time.sleep(3)  # 每隔3秒接收一次socket    return socket_datadef start_server_socket():    """    启动服务端TCP Socket    :return:    """    ip, port = SOCKET_IP    server = socket.socket()  # 使用TCP方式传输    server.bind((ip, port))  # 绑定IP与端口    server.listen(5)  # 设置最大连接数为5    print(f'服务端 {ip}:{port} 开启')    # 不断循环,接受客户端请求    while True:        print('等待客户端连接...')        conn, address = server.accept()  # 使用accept阻塞式等待客户端请求,如果多个客户端同时访问,排队一个一个进        print(f'当前连接客户端:{address}')        conn.settimeout(SOCKET_TIMEOUT_TIME)  # 设置服务端超时时间        # 与客户端握手,达成一致        receive_socket_info(handle=conn, expected_msg='客户端已就绪')        send_socket_info(handle=conn, msg='服务端已就绪')        # 不断接收客户端发来的消息        while True:            socket_data = receive_socket_info(handle=conn, expected_msg='')            if 'quit' in socket_data:                send_socket_info(handle=conn, msg='quit')                break            answer = input('请回复客户端的信息:')            send_socket_info(handle=conn, msg=answer)        # 断开socket连接        conn.close()        print(f'与客户端 {ip}:{port} 断开连接')if __name__ == '__main__':    start_server_socket()  # 启动服务端socket

3. 服务端配置(非阻塞式TCP连接)

代码如下:

# -*- coding:utf-8 -*-"""非阻塞式TCP连接"""import timeimport socketserver__author__ = 'Evan'SOCKET_IP = ('127.0.0.1', 6666)BUFFER_SIZE = 1024SOCKET_TIMEOUT_TIME = 60class UnblockSocketServer(socketserver.BaseRequestHandler):    # 继承socketserver.BaseRequestHandler类    # 首先执行setup方法,然后执行handle方法,最后执行finish方法    # 如果handle方法报错,则会跳过    # setup与finish无论如何都会执行    # 一般只定义handle方法即可    def setup(self):        print('开启非阻塞式连接...')    @staticmethod    def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):        """        发送socket info,并根据side打印不同的前缀信息        :param handle: socket句柄        :param msg: 要发送的内容        :param side: 默认server端        :param do_encode: 是否需要encode,默认True        :param do_print_info: 是否需要打印socket信息,默认True        :return:        """        if do_encode:            handle.send(msg.encode())        else:            handle.send(msg)        if do_print_info:            current_time = time.strftime('%Y-%m-%d %H:%M:%S')            if side == 'server':                print(f'Server send --> {current_time} - {msg}')            else:                print(f'Client send --> {current_time} - {msg}')    @staticmethod    def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):        """        循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息        :param handle: socket句柄        :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组        :param side: 默认server端        :param do_decode: 是否需要decode,默认True        :param do_print_info: 是否需要打印socket信息,默认True        :return:        """        while True:            if do_decode:                socket_data = handle.recv(BUFFER_SIZE).decode()            else:                socket_data = handle.recv(BUFFER_SIZE)            if do_print_info:                current_time = time.strftime('%Y-%m-%d %H:%M:%S')                if side == 'server':                    print(f'Server received ==> {current_time} - {socket_data}')                else:                    print(f'Client received ==> {current_time} - {socket_data}')            # 如果expected_msg为空,跳出循环            if not expected_msg:                break            if isinstance(expected_msg, (list, tuple)):                flag = False                for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中                    if expect in socket_data:  # 如果有任意一个存在,跳出循环                        flag = True                        break                if flag:                    break            else:                if expected_msg in socket_data:                    break            time.sleep(3)  # 每隔3秒接收一次socket        return socket_data    def handle(self):        """        所有和客户端交互的操作写在这里        :return:        """        conn = self.request  # 获取socket句柄        # 与客户端握手,达成一致        self.receive_socket_info(handle=conn, expected_msg='客户端已就绪')        self.send_socket_info(handle=conn, msg='服务端已就绪')        # 不断接收客户端发来的消息        while True:            socket_data = self.receive_socket_info(handle=conn, expected_msg='')            if 'quit' in socket_data:                self.send_socket_info(handle=conn, msg='quit')                break            answer = input('请回复客户端的信息:')            self.send_socket_info(handle=conn, msg=answer)        # 断开socket连接        conn.close()    def finish(self):        print('连接关闭')def main():    # 创建多线程实例    server = socketserver.ThreadingTCPServer(SOCKET_IP, UnblockSocketServer)    # 开启异步多线程,等待连接    server.timeout = SOCKET_TIMEOUT_TIME  # 设置服务端超时时间    print(f'服务端 {SOCKET_IP[0]}:{SOCKET_IP[1]} 开启')    server.serve_forever()  # 永久运行if __name__ == '__main__':    main()

4. 交互过程demo

客户端:

服务端:

四、构建Socket-UDP传输

1. 客户端配置

代码如下:

# -*- coding:utf-8 -*-import socket__author__ = 'Evan'REMOTE_IP = ('127.0.0.1', 6666)BUFFER_SIZE = 1024SOCKET_TIMEOUT_TIME = 60def start_client_socket():    """    启动客户端UDP Socket    :return:    """    ip, port = REMOTE_IP    client = socket.socket(type=socket.SOCK_DGRAM)  # 使用TCP方式传输    print(f'开始连接服务端 {ip}:{port} ...')    client.connect((ip, port))  # 连接远程服务端    print(f'连接服务端 {ip}:{port} 成功')    # 与服务端交互    while True:        answer = input('请输入要发送给服务端的信息:')        client.sendto(answer.encode(), REMOTE_IP)  # 使用sendto发送UDP消息,address填入服务端IP和端口        if 'quit' in answer:            break    # 断开socket连接    client.close()    print(f'与服务端 {ip}:{port} 断开连接')if __name__ == '__main__':    start_client_socket()  # 启动客户端socket

2. 服务端配置

代码如下:

# -*- coding:utf-8 -*-import socket__author__ = 'Evan'SOCKET_IP = ('127.0.0.1', 6666)BUFFER_SIZE = 1024def start_server_socket():    """    启动服务端UDP Socket    :return:    """    ip, port = SOCKET_IP    server = socket.socket(type=socket.SOCK_DGRAM)  # 使用UDP方式传输    server.bind((ip, port))  # 绑定IP与端口    print(f'服务端 {ip}:{port} 开启')    # 不断循环,接受客户端发来的消息    while True:        socket_data, address = server.recvfrom(BUFFER_SIZE)        print('收到客户端 -> {} 发来的消息: {}'.format(address, socket_data.decode()))if __name__ == '__main__':    start_server_socket()

3. 交互过程demo

客户端:

服务端:

标签: #java socket raw