通过netty实现服务端与客户端的长连接通讯,及心跳检测。
基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。
环境JDK1.8 和netty5
以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
设计消息类型:
public enum MsgType { PING,ASK,REPLY,LOGIN }
Message基类:
//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!! public abstract class BaseMsg implements Serializable { private static final long serialVersionUID = 1L; private MsgType type; //必须唯一,否者会出现channel调用混乱 private String clientId; //初始化客户端id public BaseMsg() { this.clientId = Constants.getClientId(); } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public MsgType getType() { return type; } public void setType(MsgType type) { this.type = type; } }
常量设置:
public class Constants { private static String clientId; public static String getClientId() { return clientId; } public static void setClientId(String clientId) { Constants.clientId = clientId; } }
登录类型消息:
public class LoginMsg extends BaseMsg { private String userName; private String password; public LoginMsg() { super(); setType(MsgType.LOGIN); } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } }
心跳检测Ping类型消息:
public class PingMsg extends BaseMsg { public PingMsg() { super(); setType(MsgType.PING); } }
请求类型消息:
public class AskMsg extends BaseMsg { public AskMsg() { super(); setType(MsgType.ASK); } private AskParams params; public AskParams getParams() { return params; } public void setParams(AskParams params) { this.params = params; } } //请求类型参数 //必须实现序列化接口 public class AskParams implements Serializable { private static final long serialVersionUID = 1L; private String auth; public String getAuth() { return auth; } public void setAuth(String auth) { this.auth = auth; } }
响应类型消息:
public class ReplyMsg extends BaseMsg { public ReplyMsg() { super(); setType(MsgType.REPLY); } private ReplyBody body; public ReplyBody getBody() { return body; } public void setBody(ReplyBody body) { this.body = body; } } //相应类型body对像 public class ReplyBody implements Serializable { private static final long serialVersionUID = 1L; } public class ReplyClientBody extends ReplyBody { private String clientInfo; public ReplyClientBody(String clientInfo) { this.clientInfo = clientInfo; } public String getClientInfo() { return clientInfo; } public void setClientInfo(String clientInfo) { this.clientInfo = clientInfo; } } public class ReplyServerBody extends ReplyBody { private String serverInfo; public ReplyServerBody(String serverInfo) { this.serverInfo = serverInfo; } public String getServerInfo() { return serverInfo; } public void setServerInfo(String serverInfo) { this.serverInfo = serverInfo; } }
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
Map:
public class NettyChannelMap { private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>(); public static void add(String clientId,SocketChannel socketChannel){ map.put(clientId,socketChannel); } public static Channel get(String clientId){ return map.get(clientId); } public static void remove(SocketChannel socketChannel){ for (Map.Entry entry:map.entrySet()){ if (entry.getValue()==socketChannel){ map.remove(entry.getKey()); } } } }
Handler
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //channel失效,从Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { if(MsgType.LOGIN.equals(baseMsg.getType())){ LoginMsg loginMsg=(LoginMsg)baseMsg; if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){ //登录成功,把channel存到服务端的map中 NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel()); System.out.println("client"+loginMsg.getClientId()+" 登录成功"); } }else{ if(NettyChannelMap.get(baseMsg.getClientId())==null){ //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 LoginMsg loginMsg=new LoginMsg(); channelHandlerContext.channel().writeAndFlush(loginMsg); } } switch (baseMsg.getType()){ case PING:{ PingMsg pingMsg=(PingMsg)baseMsg; PingMsg replyPing=new PingMsg(); NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing); }break; case ASK:{ //收到客户端的请求 AskMsg askMsg=(AskMsg)baseMsg; if("authToken".equals(askMsg.getParams().getAuth())){ ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!"); ReplyMsg replyMsg=new ReplyMsg(); replyMsg.setBody(replyBody); NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg); } }break; case REPLY:{ //收到客户端回复 ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody(); System.out.println("receive client msg: "+clientBody.getClientInfo()); }break; default:break; } ReferenceCountUtil.release(baseMsg); } }
ServerBootstrap:
public class NettyServerBootstrap { private int port; private SocketChannel socketChannel; public NettyServerBootstrap(int port) throws InterruptedException { this.port = port; bind(); } private void bind() throws InterruptedException { EventLoopGroup boss=new NioEventLoopGroup(); EventLoopGroup worker=new NioEventLoopGroup(); ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 128); //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 bootstrap.option(ChannelOption.TCP_NODELAY, true); //保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new ObjectEncoder()); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); p.addLast(new NettyServerHandler()); } }); ChannelFuture f= bootstrap.bind(port).sync(); if(f.isSuccess()){ System.out.println("server start---------------"); } } public static void main(String []args) throws InterruptedException { NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999); while (true){ SocketChannel channel=(SocketChannel)NettyChannelMap.get("001"); if(channel!=null){ AskMsg askMsg=new AskMsg(); channel.writeAndFlush(askMsg); } TimeUnit.SECONDS.sleep(5); } } }
3 Client端:包含发起登录,发送心跳,及对应消息处理
handler
public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> { //利用写空闲发送心跳检测消息 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: PingMsg pingMsg=new PingMsg(); ctx.writeAndFlush(pingMsg); System.out.println("send ping to server----------"); break; default: break; } } } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { MsgType msgType=baseMsg.getType(); switch (msgType){ case LOGIN:{ //向服务器发起登录 LoginMsg loginMsg=new LoginMsg(); loginMsg.setPassword("yao"); loginMsg.setUserName("robin"); channelHandlerContext.writeAndFlush(loginMsg); }break; case PING:{ System.out.println("receive ping from server----------"); }break; case ASK:{ ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!"); ReplyMsg replyMsg=new ReplyMsg(); replyMsg.setBody(replyClientBody); channelHandlerContext.writeAndFlush(replyMsg); }break; case REPLY:{ ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody(); System.out.println("receive client msg: "+replyServerBody.getServerInfo()); } default:break; } ReferenceCountUtil.release(msgType); } }
bootstrap
public class NettyClientBootstrap { private int port; private String host; private SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20); public NettyClientBootstrap(int port, String host) throws InterruptedException { this.port = port; this.host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE,true); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0)); socketChannel.pipeline().addLast(new ObjectEncoder()); socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture future =bootstrap.connect(host,port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel)future.channel(); System.out.println("connect server 成功---------"); } } public static void main(String[]args) throws InterruptedException { Constants.setClientId("001"); NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost"); LoginMsg loginMsg=new LoginMsg(); loginMsg.setPassword("yao"); loginMsg.setUserName("robin"); bootstrap.socketChannel.writeAndFlush(loginMsg); while (true){ TimeUnit.SECONDS.sleep(3); AskMsg askMsg=new AskMsg(); AskParams askParams=new AskParams(); askParams.setAuth("authToken"); askMsg.setParams(askParams); bootstrap.socketChannel.writeAndFlush(askMsg); } } }
具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient
转发请注明来源:http://my.oschina.net/robinyao/blog/399060
总结:
java实现tcp的序列化和反序列化的时候,最好使用json字符串传递数据。这样确保客户端反序列化成功。如果客户端和服务端都是使用java实现,则使用对象序列化和反序列化可以的。如果客户端使用非java语言实现的,不能使用java的对象序列化和反序列化。最好使用字符串传递数据。推荐使用json字符串。
相关推荐
用netty实现长连接和心跳监测的示例代码
springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...
使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据,数据可以批量进行操作
利用netty实现Modbus TCP client/server READ COILS | 0x01 READ DISCRETE INPUTS | 0x02 READ HOLDING REGISTERS | 0x03 READ INPUT REGISTERS | 0x04 WRITE SINGLE COIL | 0x05 WRITE SINGLE REGISTER | 0x06 ...
Netty4长连接、断开重连、心跳检测、Msgpack编码解码 http://blog.csdn.net/giousa/article/details/72846303#t2
Netty实现TCP服务
Java异步NIO框架Netty实现高性能高并发,通过netty搭建TCP、UDP服务,支持物联网设备上行,下行
NULL 博文链接:https://gjp014.iteye.com/blog/2390925
随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息) 碰到的问题: netty作为服务器端如何...
netty 底层,实现心跳检测,下线重连,发送消息.服务端注册到zookeeper,客户端连接服务端
1、下载后导入eclipse(maven工程) 2、先运行ServerTest 3、再运行ClientTest,即可看到输出结果 4、保证可用,亲手编写,欢迎大家指正不足
基于Netty实现的内网穿透&反向代理的工具 (支持TCP上层协议和HTTP的穿透式反向代理).zip
1、客户端与服务端基于单一长连接进行通信,客户端一条连接被多个线程使用。 2、实现私有协议 请求协议 协议头:4字节的请求序列号 2字节的请求类型 2字节数据包长度 数据部分: 服务调用:2字节请求服务名...
实现netty作为服务端,websocket连接成功,将channel保存到map集合,通过js发送心跳,服务端接收心跳信息并响应给客户端,当服务端断开时 客户端进行重连操作
spring整合netty心跳检测,spring整合netty心跳检测,spring整合netty心跳检测,spring整合netty心跳检测
spring boot 整合的netty 实现的socket的demo(包括服务端和客户端是分开的两个项目,导入idea,启动即可)。
基于netty的心跳检测技术,测试过可以在java端和android上使用
主要介绍了使用Netty解决TCP粘包和拆包问题过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
NULL 博文链接:https://hbxflihua.iteye.com/blog/2236369
基于springboot+netty实现的心跳检测源码+项目说明文档.zip (1),NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供各种EventLoopGroup实现。我们在此示例中实现了服务器端应用程序...