Java IO编程入门到精通 - 5 (项目架构重构)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java IO编程入门到精通 - 5 (项目架构重构),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含15843字,纯文字阅读大概需要23分钟。
内容图文
![Java IO编程入门到精通 - 5 (项目架构重构)](/upload/InfoBanner/zyjiaocheng/599/e445204dda8e4f71a66edc6c63b082ea.jpg)
重构之前的bio服务器,提升项目可扩展性
为什么要重构
从上次http服务器解析实现之后,我思虑再三,最终决定对之前的架构进行重构
- 移除之前的MessageHandler 和 MessageCodec以及ObjectProvider等类
- 定义Channel接口,并创建其实现类SocketChannel,实现对socket进行封装
package com.lhstack.bio.channel;
import java.io.IOException;
/**
 * A connection-like object that messages can be written to and that carries
 * a pipeline plus a set of key/value attributes.
 *
 * @author lhstack
 */
public interface Channel {

    /**
     * Returns the pipeline that belongs to this channel.
     *
     * @return the channel's pipeline
     */
    ChannelPipeline pipeline();

    /**
     * Returns the underlying "unsafe" object of this channel.
     * The target type is chosen by the caller; nothing in this signature
     * verifies that the cast is valid.
     *
     * @param <T> the type the caller expects
     * @return the underlying object, cast to {@code T}
     */
    <T> T unsafe();

    /**
     * Accesses an attribute of this channel.
     *
     * @param key the attribute key
     * @param val the value supplied for the attribute
     * @param <T> the type the caller expects
     * @return the attribute value, cast to {@code T}
     */
    <T> T attr(Object key, Object val);

    /**
     * Writes a message to this channel.
     *
     * @param msg the message to write
     * @return a channel, so that calls can be chained
     * @throws IOException if writing fails
     */
    Channel write(Object msg) throws IOException;

    /**
     * Writes a message to this channel; judging by the name it is also
     * expected to flush, but that is up to the implementation.
     *
     * @param msg the message to write
     * @return a channel, so that calls can be chained
     * @throws IOException if writing fails
     */
    Channel writeAndFlush(Object msg) throws IOException;
}
package com.lhstack.bio.channel;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * {@link Channel} implementation that wraps a {@link Socket}.
 *
 * @author lhstack
 */
public class SocketChannel implements Channel {

    private final Socket socket;

    private ChannelPipeline pipeline;

    private final Map<Object, Object> attr;

    /**
     * Creates a channel around the given socket with an empty attribute map.
     * The pipeline has to be supplied afterwards through
     * {@link #setPipeline(ChannelPipeline)}.
     *
     * @param socket the socket to wrap
     */
    public SocketChannel(Socket socket) {
        this.socket = socket;
        this.attr = new ConcurrentHashMap<>();
    }

    /**
     * Hands the message to the last context of the pipeline for writing.
     *
     * @param msg the message to write
     * @return this channel
     * @throws IOException if the pipeline fails; a checked exception that is
     *                     not an {@code IOException} is wrapped in one
     */
    @Override
    public SocketChannel write(Object msg) throws IOException {
        // The Channel interface only permits IOException, so the broader
        // exception coming out of the pipeline is narrowed here.
        try {
            this.pipeline.getLast().write(msg);
        } catch (Exception e) {
            throw toIOException(e);
        }
        return this;
    }

    /**
     * Hands the message to the last context of the pipeline through its
     * {@code writeAndFlush} method.
     *
     * @param msg the message to write
     * @return this channel
     * @throws IOException if the pipeline fails; a checked exception that is
     *                     not an {@code IOException} is wrapped in one
     */
    @Override
    public SocketChannel writeAndFlush(Object msg) throws IOException {
        try {
            this.pipeline.getLast().writeAndFlush(msg);
        } catch (Exception e) {
            throw toIOException(e);
        }
        return this;
    }

    /**
     * Returns the wrapped socket, cast to whatever type the caller asks for.
     *
     * @param <T> the type the caller expects (normally {@link Socket})
     * @return the wrapped socket
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T unsafe() {
        return (T) this.socket;
    }

    /**
     * Sets the pipeline used by this channel.
     *
     * @param pipeline the pipeline to use
     */
    public void setPipeline(ChannelPipeline pipeline) {
        this.pipeline = pipeline;
    }

    @Override
    public ChannelPipeline pipeline() {
        return this.pipeline;
    }

    /**
     * Stores {@code val} under {@code key} unless the key already has a value,
     * then returns the value associated with the key.
     * Passing {@code null} as {@code val} only looks the key up.
     *
     * @param key the attribute key
     * @param val the value to store when the key is absent, or {@code null}
     * @param <T> the type the caller expects
     * @return the value now associated with {@code key}, or {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T attr(Object key, Object val) {
        if (val == null) {
            // ConcurrentHashMap rejects null values, so a null is a pure lookup.
            return (T) this.attr.get(key);
        }
        // Single atomic step instead of containsKey + put + get.
        Object existing = this.attr.putIfAbsent(key, val);
        return (T) (existing != null ? existing : val);
    }

    /**
     * Converts a pipeline failure into something {@link Channel} may throw:
     * runtime exceptions are rethrown untouched, IOExceptions are returned
     * as they are, and any other checked exception is wrapped.
     */
    private static IOException toIOException(Exception e) {
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        if (e instanceof IOException) {
            return (IOException) e;
        }
        return new IOException("Channel pipeline write failed", e);
    }
}
- 创建ChannelHandler接口,定义生命周期方法
package com.lhstack.bio.channel;
/**
 * Callbacks that a handler placed in a channel pipeline has to provide.
 *
 * @author lhstack
 */
public interface ChannelHandler {

    /**
     * Initialization hook; does nothing unless overridden.
     *
     * @param ch the channel being initialized
     * @throws Exception if initialization fails
     */
    default void initializer(Channel ch) throws Exception {
    }

    /**
     * Called when the channel has been registered.
     *
     * @param ctx the context of this handler
     * @throws Exception if handling fails
     */
    void channelRegister(ChannelHandlerContext ctx) throws Exception;

    /**
     * Called when the channel becomes active.
     *
     * @param ctx the context of this handler
     * @throws Exception if handling fails
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Called with a message that was read from the channel.
     *
     * @param ctx the context of this handler
     * @param msg the message that was read
     * @throws Exception if handling fails
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Called with a message that is being written out.
     *
     * @param ctx the context of this handler
     * @param msg the message being written
     * @throws Exception if handling fails
     */
    void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Called with an error that should be handled.
     *
     * @param ctx       the context of this handler
     * @param throwable the error to handle
     * @throws Exception if handling fails
     */
    void exceptionCatch(ChannelHandlerContext ctx, Throwable throwable) throws Exception;

    /**
     * Called when the channel is unregistered/removed.
     *
     * @param ctx the context of this handler
     * @throws Exception if handling fails
     */
    void channelRemove(ChannelHandlerContext ctx) throws Exception;
}
5.定义ChannelHandlerContext接口,管理通道上下文
package com.lhstack.bio.channel;
/**
 * Context that ties a {@link ChannelHandler} to its channel and pipeline and
 * lets the handler pass events on to its neighbours.
 *
 * @author lhstack
 */
public interface ChannelHandlerContext {

    /**
     * @return the name of this context
     */
    String name();

    /**
     * @return the handler bound to this context
     */
    ChannelHandler handler();

    /**
     * @return the channel bound to this context
     */
    Channel channel();

    /**
     * @return the pipeline
     */
    ChannelPipeline pipeline();

    /**
     * Accesses an attribute.
     *
     * @param key the attribute key
     * @param val the value supplied for the attribute
     * @param <T> the type the caller expects
     * @return the attribute value, cast to {@code T}
     */
    <T> T attr(Object key, Object val);

    /**
     * Invokes the register callback of the next context.
     *
     * @return a context, so that calls can be chained
     * @throws Exception if the callback fails
     */
    ChannelHandlerContext fireChannelRegister() throws Exception;

    /**
     * Invokes the active callback of the next context.
     *
     * @return a context, so that calls can be chained
     * @throws Exception if the callback fails
     */
    ChannelHandlerContext fireChannelActive() throws Exception;

    /**
     * Invokes the read callback of the next context.
     *
     * @param msg the message to hand over
     * @return a context, so that calls can be chained
     * @throws Exception if the callback fails
     */
    ChannelHandlerContext fireChannelRead(Object msg) throws Exception;

    /**
     * Invokes the exception-handling callback of the next context.
     *
     * @param cause the error to hand over
     * @return a context, so that calls can be chained
     * @throws Exception if the callback fails
     */
    ChannelHandlerContext fireExceptionCaught(Throwable cause) throws Exception;

    /**
     * Invokes the unregister callback of the next context.
     *
     * @return a context, so that calls can be chained
     * @throws Exception if the callback fails
     */
    ChannelHandlerContext fireChannelUnregistered() throws Exception;

    /**
     * Invokes the {@code write} method of the previous context.
     *
     * @param msg the message to write
     * @throws Exception if writing fails
     */
    void write(Object msg) throws Exception;

    /**
     * Invokes the {@code writeAndFlush} method of the previous context.
     *
     * @param msg the message to write
     * @throws Exception if writing fails
     */
    void writeAndFlush(Object msg) throws Exception;
}
6.编写ChannelInitializeHandler.java类,实现初始化添加自定义handler
package com.lhstack.bio.channel;
/**
 * Handler whose purpose is channel initialization: subclasses implement
 * {@link #initializer(Channel)}, while every other callback simply hands the
 * event to the context unchanged.
 *
 * @author lhstack
 */
public abstract class ChannelInitializeHandler implements ChannelHandler {

    /**
     * Initializes the given channel, e.g. by adding custom handlers to its
     * pipeline.
     *
     * @param ch the channel to initialize
     * @throws Exception if initialization fails
     */
    @Override
    public abstract void initializer(Channel ch) throws Exception;

    /** Passes the register event on. */
    @Override
    public void channelRegister(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegister();
    }

    /** Passes the active event on. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /** Passes the read message on untouched. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /** Passes the outgoing message on untouched. */
    @Override
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);
    }

    /** Passes the error on. */
    @Override
    public void exceptionCatch(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
        ctx.fireExceptionCaught(throwable);
    }

    /** Passes the removal on as an unregister event. */
    @Override
    public void channelRemove(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
}
7.创建Handler适配器
package com.lhstack.bio.channel;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
/**
 * Base {@link ChannelHandler} whose callbacks all forward the event through
 * the context, so subclasses only override what they care about.
 *
 * @author lhstack
 */
public class ChannelHandlerAdapter implements ChannelHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Override
    public void channelRegister(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegister();
    }

    @Override
    public void channelRemove(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Override
    public void exceptionCatch(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
        ctx.fireExceptionCaught(throwable);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /**
     * Forwards an outgoing message. A {@link ByteBuf} is forwarded as a copy;
     * the message that was passed in is released afterwards.
     *
     * @param ctx the context of this handler
     * @param msg the message being written
     * @throws Exception if forwarding fails
     */
    @Override
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object outbound = msg instanceof ByteBuf ? ((ByteBuf) msg).copy() : msg;
        try {
            ctx.write(outbound);
        } finally {
            // Release even when the write fails, otherwise the incoming
            // buffer would never be released on the error path.
            ReferenceCountUtil.release(msg);
        }
    }
}
- 定义通用的消息编解码器
ByteToMessageCodec
package com.lhstack.bio.codec;
import com.lhstack.bio.channel.ChannelHandlerAdapter;
import com.lhstack.bio.channel.ChannelHandlerContext;
import com.lhstack.bio.utils.TypeUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
/**
 * Combined encoder/decoder: incoming {@link ByteBuf}s are decoded into
 * messages, outgoing messages of the supported type are encoded into a
 * {@link ByteBuf}.
 *
 * @param <T> the outgoing message type this codec encodes
 * @author lhstack
 */
public abstract class ByteToMessageCodec<T> extends ChannelHandlerAdapter {

    /**
     * Encodes an outgoing message.
     *
     * @param ctx the channel context
     * @param msg the message to encode
     * @param buf the buffer the encoded bytes are written to
     * @throws Exception if encoding fails
     */
    protected abstract void messageEncoder(ChannelHandlerContext ctx, T msg, ByteBuf buf) throws Exception;

    /**
     * Decodes received bytes into messages. According to the original author
     * the decoder does not have to release {@code buf} because the framework
     * takes care of it (that release is not performed in this class).
     *
     * @param ctx the channel context
     * @param buf the received data
     * @param out the list decoded messages are added to
     */
    protected abstract void messageDecoder(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out);

    /**
     * Decodes an incoming {@link ByteBuf} and fires every decoded message
     * down the pipeline, releasing it afterwards. Anything that is not a
     * {@code ByteBuf} is passed on untouched.
     *
     * @param ctx the context of this handler
     * @param msg the message that was read
     * @throws Exception if decoding or a later handler fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        List<Object> decoded = new ArrayList<>();
        this.messageDecoder(ctx, (ByteBuf) msg, decoded);
        for (Object item : decoded) {
            ctx.fireChannelRead(item);
            ReferenceCountUtil.safeRelease(item);
        }
    }

    /**
     * Encodes an outgoing message when {@code TypeUtils.mainGenClassSupport}
     * accepts its type for this codec and writes the encoded buffer if it
     * contains any bytes. Messages of any other type are forwarded unchanged
     * instead of being silently dropped, so that another handler in the
     * pipeline can deal with them.
     *
     * @param ctx the context of this handler
     * @param msg the message being written
     * @throws Exception if encoding or forwarding fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!TypeUtils.mainGenClassSupport(this.getClass(), msg.getClass())) {
            // Not ours to encode: pass it along rather than losing it.
            ctx.write(msg);
            return;
        }
        ByteBuf buf = Unpooled.buffer(128);
        this.messageEncoder(ctx, (T) msg, buf);
        if (buf.isReadable()) {
            ctx.write(buf);
        }
    }
}
9.ByteToMessageDecoder.java
package com.lhstack.bio.codec;
import com.lhstack.bio.channel.ChannelHandlerAdapter;
import com.lhstack.bio.channel.ChannelHandlerContext;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
/**
 * Inbound handler that turns received {@link ByteBuf}s into messages.
 *
 * @author lhstack
 */
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    /**
     * Decodes received bytes into messages.
     *
     * @param ctx the channel context
     * @param buf the received data
     * @param out the list decoded messages are added to
     */
    protected abstract void messageDecoder(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out);

    /**
     * Runs the decoder on an incoming {@link ByteBuf}, fires each decoded
     * message down the pipeline and releases it afterwards. Anything that is
     * not a {@code ByteBuf} is passed on untouched.
     *
     * @param ctx the context of this handler
     * @param msg the message that was read
     * @throws Exception if decoding or a later handler fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf received = (ByteBuf) msg;
        List<Object> decoded = new ArrayList<>();
        this.messageDecoder(ctx, received, decoded);
        // An empty result simply yields no iterations.
        for (Object item : decoded) {
            ctx.fireChannelRead(item);
            ReferenceCountUtil.safeRelease(item);
        }
    }
}
10.MessageToByteEncoder.java
package com.lhstack.bio.codec;
import com.lhstack.bio.channel.ChannelHandlerAdapter;
import com.lhstack.bio.channel.ChannelHandlerContext;
import com.lhstack.bio.utils.TypeUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
 * Outbound handler that encodes messages of one type into a {@link ByteBuf}.
 *
 * @param <T> the outgoing message type this encoder handles
 * @author lhstack
 */
public abstract class MessageToByteEncoder<T> extends ChannelHandlerAdapter {

    /**
     * Encodes an outgoing message.
     *
     * @param ctx the channel context
     * @param msg the message to encode
     * @param buf the buffer the encoded bytes are written to
     * @throws Exception if encoding fails
     */
    protected abstract void messageEncoder(ChannelHandlerContext ctx, T msg, ByteBuf buf) throws Exception;

    /**
     * Encodes an outgoing message when {@code TypeUtils.mainGenClassSupport}
     * accepts its type for this encoder and writes the encoded buffer if it
     * contains any bytes. Messages of any other type are forwarded unchanged
     * instead of being silently dropped.
     *
     * @param ctx the context of this handler
     * @param msg the message being written
     * @throws Exception if encoding or forwarding fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!TypeUtils.mainGenClassSupport(this.getClass(), msg.getClass())) {
            // Not ours to encode: pass it along rather than losing it.
            ctx.write(msg);
            return;
        }
        ByteBuf buf = Unpooled.buffer(128);
        this.messageEncoder(ctx, (T) msg, buf);
        if (buf.isReadable()) {
            // Forward the encoded buffer itself; copying it first would leave
            // the original buffer behind without anyone releasing it.
            ctx.write(buf);
        }
    }
}
11.将之前的StringCodec以及LengthCodec等编码器迁移到新的实现方式
StringMessageCodec.java
package com.lhstack.bio.codec.string;
import com.lhstack.bio.channel.ChannelHandlerContext;
import com.lhstack.bio.codec.ByteToMessageCodec;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Codec that converts between UTF-8 bytes and {@link String} messages.
 *
 * @author lhstack
 */
public class StringMessageCodec extends ByteToMessageCodec<String> {

    /**
     * Writes the UTF-8 bytes of {@code msg} into {@code buf}.
     */
    @Override
    protected void messageEncoder(ChannelHandlerContext ctx, String msg, ByteBuf buf) throws Exception {
        byte[] encoded = msg.getBytes(StandardCharsets.UTF_8);
        buf.writeBytes(encoded);
    }

    /**
     * Consumes every readable byte of {@code buf} and adds them to
     * {@code out} as a single UTF-8 string; does nothing when the buffer has
     * no readable bytes.
     */
    @Override
    protected void messageDecoder(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
        if (!buf.isReadable()) {
            return;
        }
        int available = buf.readableBytes();
        byte[] payload = new byte[available];
        buf.readBytes(payload);
        String text = new String(payload, StandardCharsets.UTF_8);
        out.add(text);
    }
}
LengthMessageCodec.java
package com.lhstack.bio.codec.length;
import com.lhstack.bio.channel.ChannelHandlerContext;
import com.lhstack.bio.codec.ByteToMessageCodec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;
/**
 * Length-prefixed framing codec: the encoder puts the payload length in
 * front of the payload, the decoder reads that length and emits the payload
 * once it has arrived completely. The width of the length field is selected
 * by a {@link LengthConstant}.
 *
 * @author lhstack
 */
public class LengthMessageCodec extends ByteToMessageCodec<ByteBuf> {

    private final LengthConstant constant;

    /**
     * @param constant the kind of length field to use
     */
    public LengthMessageCodec(LengthConstant constant) {
        this.constant = constant;
    }

    /**
     * Creates a codec that uses {@link LengthConstant#INT}.
     */
    public LengthMessageCodec() {
        this.constant = LengthConstant.INT;
    }

    /**
     * Writes the length field followed by the readable bytes of {@code msg}
     * into {@code buf}. Nothing is written when {@code msg} has no readable
     * bytes or the length kind is not one of the handled constants.
     *
     * @param ctx the channel context
     * @param msg the payload to frame
     * @param buf the buffer the frame is written to
     * @throws IllegalArgumentException if the payload is longer than the
     *                                  length field can express
     */
    @Override
    protected void messageEncoder(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf buf) throws Exception {
        if (!msg.isReadable()) {
            return;
        }
        int length = msg.readableBytes();
        switch (constant) {
            case BYTE:
                requireFits(length, 0xFF);
                buf.writeByte(length);
                break;
            case SHORT:
                requireFits(length, 0xFFFF);
                buf.writeShort(length);
                break;
            case MEDIUM:
                requireFits(length, 0xFFFFFF);
                buf.writeMedium(length);
                break;
            case INT:
                buf.writeInt(length);
                break;
            case LONG:
                buf.writeLong(length);
                break;
            default:
                return;
        }
        buf.writeBytes(msg);
    }

    /**
     * Tries to read one frame from {@code buf}. When the length field or the
     * payload has not arrived completely, the reader index is left where it
     * was so the call can be repeated once more data is available.
     *
     * @param ctx the channel context
     * @param buf the received data
     * @param out receives the payload as a new buffer when a frame is complete
     * @throws IllegalArgumentException if the length field holds a negative
     *                                  value or one larger than
     *                                  {@code Integer.MAX_VALUE}
     */
    @Override
    protected void messageDecoder(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
        int headerSize = headerSize();
        // Wait until the whole length field is readable instead of failing
        // with an IndexOutOfBoundsException on a partial header.
        if (headerSize == 0 || buf.readableBytes() < headerSize) {
            return;
        }
        buf.markReaderIndex();
        long length = readLength(buf);
        if (length < 0 || length > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Invalid frame length: " + length);
        }
        // ">=": a payload that exactly fills the remaining bytes is complete.
        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return;
        }
        int payloadLength = (int) length;
        ByteBuf payload = Unpooled.buffer(payloadLength);
        buf.readBytes(payload, payloadLength);
        out.add(payload);
    }

    /**
     * Returns the width of the length field in bytes, or 0 for a length kind
     * this codec does not handle.
     */
    private int headerSize() {
        switch (constant) {
            case BYTE:
                return 1;
            case SHORT:
                return 2;
            case MEDIUM:
                return 3;
            case INT:
                return 4;
            case LONG:
                return 8;
            default:
                return 0;
        }
    }

    /**
     * Reads the length field. The 1-, 2- and 3-byte fields are read as
     * unsigned values so that every length the encoder can write is decoded
     * back to the same number.
     */
    private long readLength(ByteBuf buf) {
        switch (constant) {
            case BYTE:
                return buf.readUnsignedByte();
            case SHORT:
                return buf.readUnsignedShort();
            case MEDIUM:
                return buf.readUnsignedMedium();
            case INT:
                return buf.readInt();
            case LONG:
                return buf.readLong();
            default:
                throw new IllegalStateException("Unsupported length kind: " + constant);
        }
    }

    /**
     * Rejects payloads whose length cannot be stored in the length field;
     * writing them anyway would truncate the length and corrupt the stream.
     */
    private static void requireFits(int length, int max) {
        if (length > max) {
            throw new IllegalArgumentException(
                    "Payload of " + length + " bytes exceeds the maximum frame length " + max);
        }
    }
}
使用案例
package com.lhstack.example;
import com.lhstack.bio.Server;
import com.lhstack.bio.codec.length.LengthConstant;
import com.lhstack.bio.codec.length.LengthMessageDecoder;
import com.lhstack.bio.codec.length.LengthMessageEncoder;
import com.lhstack.bio.codec.string.StringMessageCodec;
import com.lhstack.bio.codec.string.StringMessageDecoder;
import com.lhstack.bio.channel.*;
import com.lhstack.bio.codec.string.StringMessageEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
/**
 * Manual test that starts the BIO server.
 *
 * @author lhstack
 */
public class ServerTest {

    /**
     * Starts a server whose pipeline consists of an INT length decoder
     * followed by a handler that prints every received buffer as UTF-8 text
     * and answers each read with a fixed greeting.
     */
    public static void main(String[] args) throws Exception {
        ChannelInitializeHandler bootstrap = new ChannelInitializeHandler() {
            @Override
            public void initializer(Channel ch) throws Exception {
                LengthMessageDecoder frameDecoder = new LengthMessageDecoder(LengthConstant.INT);
                ChannelHandlerAdapter greeter = new ChannelHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        if (msg instanceof ByteBuf) {
                            ByteBuf received = (ByteBuf) msg;
                            System.out.println(received.toString(StandardCharsets.UTF_8));
                        }
                        byte[] reply = "我是服务端".getBytes(StandardCharsets.UTF_8);
                        ctx.writeAndFlush(Unpooled.wrappedBuffer(reply));
                    }
                };
                ch.pipeline()
                        .addLast(frameDecoder)
                        .addLast(greeter);
            }
        };
        new Server(8080, 200, bootstrap).start();
    }
}
实现我们的客户端
package com.lhstack.example;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.StandardSocketOptions;
import java.nio.charset.StandardCharsets;
/**
* @author lhstack
* @description 测试bio client
*/
public class ClientTest {
public static void main(String[] args) throws Exception{
Socket socket = new Socket();
socket.connect(new InetSocketAddress(8080));
socket.setOption(StandardSocketOptions.SO_SNDBUF,128);
byte[] msg = "hello wasdfdfddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddasaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaorld".getBytes(StandardCharsets.UTF_8);
DataOutputStream dot = new DataOutputStream(socket.getOutputStream());
dot.writeInt(msg.length);
dot.write(msg);
dot.flush();
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[512];
int len = 0;
while((len = inputStream.read(bytes)) > 0){
System.out.println(new String(bytes));
socket.getOutputStream().write("hello world".getBytes(StandardCharsets.UTF_8));
}
}
}
运行效果如下
因为使用了netty-buffer中的ByteBuf类,所以在初始化的时候,会打印ByteBuf相关的信息
服务端效果如下
客户端效果如下
由于这次重构基础架构花费了一部分时间,上一次说的群聊系统将在后续完成,同时会继续修复此架构中的缺陷并完善其功能
内容总结
以上是互联网集市为您收集整理的Java IO编程入门到精通 - 5 (项目架构重构)全部内容,希望文章能够帮你解决Java IO编程入门到精通 - 5 (项目架构重构)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。