龙空技术网

springboot(三十五)使用stomp的java客户端接收websocket数据

聪明的晚风zqw 125

前言:

目前姐妹们对“java接收socket”大约比较关怀,我们都需要学习一些“java接收socket”的相关文章。那么小编在网摘上汇集了一些关于“java接收socket””的相关资讯,希望你们能喜欢,各位老铁们一起来了解一下吧!

在某些测试或其他场景下需要有一个java版的stomp客户端。还好万能的spring爸爸在设计它的时候也考虑到了这一点,因此也提供了创建java客户端的方式。

服务端代码不再赘述,如需要,可参考:

客户端启动类:

package com.iscas.biz.test.wsclient;import org.springframework.messaging.simp.stomp.StompHeaders;import org.springframework.messaging.simp.stomp.StompSessionHandler;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.web.socket.WebSocketHttpHeaders;import org.springframework.web.socket.client.standard.StandardWebSocketClient;import org.springframework.web.socket.messaging.WebSocketStompClient;import org.springframework.web.socket.sockjs.client.SockJsClient;import org.springframework.web.socket.sockjs.client.Transport;import org.springframework.web.socket.sockjs.client.WebSocketTransport;import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;import java.net.URI;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class StompClient {	public static void main(String[] args) {		final CountDownLatch latch = new CountDownLatch(1);		List<Transport> transports = new ArrayList<>(1);		transports.add(new WebSocketTransport(new StandardWebSocketClient()));		SockJsClient transport = new SockJsClient(transports);		transport.setMessageCodec(new Jackson2SockJsMessageCodec());		WebSocketStompClient stompClient = new WebSocketStompClient(transport);		//接收大小限制		stompClient.setInboundMessageSizeLimit(1024 * 1024);		//处理心跳		ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();		taskScheduler.afterPropertiesSet();		//for heartbeats		stompClient.setTaskScheduler(taskScheduler);		StompSessionHandler customHandler = new CustomStompSessionHandler();		//可以发送请求头		StompHeaders stompHeaders = new StompHeaders();		stompHeaders.add("Authorization", "admin");		URI uri = URI.create(";);		stompClient.connect(uri, null, stompHeaders, customHandler);		//阻塞		try {			latch.await(31536000, TimeUnit.SECONDS);			//latch.await(3, TimeUnit.SECONDS);		} catch (InterruptedException e) {			e.printStackTrace();		}	}}

消息接收处理:

package com.iscas.biz.test.wsclient;import org.springframework.messaging.simp.stomp.*;import java.lang.reflect.Type;public class CustomStompSessionHandler extends StompSessionHandlerAdapter {	public CustomStompSessionHandler(){	}	@Override	public void afterConnected(final StompSession session, StompHeaders connectedHeaders) {		System.out.println("StompHeaders: " + connectedHeaders.toString());		//订阅地址,发送端前面没有/user		String destination = "/user/queue/message";		//订阅消息		session.subscribe(destination, new StompFrameHandler() {			@Override			public Type getPayloadType(StompHeaders headers) {				return byte[].class;			}			@Override			public void handleFrame(StompHeaders headers, Object payload) {				//todo 只能接收到byte[]数组,没时间研究原因				System.out.println(new String((byte[])payload));			}		}); 	}	@Override	public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload,								Throwable exception) {		    System.out.println(exception.getMessage());	}	@Override	public void handleTransportError(StompSession session, Throwable exception) {		exception.printStackTrace();		System.out.println("transport error.");	}}

标签: #java接收socket #js获取frame #java stomp客户端