龙空技术网

websocket实战

liangcao 2652

前言:

此刻你们对“c语言websocket客户端”都比较关切,小伙伴们都想要了解一些“c语言websocket客户端”的相关内容。那么小编也在网络上收集了一些对于“c语言websocket客户端””的相关知识,希望咱们能喜欢,兄弟们一起来了解一下吧!

文中使用到的小工具:websocket调试工具

实战角度讲解 websocket 在企业中的使用

需求说明任务管理平台,每个任务有唯一执行 ID (performId),要实时将任务最新状态反馈给用户用户在平台观察任务执行情况一个用户可以开多个页面观察不同的任务执行一个任务可以同时被多名用户关注

源码地址

工程说明

bean: 租户信息constant: 事件枚举controller: 模拟了一个事件广播interceptors: websocket 握手拦截器,完成认证、参数设定等功能utils:两个小工具类ws:websocket 相关处理

一、启动类

@EnableWebSocket // 开启 websocket 支持@EnableScheduling // 开启定时,方便后续资源回收@SpringBootApplicationpublic class WebsocketApplication {    public static void main(String[] args) {        SpringApplication.run(WebsocketApplication.class, args);    }    @Bean    public WebSocketConfigurer webSocketConfigurer(TaskWsHandler handler,                                                   WsHandShakeInterceptor interceptor) {        return registry -> registry                /**                 * ws/history?performId=xxx                 * 通过后面的自定义参数,完成 session 订阅                 */                .addHandler(handler, "/ws/history") //注册地址和具体处理器                .addInterceptors(interceptor) // 配置拦截器                .setAllowedOrigins("*"); //开启跨域支持    }}

二、拦截器

这里本想通过 response 直接返回消息给前端,终止协议升级,但是通过 debug 发现最终执行这里

org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter#handle

@Override

@Nullable

public ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception

{

((HttpRequestHandler) handler).handleRequest(request, response);

return null;

}

没有办法直接返回,所以转换了一种思想,通过放入 attributes 中 error,后面判断如果有值则关闭连接(但此时已经造成资源浪费了)

如果哪位大佬,有其他方案,欢迎交流

主要是通过 http 请求,拿到用户认证信息。操作 websocketSession 级别的参数 attributes

return true 完成认证,协议转换 http -> websocket

return false 终止协议转换,前端无法建立 websocket 连接

 @Override    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {        if (request instanceof ServletServerHttpRequest) {            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();            String performId = servletRequest.getParameter("performId");            Tenant tenant = getTenant(servletRequest); // 通过 http_header 拿到用户信息,封装成租户            if (StringUtils.isAnyBlank(performId, tenant.getAccountId(), tenant.getUserId())) {            // 这里无法直接返回消息给前端,中转了一下,后面再返回                attributes.put(ERR_KEY, "URl param 'performId' or Header value 'accountId/userId' missing");            } else {                // concurrentHashMap key and value can not be 'null'                attributes.put(SESSION_KEY, performId);            }            String uniqueKey = UUIDUtil.build();            tenants.put(uniqueKey, tenant);            attributes.put(KEY, uniqueKey);            return true;        }        return false;    }

三、接收器

public class TaskWsHandler extends TextWebSocketHandler {    private static final Logger log = LoggerFactory.getLogger(TaskWsHandler.class);    private static final String PING = "ping";    private static final String PONG = "pong";    private AbstractMessageHandler handler;    /**     * performId: [sessionId1, sessionId2]     */    // 保存观察 performId 的 webSocketSession    private static final ConcurrentHashMap<String, Set<String>> HID_SID = new ConcurrentHashMap<>();    /**     * performId: YES     */    // 正在创建观察的 performId 得连接信息,防止被清理调    private static final ConcurrentHashMap<String, Byte> ON_CREATING = new ConcurrentHashMap<>();    private static final Byte PLACEHOLDER = 1;    /**     * 使用 guava 的,带有失效时间的缓存     * 失效后有通知,用户可以做资释放等操作     */    private static final LoadingCache<String, WebSocketSession> CACHES = CacheBuilder.newBuilder()            .expireAfterAccess(59, TimeUnit.SECONDS)            .initialCapacity(128)            .removalListener((notification -> {                // 移除通知                WebSocketSession session = (WebSocketSession) notification.getValue();                if (session.isOpen()) {                    clean(session);                    try {                        session.close(CloseStatus.SERVICE_OVERLOAD);                    } catch (IOException e) {                        log.error("close websocket error: {}", ExceptionUtils.getStackTrace(e));                    }                }            }))            .build(new CacheLoader<String, WebSocketSession>() {                // 拿不到资源时,初始化逻辑                       @Override                       public WebSocketSession load(String key) throws Exception {                           return null;                       }                   }            );    @Override    public void afterConnectionEstablished(WebSocketSession session) throws Exception {        String err = (String)session.getAttributes().get(WsHandShakeInterceptor.ERR_KEY);        if (null!=err) {            // 通过 interceptor 后,完成连接建立,如果存在错误,则关闭连接并给前端返回错误            session.sendMessage(new TextMessage(err));            session.close(CloseStatus.NOT_ACCEPTABLE.withReason(err));            return;        }        /**         * 放入缓存         * 后续通过 sessionId 拿到连接,才能发送消息         */        CACHES.put(session.getId(), session);        // 拿到 performId        String key = getPerformId(session);        /**         * 代表当前有"用户"关心",{@link #clean(WebSocketSession)} 仅移除 session, 否则移除整个集合          */        ON_CREATING.put(key, PLACEHOLDER);        // 如果不存在,则代表是首位"关心"者,初始化集合        HID_SID.putIfAbsent(key , new HashSet<>());        HID_SID.get(key).add(session.getId());        // 初始化待发送队列        WAIT_SENDING.put(session.getId(), new ConcurrentLinkedDeque<>());        // 创建完毕,移除"正在进行"操作        ON_CREATING.remove(key);    }    /**     * 处理接收到的消息     * @param session     * @param message     * @throws Exception     */    @Override    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {        log.debug("ws msg: {}", JsonUtil.toJSONString(message));        String msgBody = message.getPayload();        if (PING.equalsIgnoreCase(msgBody)) {            // 如果是 PING, 则响应 PONG            send(CACHES.getIfPresent(session.getId()), PONG);            return;        }        try {            Map<String, Object> msg = JsonUtil.parseObjectThrow(msgBody);            // 责任链处理            handler.handle(msg, getSessionFromCache(session));        } catch (Exception e) {            log.error("handle message error:{}", ExceptionUtils.getStackTrace(e));        }    }    @Override    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {        super.handleTransportError(session, exception);    }    @Override    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {        // 连接关闭,释放资源        clean(session);    }    private static String getPerformId(WebSocketSession session) {        return session.getAttributes().get(WsHandShakeInterceptor.SESSION_KEY).toString();    }    public WebSocketSession getSessionFromCache(WebSocketSession session) {        WebSocketSession cs = CACHES.getIfPresent(session.getId());        if (null==cs) {            log.error("get session from cache failed,sessionId: {}, and recache", session.getId());            CACHES.put(session.getId(), session);            return session;        }        return cs;    }    /**     * clean cache     * @param session     */    private static void clean(WebSocketSession session) {        String key = getPerformId(session);        WsHandShakeInterceptor.removeTenant(session);        Set<String> sids = HID_SID.getOrDefault(key, new HashSet<>());        sids.remove(session.getId());        if (!ON_CREATING.contains(key) && sids.size()==0) {            HID_SID.remove(key);        }        WAIT_SENDING.remove(session.getId());//        CACHES.invalidate(getSessionKey(session));    }    public Set<WebSocketSession> getAllSessionByPerformId(String performId) {        Set<String> sids = HID_SID.get(performId);        HashSet<WebSocketSession> sessions = new HashSet<>();        for (String sid : sids) {            sessions.add(CACHES.getIfPresent(sid));        }        return sessions;    }    /**     * 处理广播消息     * @param msg     * @param performId     */    public void handlerAll(Map<String, Object> msg, String performId) {        if (StringUtils.isBlank(performId)) {            return;        }        // 得到所有"关注者"信息        Set<String> sids = HID_SID.getOrDefault(performId, Collections.emptySet());        log.debug("listeners:{}, for performId: {}", JsonUtil.toJSONString(sids), performId);        for (String sid : sids) {            /**             * 这里由于消息一致,其实不用多次处理             * 向所有"关注者"直接发送即可             * 我懒了             */            handler.handle(msg, CACHES.getIfPresent(sid));        }    }    private static final ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> WAIT_SENDING = new ConcurrentHashMap<>();    private static final ConcurrentHashMap<String, Byte> ON_SENDING = new ConcurrentHashMap<>();    /**     * 消息发送     * 这里不会真的发送,而是将消息存入该 session 的待发送队列     * 而后由 {@link #doSend(WebSocketSession)} 真正发送     * @param session     * @param msg     */    public static void send(WebSocketSession session, String msg) {        try {            if (null==session || !session.isOpen()) {                log.error("session status is error: {}, msg: {} drop", JsonUtil.toJSONString(session), msg);                return;            }            if (PONG.equals(msg)) {                // 如果是 PONG 放入头节点,首先发送                WAIT_SENDING.get(session.getId()).addFirst(msg);            } else {                WAIT_SENDING.get(session.getId()).offer(msg);            }            log.debug("will send msg: {} ,to session: {}", msg, session.getId());            doSend(session);        } catch (Exception e) {            log.error("send msg error: {}", ExceptionUtils.getStackTrace(e));        }    }    private static void doSend(WebSocketSession session) {        if (null==ON_SENDING.putIfAbsent(session.getId(), PLACEHOLDER)) {            // 代表没有正在发送的线程            if (session.isOpen()) {                String s;                while ((s = WAIT_SENDING.get(session.getId()).poll()) != null) {                    // 队列循环拉取,一直到空                    try {                        session.sendMessage(new TextMessage(s));                    } catch (Exception e) {                        // maybe current thread waiting blocking io, terminal this time send                        // 如果失败,则放入队列头,等待下次发送(下次消息到达或心跳发送都会触发)                        WAIT_SENDING.get(session.getId()).addFirst(s);                        log.error("terminal send,and wait next time. msg:{}, error: {}", s, ExceptionUtils.getStackTrace(e));                        break;                    }                }            } else {                log.error("session: {} has been closed", session.getId());            }            // 发送完成,移除标识            ON_SENDING.remove(session.getId());        }    }    /**     * 添加处理器     * @param handler     */    public void addHandler(AbstractMessageHandler handler) {        synchronized (this) {            if (null==this.handler) {                this.handler = handler;            } else {                this.handler = this.handler.addHeadChain(handler);            }        }    }    /**     * auto clean resource     * every 1s * ?     */    @Scheduled(fixedRate = 1000 * 100)    private void autoClean() {        // 自动清理        CACHES.cleanUp();    }}

四、 处理器

public abstract class AbstractMessageHandler<T> {    private static final Logger log = LoggerFactory.getLogger(AbstractMessageHandler.class);    @Autowired    TaskWsHandler historyTaskWsHandler;    private static final String EVENT = "event";    protected AbstractMessageHandler nextHandler;    protected WsEventEnum event;    protected boolean preHandle(String type) {        return false;    }    protected abstract void doHandle(Map<String, Object> msg, WebSocketSession session);    public void handle(Map<String, Object> msg, WebSocketSession session) {        log.debug("param msg: {}", JsonUtil.toJSONString(msg));        if (null == session ) {            log.error("session is null, msg:{}", JsonUtil.toJSONString(msg));            return;        }        if (null==msg || msg.isEmpty()) {            sendErr(session,"params is empty");            return;        }        if (StringUtils.isBlank(String.valueOf(msg.get(EVENT)))) {            sendErr(session,"param:[event] is needed");            return;        }        // 这里可以递归调用的,但是如果上面有很多检查,对性能会有影响        /**         *  if (preHandle(String.valueOf(msg.get(EVENT)))) {         *             doHandle(msg, session);         *         } else {         *             if (null!=nextHandler) {         *                 nextHandler.handle(msg, session);         *             }         *         }         */        AbstractMessageHandler currentHandler = this;        do {            if (currentHandler.preHandle(String.valueOf(msg.get(EVENT)))) {                // 由真正的处理器处理消息                currentHandler.doHandle(msg, session);                break;            }        } while ((currentHandler=currentHandler.nextHandler)!=null);        if (null==currentHandler) {            log.warn("no handler process msg: {}", JsonUtil.toJSONString(msg));        }    }    protected void send(WebSocketSession session, T msg, WsEventEnum eventEnum) {        try {            Map<String, Object> map = new HashMap<>();            map.put("data", msg);            map.put("type", eventEnum.getValue());            map.put("code", "success");            map.put("msg", "OK");            TaskWsHandler.send(session, JsonUtil.toJSONString(map));        } catch (Exception e) {            log.error("send msg error: {}", ExceptionUtils.getStackTrace(e));        }    }    protected void sendErr(WebSocketSession session, String msg) {        try {            Map<String, String> map = new HashMap<>();            map.put("code", "error");            map.put("msg", msg);            TaskWsHandler.send(session, JsonUtil.toJSONString(map));        } catch (Exception e) {            log.error("send msg error: {}", ExceptionUtils.getStackTrace(e));        }    }    public AbstractMessageHandler addChain(AbstractMessageHandler nextHandler) {        this.nextHandler = nextHandler;        return nextHandler;    }    public AbstractMessageHandler addHeadChain(AbstractMessageHandler nextHandler) {        nextHandler.nextHandler = this;        return nextHandler;    }    protected Tenant getTenant(WebSocketSession session) {        return WsHandShakeInterceptor.getTenant(session);    }    protected String getStringFromObj(Object obj) {        if (null==obj) {            return null;        }        if (obj instanceof String) {            String str = StringUtils.trimToNull(String.valueOf(obj));            if ("null".equals(str)) {                return null;            }            return str;        }        return JsonUtil.toJSONString(obj);    }    @PostConstruct    private void init() {        historyTaskWsHandler.addHandler(this);    }}

五、效果演示

启动项目连接websocket

失败

没有完成认证,连接关闭

加入认证,连接两个客户端

发送 PING消息互相没有影响访问 :{port}/test/publish/{performId} 模拟消息广播

测试工具使用:wscat-websocket调试工具

标签: #c语言websocket客户端