龙空技术网

如何实现客户端与服务端的双向通行

海的赤子 207

前言:

目前各位老铁们对“clienthold如何解决”可能比较注意,朋友们都想要了解一些“clienthold如何解决”的相关知识。那么小编同时在网络上搜集了一些对于“clienthold如何解决””的相关内容,希望咱们能喜欢,我们快快来学习一下吧!

长轮询实时通信的几种方式socket

实时通信,能够做到双方通信

轮训

实现简单,调用次数频繁,消息通知会产生延迟

长轮询

服务端实现复杂,消息实时性高,能够做到实时通信

MQ中采用的通信方式

在MQ中,多数会采用长轮询的方式获取消息,我们来分析一下这几种方式各自有什么优势

通信方式

实现复杂度

消息实时性

缺点

socket

一般

实时性高

需要维持链接,服务端推送消息,无法感知客户端的消费速率,容易造成客户端消息积压

轮训

简单

实时性低

消息容易产生延迟,频繁调用,网络开销大

长轮询

复杂

实时性高

实现起来较为复杂

长轮询的核心思想

用户发起一个请求,服务端将请求异步,并持有这个请求,当服务端数据发生变更的时候,找到相应的请求,并将数据返回,没有数据发送变更时,服务端通过延时的定时任务,将请求返回,客户端在收到请求后,重新发起新的请求。长轮询减少了请求次数,并提高了获取变更数据的实时性。

长轮询代码实现LongPollingController

提供http接口,供外部调用,并发起长轮询

@RestController@RequestMapping("longPolling")public class LongPollingController {    @Autowired    private LongPollingAdapter longPollingAdapter;    @PostMapping("listen")    public void listen(@Validated @RequestBody LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) {        longPollingAdapter.doLongPolling(longPollingDto, request, response);    }}
LongPollingAdapter

兼容短轮训,对于支持长轮询的请求,开始执行长轮询

public class LongPollingAdapter {    @Autowired    private LongPollingService longPollingService;    public void doLongPolling(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) {        // 判断是否支持长轮询        if (isSupportLongPolling(request)) {            longPollingService.addLongPollingClient(longPollingDto, request, response);            return;        }        // 兼容短轮训        ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode());        if (configData == null) {            ResponseUtils.generatorResponse(response, Response.builder().code(-1).message("appCode不存在").build());            return;        }        if (longPollingDto.getMd5().equals(configData.getMd5())) {            // 数据未发生变化            ResponseUtils.generatorResponse(response, Response.builder().success());        }        ResponseUtils.generatorResponse(response, Response.builder().data(configData));    }    private boolean isSupportLongPolling(HttpServletRequest request) {        return request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT) != null;    }}
LongPollingService继承SingleEventListener,用于触发LocalDataChangeEvent事件LongPollingClient,hold住请求,并开启延时定时任务,当定时执行时,返回改变的数据,并释放连接DataChangeTask,用于触发LocalDataChangeEvent事件时,返回改变的数据,并释放连接
@Servicepublic class LongPollingService extends SingleEventListener {    @Value("${pool.with.fix.rate:false}")    private boolean isFixPoll;    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8,            60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);    private static final Queue<LongPollingClient> longPollingClientQueue = new ConcurrentLinkedQueue<>();    public void addLongPollingClient(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) {        String timeoutStr = request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT);        int timeout = Math.max(10000, Integer.valueOf(timeoutStr) - 500);        if (isFixPolling()) {            timeout = 10000;        } else {            // 检测数据是否发生变化            ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode());            if (configData != null && !configData.getMd5().equals(longPollingDto.getMd5())) {                // 数据发生变化,直接返回                ResponseUtils.generatorResponse(response, Response.builder().data(configData));                return;            }        }        // 开启异步        AsyncContext asyncContext = request.startAsync();        // 通过线程池异步请求        executor.execute(new LongPollingClient(longPollingDto.getAppCode(), longPollingDto.getMd5(), asyncContext, timeout));    }    /**    * 是否以固定的频率返回数据    */    private boolean isFixPolling() {        return isFixPoll;    }    @Override    public Class<? extends Event> interest() {        return LocalDataChangeEvent.class;    }    /**     * 触发事件     *     * @param event     */    @Override    public void onEvent(Event event) {        if (isFixPolling()) {            // 忽略,待定时执行时返回变更的数据        } else {            if (event instanceof LocalDataChangeEvent) {                LocalDataChangeEvent localDataChangeEvent = (LocalDataChangeEvent) event;                executor.execute(new DataChangeTask(localDataChangeEvent.getAppCode()));            }        }    }    @Data    class LongPollingClient implements Runnable {        private final String appCode;        private final String md5;        private final AsyncContext asyncContext;        private final int timeout;        Future<?> asyncTimeoutFuture;        public LongPollingClient(String appCode, String md5, AsyncContext asyncContext, int timeout) {            this.appCode = appCode;            this.md5 = md5;            this.asyncContext = asyncContext;            this.timeout = timeout;        }        @Override        public void run() {            asyncTimeoutFuture = executorService.schedule(() -> {                longPollingClientQueue.remove(this);                // 固定时间的模式通过此定时返回变更的数据                if (isFixPolling()) {                    ConfigData configData = ConfigDataPersistent.getConfigData(appCode);                    if (configData != null && !md5.equals(configData.getMd5())) {                        sendResponse(configData);                    } else {                        sendResponse(null);                    }                } else {                    // 无数据变更,只是使用此方式hold请求,                    sendResponse(null);                }            }, timeout, TimeUnit.MILLISECONDS);            longPollingClientQueue.add(this);        }        void sendResponse(ConfigData configData) {            // 取消定时            if (asyncTimeoutFuture != null) {                asyncTimeoutFuture.cancel(false);            }            generatorResponse(configData);        }        void generatorResponse(ConfigData configData) {            if (configData != null) {                ResponseUtils.generatorResponse((HttpServletResponse) asyncContext.getResponse(), Response.builder().data(configData));            }            // 结束hold住的请求            asyncContext.complete();        }    }    class DataChangeTask implements Runnable {        private final String appCode;        public DataChangeTask(String appCode) {            this.appCode = appCode;        }        @Override        public void run() {            if (CollectionUtils.isEmpty(longPollingClientQueue)) {                return;            }            Iterator<LongPollingClient> iterator = longPollingClientQueue.iterator();            while (iterator.hasNext()) {                LongPollingClient longPollingClient = iterator.next();                if (appCode.equals(longPollingClient.getAppCode())) {                    // 移除持有的请求                    iterator.remove();                    ConfigData configData = ConfigDataPersistent.getConfigData(appCode);                    longPollingClient.sendResponse(configData);                }            }        }    }}
事件驱动推送数据EventListener
public interface EventListener {    /**     * 感兴趣的事件     *     * @return     */    List<Class<? extends Event>> interests();    /**     * 触发事件     *     * @param event     */    void onEvent(Event event);}

listener顶层接口,定时感兴趣的事件,以及触发事件

AbstractEventListener

@Slf4jpublic abstract class AbstractEventListener implements EventListener {    /**    * spring初始化bean的时候,自动添加listener    *    */    @PostConstruct    public void init() {        List<Class<? extends Event>> interests = interests();        if (CollectionUtils.isEmpty(interests)) {            log.error("没有感兴趣的事件");            return;        }        interests.forEach(clazz -> EventDispatcher.addListener(clazz, this));    }}

定义抽象类,用于在bean初始化时,自动注册listener

SingleEventListener

public abstract class SingleEventListener extends AbstractEventListener {    /**     * 感兴趣的时间     *     * @return     */    @Override    public final List<Class<? extends Event>> interests() {        return Lists.newArrayList(interest());    }    public abstract Class<? extends Event> interest();}

只对单个事件感兴趣

EventDispatcher

public class EventDispatcher {    private EventDispatcher() {    }    private static final Map<Class<? extends Event>, List<EventListener>> listenerHolder = new ConcurrentHashMap<>();    public static final void addListener(Class<? extends Event> clazz, EventListener listener) {        List<EventListener> listeners;        // 非线程安全,需要使用synchronized        synchronized (EventDispatcher.class) {            if (listenerHolder.containsKey(clazz)) {                listeners = listenerHolder.get(clazz);            } else {                listeners = new ArrayList<>();                listenerHolder.put(clazz, listeners);            }        }        listeners.add(listener);    }    public static final List<EventListener> getListeners(Class<? extends Event> clazz) {        return listenerHolder.get(clazz);    }    public static final void fireEvent(Event event) {        // 获取listener        List<EventListener> listeners = listenerHolder.get(event.getClass());        if (!CollectionUtils.isEmpty(listeners)) {            listeners.forEach(listener -> listener.onEvent(event));        }    }}
源码地址

标签: #clienthold如何解决