首页 / NETTY / netty实现消息转发服务
netty实现消息转发服务
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了netty实现消息转发服务,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6922字,纯文字阅读大概需要10分钟。
内容图文
![netty实现消息转发服务](/upload/InfoBanner/zyjiaocheng/1312/095f821e8e6f4c72baf368246c6b5c91.jpg)
1、结构图
2、消息服务器
消息服务器(SNS)由Http Netty Server(HNS)和WebSocket Netty Server(WNS)组成。HNS采用Netty Http+XML协议栈开发实现,WNS采用Netty WebSocket+JSON实现。
HNS只接收预定义的HttpXmlRequest类型的数据,这由编解码器控制,编解码器是继承了MessageToMessageDecoder<T>和MessageToMessageEncoder<T>这两个编解码基础类、并用于解析处理预定义HttpXmlRequest数据的类。HNS根据接收结果向客户端发送预定义的HttpXmlResponse类型数据。
HNS可以通过HttpXmlClient创建与业务服务器的链接,并通过HttpXmlClientHandler转发业务请求。HttpXmlClientHandler继承自SimpleChannelInboundHandler,通过它可以实现HNS与业务服务器的异步通信。
目前,WNS主要用于与Web客户端端进行websocket通信,WNS通过全局变量Global.WSCG维护通道信息,通过Global.appUsers维护客户端连接。WNS定义了一个消息基类BaseMsg,该类描述了客户端发起请求所需要的数据信息。同样,WNS也定义了一个AppUser类用于存储客户端信息,必须说明的一点是,同一个AppUser可能存在多个通道,因此,在AppUser中定义了一个ChannelId数组,该数组维护了当前用户的所有通道ID。客户端发起连接请求时,必要的数据包括appid、userId、cmd,appid是一个业务服务器的唯一标识。
3、业务服务器
业务服务器是各个应用端建立的与SNS交互的Http Netty Server,换句话说,每一个应用都需要启动一个HNS用于与SNS交互。同样的,业务服务器也是通过HttpXmlClient向SNS的HNS发起连接请求,不再赘述。
4、HttpXmlServer
package com.sns.protocol.http.xml.server; import java.net.InetSocketAddress; import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest; import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequestDecoder; import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponseEncoder; import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; public class HttpXmlServer implements Runnable { private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private SimpleChannelInboundHandler<HttpXmlRequest> handler = null; private int port = 9999; @SuppressWarnings("unused") private HttpXmlServer() { } public HttpXmlServer(int _port, SimpleChannelInboundHandler<HttpXmlRequest> _handler) { this.port = _port; this.handler = _handler; } @Override public void run() { // 处理网络连接---接受请求 bossGroup = new NioEventLoopGroup(); // 进行socketChannel的网络读写 workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // boss线程接收参数设置,BACKLOG用于构造服务端套接字ServerSocket对象, // 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。 // 如果未设置或所设置的值小于1,Java将使用默认值50。 .option(ChannelOption.SO_BACKLOG, 1024) // 发送缓冲器 .option(ChannelOption.SO_SNDBUF, 1024) // 接收缓冲器 .option(ChannelOption.SO_RCVBUF, 1024) // 接收缓冲分配器 .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536)) // work线程参数设置 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-decoder", new HttpRequestDecoder()); ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("xml-decoder", new HttpXmlRequestDecoder(HttpRequestMessage.class, true)); ch.pipeline().addLast("http-encoder", new HttpResponseEncoder()); ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder()); ch.pipeline().addLast("xmlServerHandler", handler); } }); ChannelFuture future = b.bind(new InetSocketAddress(port)).sync(); System.out.println("HTTP netty server started. the port is " + port); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public void shutdown() { if (bossGroup != null) bossGroup.shutdownGracefully(); if (workerGroup != null) workerGroup.shutdownGracefully(); } }
5、HttpXmlServerHandler
package com.sns.protocol.http.xml.server; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest; import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponse; import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage; import com.zehin.sns.protocol.http.xml.pojo.HttpResponseMessage; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @Sharable public final class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> { @Override public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception { HttpRequest request = xmlRequest.getRequest(); HttpRequestMessage reqMessage = (HttpRequestMessage) xmlRequest.getBody(); System.out.println("Http server receive request : " + reqMessage); HttpResponseMessage resMessage = dobusiness(reqMessage); ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, resMessage)); if (!isKeepAlive(request)) { future.addListener(new GenericFutureListener<Future<? super Void>>() { public void operationComplete(Future future) throws Exception { ctx.close(); } }); } } private HttpResponseMessage dobusiness(HttpRequestMessage req) { HttpResponseMessage resMessage = new HttpResponseMessage(); if (req.getCmd() == 0) { resMessage.setResult(true); } else { // other verify code here... } return resMessage; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (ctx.channel().isActive()) { sendError(ctx, INTERNAL_SERVER_ERROR); } } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("失败: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } }
6、备注
主要参考《Netty权威指南》而写了个简单的消息转发。
原文:http://www.cnblogs.com/sdnu/p/5946498.html
内容总结
以上是互联网集市为您收集整理的netty实现消息转发服务全部内容,希望文章能够帮你解决netty实现消息转发服务所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。