前言:
目前各位老铁们对“clienthold如何解决”可能比较注意,朋友们都想要了解一些“clienthold如何解决”的相关知识。那么小编同时在网络上搜集了一些对于“clienthold如何解决””的相关内容,希望咱们能喜欢,我们快快来学习一下吧!长轮询实时通信的几种方式socket
实时通信,能够做到双方通信
轮训
实现简单,调用次数频繁,消息通知会产生延迟
长轮询
服务端实现复杂,消息实时性高,能够做到实时通信
MQ中采用的通信方式
在MQ中,多数会采用长轮询的方式获取消息,我们来分析一下这几种方式各自有什么优势
通信方式
实现复杂度
消息实时性
缺点
socket
一般
实时性高
需要维持链接,服务端推送消息,无法感知客户端的消费速率,容易造成客户端消息积压
轮训
简单
实时性低
消息容易产生延迟,频繁调用,网络开销大
长轮询
复杂
实时性高
实现起来较为复杂
长轮询的核心思想
用户发起一个请求,服务端将请求异步,并持有这个请求,当服务端数据发生变更的时候,找到相应的请求,并将数据返回,没有数据发送变更时,服务端通过延时的定时任务,将请求返回,客户端在收到请求后,重新发起新的请求。长轮询减少了请求次数,并提高了获取变更数据的实时性。
长轮询代码实现LongPollingController
提供http接口,供外部调用,并发起长轮询
@RestController@RequestMapping("longPolling")public class LongPollingController { @Autowired private LongPollingAdapter longPollingAdapter; @PostMapping("listen") public void listen(@Validated @RequestBody LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { longPollingAdapter.doLongPolling(longPollingDto, request, response); }}LongPollingAdapter
兼容短轮训,对于支持长轮询的请求,开始执行长轮询
public class LongPollingAdapter { @Autowired private LongPollingService longPollingService; public void doLongPolling(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { // 判断是否支持长轮询 if (isSupportLongPolling(request)) { longPollingService.addLongPollingClient(longPollingDto, request, response); return; } // 兼容短轮训 ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode()); if (configData == null) { ResponseUtils.generatorResponse(response, Response.builder().code(-1).message("appCode不存在").build()); return; } if (longPollingDto.getMd5().equals(configData.getMd5())) { // 数据未发生变化 ResponseUtils.generatorResponse(response, Response.builder().success()); } ResponseUtils.generatorResponse(response, Response.builder().data(configData)); } private boolean isSupportLongPolling(HttpServletRequest request) { return request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT) != null; }}LongPollingService继承SingleEventListener,用于触发LocalDataChangeEvent事件LongPollingClient,hold住请求,并开启延时定时任务,当定时执行时,返回改变的数据,并释放连接DataChangeTask,用于触发LocalDataChangeEvent事件时,返回改变的数据,并释放连接
@Servicepublic class LongPollingService extends SingleEventListener { @Value("${pool.with.fix.rate:false}") private boolean isFixPoll; private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); private static final Queue<LongPollingClient> longPollingClientQueue = new ConcurrentLinkedQueue<>(); public void addLongPollingClient(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { String timeoutStr = request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT); int timeout = Math.max(10000, Integer.valueOf(timeoutStr) - 500); if (isFixPolling()) { timeout = 10000; } else { // 检测数据是否发生变化 ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode()); if (configData != null && !configData.getMd5().equals(longPollingDto.getMd5())) { // 数据发生变化,直接返回 ResponseUtils.generatorResponse(response, Response.builder().data(configData)); return; } } // 开启异步 AsyncContext asyncContext = request.startAsync(); // 通过线程池异步请求 executor.execute(new LongPollingClient(longPollingDto.getAppCode(), longPollingDto.getMd5(), asyncContext, timeout)); } /** * 是否以固定的频率返回数据 */ private boolean isFixPolling() { return isFixPoll; } @Override public Class<? extends Event> interest() { return LocalDataChangeEvent.class; } /** * 触发事件 * * @param event */ @Override public void onEvent(Event event) { if (isFixPolling()) { // 忽略,待定时执行时返回变更的数据 } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent localDataChangeEvent = (LocalDataChangeEvent) event; executor.execute(new DataChangeTask(localDataChangeEvent.getAppCode())); } } } @Data class LongPollingClient implements Runnable { private final String appCode; private final String md5; private final AsyncContext asyncContext; private final int timeout; Future<?> asyncTimeoutFuture; public LongPollingClient(String appCode, String md5, AsyncContext asyncContext, int timeout) { this.appCode = appCode; this.md5 = md5; this.asyncContext = asyncContext; this.timeout = timeout; } @Override public void run() { asyncTimeoutFuture = executorService.schedule(() -> { longPollingClientQueue.remove(this); // 固定时间的模式通过此定时返回变更的数据 if (isFixPolling()) { ConfigData configData = ConfigDataPersistent.getConfigData(appCode); if (configData != null && !md5.equals(configData.getMd5())) { sendResponse(configData); } else { sendResponse(null); } } else { // 无数据变更,只是使用此方式hold请求, sendResponse(null); } }, timeout, TimeUnit.MILLISECONDS); longPollingClientQueue.add(this); } void sendResponse(ConfigData configData) { // 取消定时 if (asyncTimeoutFuture != null) { asyncTimeoutFuture.cancel(false); } generatorResponse(configData); } void generatorResponse(ConfigData configData) { if (configData != null) { ResponseUtils.generatorResponse((HttpServletResponse) asyncContext.getResponse(), Response.builder().data(configData)); } // 结束hold住的请求 asyncContext.complete(); } } class DataChangeTask implements Runnable { private final String appCode; public DataChangeTask(String appCode) { this.appCode = appCode; } @Override public void run() { if (CollectionUtils.isEmpty(longPollingClientQueue)) { return; } Iterator<LongPollingClient> iterator = longPollingClientQueue.iterator(); while (iterator.hasNext()) { LongPollingClient longPollingClient = iterator.next(); if (appCode.equals(longPollingClient.getAppCode())) { // 移除持有的请求 iterator.remove(); ConfigData configData = ConfigDataPersistent.getConfigData(appCode); longPollingClient.sendResponse(configData); } } } }}事件驱动推送数据EventListener
public interface EventListener { /** * 感兴趣的事件 * * @return */ List<Class<? extends Event>> interests(); /** * 触发事件 * * @param event */ void onEvent(Event event);}
listener顶层接口,定时感兴趣的事件,以及触发事件
AbstractEventListener
@Slf4jpublic abstract class AbstractEventListener implements EventListener { /** * spring初始化bean的时候,自动添加listener * */ @PostConstruct public void init() { List<Class<? extends Event>> interests = interests(); if (CollectionUtils.isEmpty(interests)) { log.error("没有感兴趣的事件"); return; } interests.forEach(clazz -> EventDispatcher.addListener(clazz, this)); }}
定义抽象类,用于在bean初始化时,自动注册listener
SingleEventListener
public abstract class SingleEventListener extends AbstractEventListener { /** * 感兴趣的时间 * * @return */ @Override public final List<Class<? extends Event>> interests() { return Lists.newArrayList(interest()); } public abstract Class<? extends Event> interest();}
只对单个事件感兴趣
EventDispatcher
public class EventDispatcher { private EventDispatcher() { } private static final Map<Class<? extends Event>, List<EventListener>> listenerHolder = new ConcurrentHashMap<>(); public static final void addListener(Class<? extends Event> clazz, EventListener listener) { List<EventListener> listeners; // 非线程安全,需要使用synchronized synchronized (EventDispatcher.class) { if (listenerHolder.containsKey(clazz)) { listeners = listenerHolder.get(clazz); } else { listeners = new ArrayList<>(); listenerHolder.put(clazz, listeners); } } listeners.add(listener); } public static final List<EventListener> getListeners(Class<? extends Event> clazz) { return listenerHolder.get(clazz); } public static final void fireEvent(Event event) { // 获取listener List<EventListener> listeners = listenerHolder.get(event.getClass()); if (!CollectionUtils.isEmpty(listeners)) { listeners.forEach(listener -> listener.onEvent(event)); } }}源码地址
标签: #clienthold如何解决