龙空技术网

厉害了,Netty 轻松实现文件上传

xxsiyao 389

前言:

如今我们对“net提交class数据”大致比较珍视,小伙伴们都需要了解一些“net提交class数据”的相关资讯。那么小编也在网上网罗了一些有关“net提交class数据””的相关文章,希望各位老铁们能喜欢,你们一起来了解一下吧!

今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCP Socket+File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化数据的相互交换。

而使用netty来进行文件传输也是利用netty天然的优势:零拷贝功能。很多同学都听说过netty的”零拷贝”功能,但是具体体现在哪里又不知道,下面我们就简要介绍下:

Netty的“零拷贝”主要体现在如下三个方面:

Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。

具体的分析在此就不多做介绍,有兴趣的可以查阅相关文档。我们还是把重点放在文件传输上。Netty作为高性能的服务器端异步IO框架必然也离不开文件读写功能,我们可以使用netty模拟http的形式通过网页上传文件写入服务器,当然要使用http的形式那你也用不着netty!大材小用。

netty4中如果想使用http形式上传文件你还得借助第三方jar包:okhttp。使用该jar完成http请求的发送。但是在netty5 中已经为我们写好了,我们可以直接调用netty5的API就可以实现。

所以netty4和5的差别还是挺大的,至于使用哪个,那就看你们公司选择哪一个了!本文目前使用netty4来实现文件上传功能。下面我们上代码:

pom文件:

<dependency>      <groupId>io.netty</groupId>        <artifactId>netty-all</artifactId>      <version>4.1.5.Final</version></dependency>

server端:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;public class FileUploadServer {    public void bind(int port) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<Channel>() {                @Override                protected void initChannel(Channel ch) throws Exception {                    ch.pipeline().addLast(new ObjectEncoder());                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度                    ch.pipeline().addLast(new FileUploadServerHandler());                }            });            ChannelFuture f = b.bind(port).sync();            f.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) {        int port = 8080;        if (args != null && args.length > 0) {            try {                port = Integer.valueOf(args[0]);            } catch (NumberFormatException e) {                e.printStackTrace();            }        }        try {            new FileUploadServer().bind(port);        } catch (Exception e) {            e.printStackTrace();        }    }}
import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import java.io.File;public class FileUploadClient {    public void connect(int port, String host, final FileUploadFile fileUploadFile) throws Exception {        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {                @Override                protected void initChannel(Channel ch) throws Exception {                    ch.pipeline().addLast(new ObjectEncoder());                    ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));                    ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));                }            });            ChannelFuture f = b.connect(host, port).sync();            f.channel().closeFuture().sync();        } finally {            group.shutdownGracefully();        }    }    public static void main(String[] args) {        int port = 8080;        if (args != null && args.length > 0) {            try {                port = Integer.valueOf(args[0]);            } catch (NumberFormatException e) {                e.printStackTrace();            }        }        try {            FileUploadFile uploadFile = new FileUploadFile();            File file = new File("c:/1.txt");            String fileMd5 = file.getName();// 文件名            uploadFile.setFile(file);            uploadFile.setFile_md5(fileMd5);            uploadFile.setStarPos(0);// 文件开始位置            new FileUploadClient().connect(port, "127.0.0.1", uploadFile);        } catch (Exception e) {            e.printStackTrace();        }    }}
import java.io.File;import java.io.Serializable;public class FileUploadFile implements Serializable {    private static final long serialVersionUID = 1L;    private File file;// 文件    private String file_md5;// 文件名    private int starPos;// 开始位置    private byte[] bytes;// 文件字节数组    private int endPos;// 结尾位置    public int getStarPos() {        return starPos;    }    public void setStarPos(int starPos) {        this.starPos = starPos;    }    public int getEndPos() {        return endPos;    }    public void setEndPos(int endPos) {        this.endPos = endPos;    }    public byte[] getBytes() {        return bytes;    }    public void setBytes(byte[] bytes) {        this.bytes = bytes;    }    public File getFile() {        return file;    }    public void setFile(File file) {        this.file = file;    }    public String getFile_md5() {        return file_md5;    }    public void setFile_md5(String file_md5) {        this.file_md5 = file_md5;    }}

输出为:

块儿长度:894长度:8052-----------------------------894byte 长度:894块儿长度:894长度:7158-----------------------------894byte 长度:894块儿长度:894长度:6264-----------------------------894byte 长度:894块儿长度:894长度:5370-----------------------------894byte 长度:894块儿长度:894长度:4476-----------------------------894byte 长度:894块儿长度:894长度:3582-----------------------------894byte 长度:894块儿长度:894长度:2688-----------------------------894byte 长度:894块儿长度:894长度:1794-----------------------------894byte 长度:894块儿长度:894长度:900-----------------------------894byte 长度:894块儿长度:894长度:6-----------------------------6byte 长度:6块儿长度:894长度:0-----------------------------0文件已经读完--------0Process finished with exit code 0

这样就实现了服务器端文件的上传,当然我们也可以使用http的形式。

server端:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class HttpFileServer implements Runnable {    private int port;    public HttpFileServer(int port) {        super();        this.port = port;    }    @Override    public void run() {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(bossGroup, workerGroup);        serverBootstrap.channel(NioServerSocketChannel.class);        //serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));        serverBootstrap.childHandler(new HttpChannelInitlalizer());        try {            ChannelFuture f = serverBootstrap.bind(port).sync();            f.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) {        HttpFileServer b = new HttpFileServer(9003);        new Thread(b).start();    }}

Server端initializer:

import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast(new HttpServerCodec());        pipeline.addLast(new HttpObjectAggregator(65536));        pipeline.addLast(new ChunkedWriteHandler());        pipeline.addLast(new HttpChannelHandler());    }}

server端hadler:

import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelProgressiveFuture;import io.netty.channel.ChannelProgressiveFutureListener;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.DefaultFullHttpResponse;import io.netty.handler.codec.http.DefaultHttpResponse;import io.netty.handler.codec.http.FullHttpRequest;import io.netty.handler.codec.http.FullHttpResponse;import io.netty.handler.codec.http.HttpChunkedInput;import io.netty.handler.codec.http.HttpHeaders;import io.netty.handler.codec.http.HttpResponse;import io.netty.handler.codec.http.HttpResponseStatus;import io.netty.handler.codec.http.HttpVersion;import io.netty.handler.codec.http.LastHttpContent;import io.netty.handler.stream.ChunkedFile;import io.netty.util.CharsetUtil;import io.netty.util.internal.SystemPropertyUtil;import java.io.File;import java.io.FileNotFoundException;import java.io.RandomAccessFile;import java.io.UnsupportedEncodingException;import java.net.URLDecoder;import java.util.regex.Pattern;import javax.activation.MimetypesFileTypeMap;public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";    public static final int HTTP_CACHE_SECONDS = 60;    @Override    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {        // 监测解码情况        if (!request.getDecoderResult().isSuccess()) {            sendError(ctx, BAD_REQUEST);            return;        }        final String uri = request.getUri();        final String path = sanitizeUri(uri);        System.out.println("get file:"+path);        if (path == null) {            sendError(ctx, FORBIDDEN);            return;        }        //读取要下载的文件        File file = new File(path);        if (file.isHidden() || !file.exists()) {            sendError(ctx, NOT_FOUND);            return;        }        if (!file.isFile()) {            sendError(ctx, FORBIDDEN);            return;        }        RandomAccessFile raf;        try {            raf = new RandomAccessFile(file, "r");        } catch (FileNotFoundException ignore) {            sendError(ctx, NOT_FOUND);            return;        }        long fileLength = raf.length();        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);        HttpHeaders.setContentLength(response, fileLength);        setContentTypeHeader(response, file);        //setDateAndCacheHeaders(response, file);        if (HttpHeaders.isKeepAlive(request)) {            response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE);        }        // Write the initial line and the header.        ctx.write(response);        // Write the content.        ChannelFuture sendFileFuture =        ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());        //sendFuture用于监视发送数据的状态        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {            @Override            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {                if (total < 0) { // total unknown                    System.err.println(future.channel() + " Transfer progress: " + progress);                } else {                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);                }            }            @Override            public void operationComplete(ChannelProgressiveFuture future) {                System.err.println(future.channel() + " Transfer complete.");            }        });        // Write the end marker        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);        // Decide whether to close the connection or not.        if (!HttpHeaders.isKeepAlive(request)) {            // Close the connection when the whole content is written out.            lastContentFuture.addListener(ChannelFutureListener.CLOSE);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        if (ctx.channel().isActive()) {            sendError(ctx, INTERNAL_SERVER_ERROR);        }        ctx.close();    }    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");    private static String sanitizeUri(String uri) {        // Decode the path.        try {            uri = URLDecoder.decode(uri, "UTF-8");        } catch (UnsupportedEncodingException e) {            throw new Error(e);        }        if (!uri.startsWith("/")) {            return null;        }        // Convert file separators.        uri = uri.replace('/', File.separatorChar);        // Simplistic dumb security check.        // You will have to do something serious in the production environment.        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".")                || INSECURE_URI.matcher(uri).matches()) {            return null;        }        // Convert to absolute path.        return SystemPropertyUtil.get("user.dir") + File.separator + uri;    }    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");        // Close the connection as soon as the error message is sent.        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);    }    /**     * Sets the content type header for the HTTP Response     *     * @param response     *            HTTP response     * @param file     *            file to extract content type     */    private static void setContentTypeHeader(HttpResponse response, File file) {        MimetypesFileTypeMap m = new MimetypesFileTypeMap();        String contentType = m.getContentType(file.getPath());        if (!contentType.equals("application/octet-stream")) {            contentType += "; charset=utf-8";        }        response.headers().set(CONTENT_TYPE, contentType);    }}
import java.io.File;import java.io.FileOutputStream;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.http.HttpContent;//import io.netty.handler.codec.http.HttpHeaders;import io.netty.handler.codec.http.HttpResponse;import io.netty.handler.codec.http.LastHttpContent;import io.netty.util.internal.SystemPropertyUtil;/** * @Author:yangyue * @Description: * @Date: Created in 9:15 on 2017/5/28. */public class HttpDownloadHandler extends ChannelInboundHandlerAdapter {    private boolean readingChunks = false; // 分块读取开关    private FileOutputStream fOutputStream = null;// 文件输出流    private File localfile = null;// 下载文件的本地对象    private String local = null;// 待下载文件名    private int succCode;// 状态码    public HttpDownloadHandler(String local) {        this.local = local;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        if (msg instanceof HttpResponse) {// response头信息            HttpResponse response = (HttpResponse) msg;            succCode = response.getStatus().code();            if (succCode == 200) {                setDownLoadFile();// 设置下载文件                readingChunks = true;            }            // System.out.println("CONTENT_TYPE:"            // + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));        }        if (msg instanceof HttpContent) {// response体信息            HttpContent chunk = (HttpContent) msg;            if (chunk instanceof LastHttpContent) {                readingChunks = false;            }            ByteBuf buffer = chunk.content();            byte[] dst = new byte[buffer.readableBytes()];            if (succCode == 200) {                while (buffer.isReadable()) {                    buffer.readBytes(dst);                    fOutputStream.write(dst);                    buffer.release();                }                if (null != fOutputStream) {                    fOutputStream.flush();                }            }        }        if (!readingChunks) {            if (null != fOutputStream) {                System.out.println("Download done->"+ localfile.getAbsolutePath());                fOutputStream.flush();                fOutputStream.close();                localfile = null;                fOutputStream = null;            }            ctx.channel().close();        }    }    /**     * 配置本地参数,准备下载     */    private void setDownLoadFile() throws Exception {        if (null == fOutputStream) {            local = SystemPropertyUtil.get("user.dir") + File.separator +local;            //System.out.println(local);            localfile = new File(local);            if (!localfile.exists()) {                localfile.createNewFile();            }            fOutputStream = new FileOutputStream(localfile);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)            throws Exception {        System.out.println("管道异常:" + cause.getMessage());        cause.printStackTrace();        ctx.channel().close();    }}

这里客户端我放的是网络连接,下载的是一副图片,启动服务端和客户端就可以看到这个图片被下载到了工程的根目录下。

最后来自小编的福利

小编整理了一份大厂真题的面试资料,以及2021最新Java核心技术整理的资料集锦,需要领取的小伙伴可以 私聊关注我 免费领取 ,编程的世界永远向所有热爱编程的人开放,这是一个自由,平等,共享的世界,我始终是这样坚信的。

喜欢小编的分享可以点赞关注哦,小编持续为你分享最新文章 和 福利领取哦

标签: #net提交class数据