springboot 集成Netty+websocket实现简单的聊天功能
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了springboot 集成Netty+websocket实现简单的聊天功能,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含9986字,纯文字阅读大概需要15分钟。
内容图文
1.maven依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency>
2.springboot入口启动类
import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.support.SpringBootServletInitializer; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; import com.xxxxx.netty.NettyServer; /** * Spring Boot 应用启动类 * * Created by bysocket on 16/4/26. */ // Spring Boot 应用的标识 @SpringBootApplication // mapper 接口类扫描包配置 @EnableTransactionManagement @EnableScheduling @EnableAspectJAutoProxy(proxyTargetClass = true) //开启AspectJ代理,并将proxyTargetClass置为true,表示启用cglib对Class也进行代理 @MapperScan("com.xxxxx.dao") publicclass Application extends SpringBootServletInitializer { publicstaticvoid main(String[] args) { // 程序启动入口 // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件 SpringApplication.run(Application.class, args); try { new NettyServer(8091).start(); }catch(Exception e) { System.out.println("NettyServerError:"+e.getMessage()); } } }
3. NettyServer
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * NettyServer Netty服务器配置 * @author * @date */ public class NettyServer { private static Logger logger = LoggerFactory.getLogger(NettyServer.class); privatefinalint port; public NettyServer(int port) { this.port = port; } publicvoid start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); sb.group(group, bossGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(this.port)// 绑定监听端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作 @Override protectedvoid initChannel(SocketChannel ch) throws Exception { logger.info("收到新的客户端连接: {}",ch.toString()); //websocket协议本身是基于http协议的,所以这边也要使用http解编码器 ch.pipeline().addLast(new HttpServerCodec()); //以块的方式来写的处理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); ch.pipeline().addLast(new MyWebSocketHandler()); } }); ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定 System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); cf.channel().closeFuture().sync(); // 关闭服务器通道 } finally { group.shutdownGracefully().sync(); // 释放线程池资源 bossGroup.shutdownGracefully().sync(); } } }
4.MyWebSocketHandler
import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; import com.github.pagehelper.Page; import com.xxxxx.service.IServiceXfzhQz; import com.xxxxx.util.SpringUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * MyWebSocketHandler * WebSocket处理器,处理websocket连接相关 * @author * @date */ public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ privatestatic Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class); publicstatic ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //用户id=>channel示例 //可以通过用户的唯一标识保存用户的channel //这样就可以发送给指定的用户publicstatic ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); //由于@Autowired注解注入不进去,所以取巧了static IServiceXfzhQz serviceXfzhQz; static { serviceXfzhQz = SpringUtil.getBean(IServiceXfzhQz.class); } /** * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel * @param ctx * @throws Exception */ @Override publicvoid handlerAdded(ChannelHandlerContext ctx) throws Exception { //获取连接的channel Channel incomming = ctx.channel(); //通知所有已经连接到服务器的客户端,有一个新的通道加入/*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n"); }*/ channelGroup.add(ctx.channel()); } /** *每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel * @param ctx * @throws Exception */ @Override publicvoid handlerRemoved(ChannelHandlerContext ctx) throws Exception { //获取连接的channel/*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开\n"); }*///从服务端的channelGroup中移除当前离开的客户端 channelGroup.remove(ctx.channel()); //从服务端的channelMap中移除当前离开的客户端 Collection<Channel> col = channelMap.values(); while(true == col.contains(ctx.channel())) { col.remove(ctx.channel()); logger.info("netty客户端连接删除成功!"); } } /** * 服务端监听到客户端活动 * @param ctx * @throws Exception */ @Override publicvoid channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("netty与客户端建立连接,通道开启!"); //添加到channelGroup通道组 //channelGroup.add(ctx.channel()); } /** * 服务端监听到客户端不活动 * @param ctx * @throws Exception */ @Override publicvoid channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("netty与客户端断开连接,通道关闭!"); //添加到channelGroup 通道组 //channelGroup.remove(ctx.channel()); } /** * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel. * @param ctx * @param msg * @throws Exception */ @Override protectedvoid channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { logger.info("netty客户端收到服务器数据: {}" , msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息处理类 message(ctx,msg.text(),date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 当服务端的IO 抛出异常时被调用 * @param ctx * @param cause * @throws Exception */ @Override publicvoid exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress()+"异常"); //异常出现就关闭连接 cause.printStackTrace(); ctx.close(); } //消息处理类publicvoid message(ChannelHandlerContext ctx,String msg,String date) { try { Map<String,Object> resultmap = (Map<String,Object>)JSONObject.parse(msg); resultmap.put("CREATEDATE", date); //这里需要用户信息跟channel通道绑定 //所以每当一个客户端连接成功时,第一时间传一条登录信息 //该字段用来判断是登录绑定信息,还是发送信息 String msgtype = (String)resultmap.get("MSGTYPE"); if(msgtype.equals("DL")){//用户登录信息绑定 Channel channel = ctx.channel(); channelMap.put((String) resultmap.get("USERID"), channel); resultmap.put("success", true); resultmap.put("message", "用户链接绑定成功!"); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); logger.info("netty用户: {} 连接绑定成功!" , (String) resultmap.get("USERID")); }elseif(msgtype.equals("DH")){//消息对话 //这里根据群组ID查询出来所有的用户信息并循环发送消息 //如果是单聊 可以直接获取channelMap中用户channel并发送 Page<Map<String, Object>> list = serviceXfzhQz.selectXfzhQzByQzId((String)resultmap.get("QZID")); for (Map<String, Object> map : list) { String userid = (String) map.get("USERID"); //判断该用户ID是否绑定通道if(channelMap.containsKey(userid)){ Channel channel = channelMap.get(userid); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); } } } } catch (Exception e) { e.printStackTrace(); } } }
5.测试页面
向服务端发送消息时,数据格式是json字符串.
<! DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd" > < html xmlns ="http://www.w3.org/1999/xhtml" > < head > < meta http-equiv ="Content-Type" content ="text/html; charset=utf-8" /> < title >Netty-Websocket</title><script type="text/javascript">var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket =new WebSocket("ws://127.0.0.1:8091/ws"); socket.onmessage =function(event){ var ta = document.getElementById(‘responseText‘); ta.value += event.data+"\r\n"; }; socket.onopen =function(event){ var ta = document.getElementById(‘responseText‘); ta.value ="Netty-WebSocket服务器。。。。。。连接 \r\n"; }; socket.onclose =function(event){ var ta = document.getElementById(‘responseText‘); ta.value ="Netty-WebSocket服务器。。。。。。关闭 \r\n"; }; }else{ alert("您的浏览器不支持WebSocket协议!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 连接没有建立成功!"); } } </script></head><body><form onSubmit="return false;"><label>TEXT</label><input type="text" name="message" value="这里输入消息" style="width: 1024px;height: 100px;"/><br /><br /><input type="button" value="发送ws消息" onClick="send(this.form.message.value)"/><hr color="black"/><h3>服务端返回的应答消息</h3><textarea id="responseText" style="width: 1024px;height: 300px;"></textarea></form></body></html>
原文:https://www.cnblogs.com/Lixiaogang/p/13045300.html
内容总结
以上是互联网集市为您收集整理的springboot 集成Netty+websocket实现简单的聊天功能全部内容,希望文章能够帮你解决springboot 集成Netty+websocket实现简单的聊天功能所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。