前言:
现时你们对“python ssl socket”大概比较重视,大家都需要了解一些“python ssl socket”的相关知识。那么小编在网络上收集了一些关于“python ssl socket””的相关知识,希望各位老铁们能喜欢,小伙伴们快快来学习一下吧!加密通讯的必要性
我们在使用http或tcp协议传输数据时,都是明文的,用抓包工具截获数据后,很容易就能看到数据内容。你的用户名、密码和各种数据都一目了然。当你不希望别人看到你的数据时,可以使用加密通讯。这样你的数据在传输过程中就不会被偷窥或篡改了,即使用网络抓包工具截获到你的数据也很难被破解。
加密通讯的种类
加密通讯通常有两种方式,一种是对数据进行加密,只把关键的数据通过某种加密算法加密,对方拿到加密数据后通过协商好的算法进行解密。另一种是对传输通道进行加密,无论任何数据在传输过程中都是被保护的。有些网站是https协议的,这就是基于ssl的http通讯,比如一些证券、银行的网站都是用的这种通讯方式。那么是不是选择通道加密就是最好的呢?也不是,因为通道加密相对于部分数据加密,耗费的资源更多,传输性能会有一定的损耗。因此,也是需要根据业务场景来选择的。需要加密的数据很多,就使用通道加密。否则可以使用第一种方式,只对敏感数据进行部分加密即可。
SSL Socket通讯
我们今天主要是要讲一下SSL Socket是加密套接字连接,它就是通道加密的一种传输方式。客户端与服务端建立加密通道后,传输过程中任何数据都是被保护的。我们知道http或https都是建立于tcp socket之上的,因此ssl socket比https的扩展性或传输性能更好。
下载并安装openssl
我们需要先下载并安装openssl,然后生成证书。可以在openssl官网下载openssl,如果是windows系统,也可以在Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions下载。
生成根证书文件
生成根证书私钥
openssl genrsa -out root.key 2048
生成根证书请求文件
openssl req -new -key root.key -out root.csr
生成根证书crt文件
openssl x509 -days 3650 -req -CAcreateserial -signkey root.key -in root.csr -out root.crt
生成服务器证书文件
生成服务器私钥
openssl genrsa -out server.key 2048
生成服务器请求文件
openssl req -new -key server.key -out server.csr
生成服务器证书crt文件
openssl x509 -days 3650 -req -CA root.crt -CAkey root.key -CAcreateserial -in server.csr -out server.crt
生产客户端证书文件
生成客户端私钥
openssl genrsa -out client.key 2048
生成client端csr文件,为生成客户端证书做准备
openssl req -new -key client.key -out client.csr
生成客户端证书crt文件
openssl x509 -days 3650 -req -CA root.crt -CAkey root.key -CAcreateserial -in client.csr -out client.crt
服务端代码:
import sslimport timeimport socketimport tracebackimport Messageimport threadingfrom struct import unpack# 字节转字符串def bytes2str(bytes): return bytes.decode('utf-8')class Server: def __init__(self): # 服务器ip与端口 self.server_address = ('127.0.0.1', 9089) self.ssl_server_crt = "cert/server.crt" # 服务端证书 self.ssl_server_key = "cert/server.key" # 服务端私钥 self.ssl_client_crt = "cert/client.crt" # 信任客户端证书 def start(self): # 生成SSL上下文 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.verify_mode = ssl.CERT_REQUIRED # 加载服务端证书和私钥 context.load_cert_chain(self.ssl_server_crt, self.ssl_server_key) # 加载信任客户端证书 context.load_verify_locations(self.ssl_client_crt) # 监听端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock: sock.bind(self.server_address) sock.listen(1024) print(f'Server started on {self.server_address}') with context.wrap_socket(sock, server_side=True, ) as ssock: while True: try: client_socket, addr = ssock.accept() handler = RequestHandler(client_socket, addr) thread = threading.Thread(target=handler.run, args=()) thread.start() except Exception as ex: print(ex)class RequestHandler: def __init__(self, sock, client_address): self.sock = sock self.client_address = client_address def run(self): try: while True: struct_bytes = self.sock.recv(4) # 读取数据长度(4字节) data_size, = unpack('!I', struct_bytes) # 解struct包,转换为数据长度 data = b'' # 已接收的数据 recv_size = 0 # 接收数据大小 buff_size = 8192 # 接收缓冲区大小 while recv_size < data_size: remain_size = data_size - recv_size # 计算剩余数据长度 if remain_size < buff_size: # 如果剩余数据长度小于缓冲区长度时,设置缓冲区长度为剩余数据长度 buff_size = remain_size recv_data = self.sock.recv(buff_size) # 接收数据 data += recv_data # 数据累加 recv_size = len(data) # 计算已接收的数据长度 if recv_size == data_size: # 当数据接收完成时,对数据进行解析和处理 print(f'recv data from {self.client_address}, data_size:', len(data)) cliid, msgid, = unpack('!2b', data[:2]) # 读取消息ID,根据消息ID区分处理 print(f'cliid:{cliid} msgid:{msgid}') match msgid: case Message.MSG_ID_REGISTER: appid = bytes2str(data[2:38]) imei = bytes2str(data[38:53]) print(f'appid: {appid}, imei: {imei}') error_code = 0 # 错误码 0正常 -1错误 response = Message.pack('!I2b', 2, msgid, error_code) self.sock.sendall(response) case Message.MSG_ID_HEARTBEAT: error_code = 0 # 错误码 0正常 -1错误 response = Message.pack('!I2b', 2, msgid, error_code) self.sock.sendall(response) case Message.MSG_ID_DATETIME: strtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) str_len = len(strtime) error_code = 0 # 错误码 0正常 -1错误 response = Message.pack(f'!I2b{str_len}s', 2 + str_len, msgid, error_code, strtime) self.sock.sendall(response) except Exception as ex: print(f"{ex} {self.client_address}") traceback.print_exc()if __name__ == '__main__': Server().start()
客户端代码:
import socketimport sslimport structimport threadingimport timefrom multiprocessing import JoinableQueueimport Messageclass Client: def __init__(self, host, port): self.sock = None self.ssock = None self.writer = None self.reader = None self.heartbeat_thread = None self.ssl_client_crt = "cert/client.crt" # 客户端证书 self.ssl_client_key = "cert/client.key" # 客户端私钥 self.ssl_server_crt = "cert/server.crt" # 信任服务端证书 self.server_address = (host, port) def connect(self): try: print(f'Connecting to server{self.server_address}.') # 生成SSL上下文 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) # 加载客户端证书和私钥 context.load_cert_chain(self.ssl_client_crt, self.ssl_client_key) # 加载信任服务端证书 context.load_verify_locations(self.ssl_server_crt) # 这里将check_hostname,否则本地验证的时候域名会校验不过 context.check_hostname = False # 与服务端建立socket连接 self.sock = socket.create_connection(self.server_address) self.ssock = context.wrap_socket(self.sock) print(self.ssock.version()) # 创建并启动网络数据读取线程 self.reader = ReaderThread(self) self.reader.start() # 创建并启动网络数据写入线程 self.writer = WriterThread(self) self.writer.start() # 启动心跳线程 if not self.heartbeat_thread: self.heartbeat_thread = HeartbeatThread(self) self.heartbeat_thread.start() print('Connected.') except Exception as e: print(f'Connection refused. {e}') def disconnect(self): print('Disconnected from server.') if self.reader: self.reader.dispose() self.reader = None if self.writer: self.writer.dispose() self.writer = None self.sock.close() # 注册连接 def register(self): appid = '996da38d-a7be-4947-aa4c-f8208b5f4ade' imei = '869858030720693' self.writer.send(Message.msg_register(appid, imei)) # 请求日期时间 def request_datetime(self): self.writer.send(Message.msg_datetime())# 心跳线程class HeartbeatThread(threading.Thread): def __init__(self, cli): threading.Thread.__init__(self) self.cli = cli def run(self) -> None: while True: if self.cli.writer: self.cli.writer.send(Message.msg_heartbeat()) else: self.cli.connect() time.sleep(3)# 数据读取线程class ReaderThread(threading.Thread): def __init__(self, cli): threading.Thread.__init__(self) self.cli = cli self.interrupt = False def run(self) -> None: while not self.interrupt: try: struct_bytes = self.cli.ssock.recv(4) # 数据总长度 data_size, = struct.unpack('!I', struct_bytes) # 解struct包 data = b'' # 已接收的数据 recv_size = 0 # 接收数据大小 buff_size = 8192 # 接收缓冲区大小 while recv_size < data_size: remain_size = data_size - recv_size if remain_size < buff_size: buff_size = remain_size recv_data = self.cli.ssock.recv(buff_size) data += recv_data recv_size = len(data) if recv_size == data_size: print(f'recv data: {data}, size:{len(data)}') msgid, = struct.unpack('!b', data[:1]) error_code, = struct.unpack('!b', data[1:2]) print(f'msgid:{msgid}, error_code:{error_code}') match msgid: case Message.MSG_ID_DATETIME: date_time = data[2:].decode('utf-8') print(date_time) except Exception as e: print(e) self.cli.disconnect() break def dispose(self): self.interrupt = True# 数据写入线程class WriterThread(threading.Thread): def __init__(self, cli): threading.Thread.__init__(self) self.cli = cli self.interrupt = False self.queue = JoinableQueue(1024) def run(self) -> None: while not self.interrupt: try: data = self.queue.get() self.cli.ssock.sendall(data) except Exception as e: print(e) self.cli.disconnect() break # 数据长度(4字节,不包括本身) + 数据(不定长), 大端网络字节序 def send(self, data): if self.queue: print(f'send data: {data}, size:{len(data)}') data_size = struct.pack('!I', len(data)) self.queue.put(data_size + data) def dispose(self): self.interrupt = True self.queue = Noneif __name__ == '__main__': client = Client('127.0.0.1', 9089) client.connect() client.register() client.request_datetime()
消息定义代码:
import struct# 客户端IDCLIENT_ID = 0x00# 注册消息IDMSG_ID_REGISTER = 0x00# 心跳消息IDMSG_ID_HEARTBEAT = 0x01# 时间消息IDMSG_ID_DATETIME = 0x02def pack(fmt, *args): args_list = list(args) for i, arg in enumerate(args_list): if isinstance(arg, str): args_list[i] = arg.encode('utf-8') return struct.pack(fmt, *args_list)# 组装注册消息def msg_register(appid, imei): return pack('!2b36s15s', CLIENT_ID, MSG_ID_REGISTER, appid, imei)# 组装心跳消息def msg_heartbeat(): return pack('!2b', CLIENT_ID, MSG_ID_HEARTBEAT)# 组装时间消息def msg_datetime(): return pack('!2b', CLIENT_ID, MSG_ID_DATETIME)