From d744d29b243bcb039d9306fbf0adc5ff4a0ee7cc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=83=A1=E5=AE=8F=E9=9F=AC?= <3183764662@qq.com>
Date: Tue, 6 Nov 2018 11:29:08 +0800
Subject: [PATCH 1/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=AF=B4=E6=98=8E?=
=?UTF-8?q?=E6=96=87=E6=A1=A3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
README.md | 126 +++---------------------------------------------------
1 file changed, 5 insertions(+), 121 deletions(-)
diff --git a/README.md b/README.md
index 0e527c3..0d70b8c 100644
--- a/README.md
+++ b/README.md
@@ -1,126 +1,10 @@
-# InChat(当前版本1.6.0)
+# tcp-wechat
-> 当前主要更新分支,master将会再所有分支开放一定阶段后整改为maven包,提供给大家使用
+## 项目介绍
-## 分支介绍 im-api
+针对小程序与单片机硬件执行Iot物联网通讯的一套完整Demo。
-腾讯IM(云通信)后端模仿项目,均以API形式对接,如果有前端想要对接的可以运行本分支,本分支预计终版为一个单服务并发30万用户的IM后台项目
+## 项目尚未初始化
-## 分支介绍 paho-mqtt
+请勿使用。
-基于小程序端或移动web端的paho.js与[java MQTT 客户端模拟](https://github.com/eclipse/paho.mqtt.java)的消息订阅与通信,小程序Iot的Demo,目前支持ws格式
-
-## 分支介绍 tcp-wechat
-
-基于小程序端与单片机等硬件的TCP/IP的主要通信,Iot中心作为中转,本demo将完全实现具体功能,详情请看分支主页
-
-## 简介
-
-***(InChat)Iot Netty Chat***
-
-仿微信聊天应用,一步一步更新,基于SpringBoot-WebSocket通用框架,结合Netty进行聊天社交,并记录聊天日志,
-异步存储,前端暂用SUI Mobile,添加实现TCP/IP后端通信端口(MQTT协议、可实时与单片机等TCP硬件通信)、加入图片处理流,
-聊天实现文字与图片发送功能、API调用Netty长链接执行发送消息(在线数、用户列表)
-
-## 基本架构图(1.5.2版)
-
-
-
-## 功能
-
->实时聊天
->异步CRUD处理消息日志
->获取聊天历史
->用户登录、记录登录用户聊天历史
->防止二次登录
->SUI Mobile仿微信样式
->TCP/IP软硬件通信(8092)
->MQTT协议下的Iot物联网通信(8094)
->图片发送聊天功能
->API调用Netty长链接执行发送消息(在线用户数、用户列表)
->下版(1.7.0):好友功能等
-
-## 版本迭代介绍
-
-* 1.0.0版本
-
-用户登录,聊天历史,随机用户名,异步数据写入:https://segmentfault.com/a/1190000016615063
-
-* 1.2.0版本
-
-修复聊天记录功能,实现重复信息录入,完善前端页面,回车监听等:https://segmentfault.com/a/1190000016637814
-
-* 1.3.0版本
-
-用户注册登录功能,系统聊天绑定用户,禁止二次登录等,前端页面大改
-
-* 1.4.1版本
-
-本人主导SUI Mobile构建仿微信样式页面版,使用时开F12手机界面
-
-* 1.5.2版本
-
-TCP/IP软硬件通信-单片机等应用的TCP通信,Netty处理二进制图片发送聊天功能
-
-* 1.5.8版本
-
-MQTT协议软硬件通信等,Iot物联网
-
-* 1.6.0版本
-
-API调用Netty长链接执行发送消息(在线数、用户列表):https://segmentfault.com/a/1190000016603392
-
-
-## 配置
-
->application.yml 数据库配置、Netty参数配置
-
->TCP需先去com.myself.nettychat.tcptest包下执行CRC16myself获取发送数据,
-
->再执行TCPTestClient发送数据,请勿随意更改发送格式(通信协议来的)
-
->http://localhost:8080/susu/admin/loginsui 启动访问路径
-
->mqtt协议测试在mqttclient包下
-
->http://localhost:8080/susu/swagger-ui.html 查看API文档
-
-## 效果图
-
-.png)
-.png)
-.png)
-.png)
-.png)
-
-
-
-
-## 预留BUG
-
-```
-io.netty.handler.codec.CorruptedFrameException: Max frame length of 65536 has been exceeded.
-图片过大,需要在前端做图片上传压缩
-
-Uncaught TypeError: msg.substring is not a function at WebSocket.socket.onmessage (newChat.js:38)
-前端代码的一点问题,不影响项目正常运行
-
-java.io.IOException: 远程主机强迫关闭了一个现有的连接。
-TCP客户端连接主动关闭,不影响,良性报错
-```
-
-## 下载地址
-
-下载地址:https://github.com/UncleCatMySelf/SBToNettyChat/releases
-
-## 交流与提问
-
-提问与Bug上报:https://github.com/UncleCatMySelf/SBToNettyChat/issues
-
-QQ群:628793702(仅供交流,不提供问题解答)
-
-## 关于作者
-
-个人公众号:UncleCatMySelf
-
-
From 1ee163b4df50e30015424971db03d3a1a749bf06 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=83=A1=E5=AE=8F=E9=9F=AC?= <3183764662@qq.com>
Date: Tue, 6 Nov 2018 14:16:11 +0800
Subject: [PATCH 2/9] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E5=A4=A7?=
=?UTF-8?q?=E8=87=B4Demo?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
h5/chat.html | 105 -----
h5/home.html | 47 --
h5/index.html | 67 ---
h5/logoSmall.png | Bin 8661 -> 0 bytes
pom.xml | 9 +-
sql/nettychat.sql | 71 ---
.../myself/nettychat/DefaultAutoService.java | 19 -
.../nettychat/NettychatApplication.java | 14 +-
.../com/myself/nettychat/auto/InitServer.java | 38 --
.../nettychat/auto/ServerAutoConfigure.java | 86 ----
.../bootstrap/AbstractBootstrapServer.java | 117 -----
.../myself/nettychat/bootstrap/BaseApi.java | 76 ---
.../nettychat/bootstrap/BaseAuthService.java | 13 -
.../nettychat/bootstrap/BootstrapServer.java | 18 -
.../nettychat/bootstrap/ChannelService.java | 40 --
.../bootstrap/NettyBootstrapServer.java | 134 ------
.../nettychat/bootstrap/bean/MqttChannel.java | 115 -----
.../bootstrap/bean/RetainMessage.java | 18 -
.../bootstrap/bean/SendMqttMessage.java | 37 --
.../bootstrap/bean/SessionMessage.java | 29 --
.../nettychat/bootstrap/bean/WillMeaasge.java | 23 -
.../channel/AbstractChannelService.java | 112 -----
.../channel/ClientSessionService.java | 36 --
.../bootstrap/channel/MqttChannelService.java | 440 ------------------
.../bootstrap/channel/MqttHandlerService.java | 223 ---------
.../bootstrap/channel/PublishApiSevice.java | 162 -------
.../bootstrap/channel/WillService.java | 52 ---
.../bootstrap/channel/cache/CacheMap.java | 137 ------
.../coder/ByteBufToWebSocketFrameEncoder.java | 26 --
.../coder/WebSocketFrameToByteBufDecoder.java | 25 -
.../bootstrap/handler/DefaultMqttHandler.java | 105 -----
.../bootstrap/scan/SacnScheduled.java | 59 ---
.../bootstrap/scan/ScanRunnable.java | 44 --
.../common/properties/InitNetty.java | 1 +
.../nettychat/common/utils/ResultVOUtil.java | 60 +--
.../myself/nettychat/config/NettyConfig.java | 75 ---
.../NettyWebSocketChannelInitializer.java | 40 --
.../myself/nettychat/config/TCPServer.java | 11 -
.../config/TextWebSocketFrameHandler.java | 141 ------
.../nettychat/constont/CookieConstant.java | 15 -
.../myself/nettychat/constont/H5Constant.java | 29 --
.../constont/LikeSomeCacheTemplate.java | 36 --
.../controller/NCBackController.java | 35 --
.../controller/NcChangeController.java | 88 ----
.../controller/NcChatController.java | 76 ---
.../controller/NcLoginController.java | 145 ------
.../com/myself/nettychat/dataobject/User.java | 39 --
.../myself/nettychat/dataobject/UserMsg.java | 38 --
.../com/myself/nettychat/form/LoginForm.java | 20 -
.../repository/UserMsgRepository.java | 13 -
.../nettychat/repository/UserRepository.java | 16 -
.../myself/nettychat/service/UserService.java | 22 -
.../service/impl/UserServiceImpl.java | 43 --
.../myself/nettychat/store/TokenStore.java | 45 --
.../myself/nettychat/task/MsgAsyncTesk.java | 50 --
.../myself/nettychat/task/ScheduledPool.java | 33 --
.../nettychat/tcptest/TCPTestClient.java | 4 +-
.../com/myself/nettychat/vo/ResultVo.java | 52 +--
src/main/resources/application.yml | 14 -
src/main/resources/static/css/allchat.css | 59 ---
src/main/resources/static/css/chat.css | 43 --
src/main/resources/static/css/newChat.css | 172 -------
src/main/resources/static/css/registered.css | 160 -------
src/main/resources/static/image/logoBig.jpg | Bin 137374 -> 0 bytes
src/main/resources/static/image/logoSmall.png | Bin 8661 -> 0 bytes
src/main/resources/static/image/nuandao.png | Bin 62980 -> 0 bytes
src/main/resources/static/js/chat.js | 51 --
src/main/resources/static/js/newChat.js | 165 -------
src/main/resources/static/js/registered.js | 72 ---
src/main/resources/templates/chat/allchat.ftl | 49 --
src/main/resources/templates/chat/chat.ftl | 33 --
src/main/resources/templates/common/floor.ftl | 3 -
.../resources/templates/common/header.ftl | 12 -
src/main/resources/templates/find/find.ftl | 41 --
src/main/resources/templates/h5.ftl | 58 ---
src/main/resources/templates/home/home.ftl | 62 ---
src/main/resources/templates/login/login.ftl | 57 ---
.../resources/templates/login/loginSui.ftl | 124 -----
src/main/resources/templates/me/me.ftl | 78 ----
79 files changed, 65 insertions(+), 4812 deletions(-)
delete mode 100644 h5/chat.html
delete mode 100644 h5/home.html
delete mode 100644 h5/index.html
delete mode 100644 h5/logoSmall.png
delete mode 100644 sql/nettychat.sql
delete mode 100644 src/main/java/com/myself/nettychat/DefaultAutoService.java
delete mode 100644 src/main/java/com/myself/nettychat/auto/InitServer.java
delete mode 100644 src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/BaseApi.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/BaseAuthService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/BootstrapServer.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/ChannelService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/coder/WebSocketFrameToByteBufDecoder.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/handler/DefaultMqttHandler.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/scan/SacnScheduled.java
delete mode 100644 src/main/java/com/myself/nettychat/bootstrap/scan/ScanRunnable.java
delete mode 100644 src/main/java/com/myself/nettychat/config/NettyConfig.java
delete mode 100644 src/main/java/com/myself/nettychat/config/NettyWebSocketChannelInitializer.java
delete mode 100644 src/main/java/com/myself/nettychat/config/TextWebSocketFrameHandler.java
delete mode 100644 src/main/java/com/myself/nettychat/constont/CookieConstant.java
delete mode 100644 src/main/java/com/myself/nettychat/constont/H5Constant.java
delete mode 100644 src/main/java/com/myself/nettychat/constont/LikeSomeCacheTemplate.java
delete mode 100644 src/main/java/com/myself/nettychat/controller/NcChangeController.java
delete mode 100644 src/main/java/com/myself/nettychat/controller/NcChatController.java
delete mode 100644 src/main/java/com/myself/nettychat/controller/NcLoginController.java
delete mode 100644 src/main/java/com/myself/nettychat/dataobject/User.java
delete mode 100644 src/main/java/com/myself/nettychat/dataobject/UserMsg.java
delete mode 100644 src/main/java/com/myself/nettychat/form/LoginForm.java
delete mode 100644 src/main/java/com/myself/nettychat/repository/UserMsgRepository.java
delete mode 100644 src/main/java/com/myself/nettychat/repository/UserRepository.java
delete mode 100644 src/main/java/com/myself/nettychat/service/UserService.java
delete mode 100644 src/main/java/com/myself/nettychat/service/impl/UserServiceImpl.java
delete mode 100644 src/main/java/com/myself/nettychat/store/TokenStore.java
delete mode 100644 src/main/java/com/myself/nettychat/task/MsgAsyncTesk.java
delete mode 100644 src/main/java/com/myself/nettychat/task/ScheduledPool.java
delete mode 100644 src/main/resources/static/css/allchat.css
delete mode 100644 src/main/resources/static/css/chat.css
delete mode 100644 src/main/resources/static/css/newChat.css
delete mode 100644 src/main/resources/static/css/registered.css
delete mode 100644 src/main/resources/static/image/logoBig.jpg
delete mode 100644 src/main/resources/static/image/logoSmall.png
delete mode 100644 src/main/resources/static/image/nuandao.png
delete mode 100644 src/main/resources/static/js/chat.js
delete mode 100644 src/main/resources/static/js/newChat.js
delete mode 100644 src/main/resources/static/js/registered.js
delete mode 100644 src/main/resources/templates/chat/allchat.ftl
delete mode 100644 src/main/resources/templates/chat/chat.ftl
delete mode 100644 src/main/resources/templates/common/floor.ftl
delete mode 100644 src/main/resources/templates/common/header.ftl
delete mode 100644 src/main/resources/templates/find/find.ftl
delete mode 100644 src/main/resources/templates/h5.ftl
delete mode 100644 src/main/resources/templates/home/home.ftl
delete mode 100644 src/main/resources/templates/login/login.ftl
delete mode 100644 src/main/resources/templates/login/loginSui.ftl
delete mode 100644 src/main/resources/templates/me/me.ftl
diff --git a/h5/chat.html b/h5/chat.html
deleted file mode 100644
index 91d15b2..0000000
--- a/h5/chat.html
+++ /dev/null
@@ -1,105 +0,0 @@
-
-
-
-
- WebSocket Chat
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/home.html b/h5/home.html
deleted file mode 100644
index 541e6b7..0000000
--- a/h5/home.html
+++ /dev/null
@@ -1,47 +0,0 @@
-
-
-
-
-
- 酥酥
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/index.html b/h5/index.html
deleted file mode 100644
index 32e88ea..0000000
--- a/h5/index.html
+++ /dev/null
@@ -1,67 +0,0 @@
-
-
-
-
-
- 酥酥
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/logoSmall.png b/h5/logoSmall.png
deleted file mode 100644
index 08b5d51eb40008001afd2293b59a107f0bfc474d..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001
literal 8661
zcmZX2byyT#*!NHh(j{FBf`o!}gLHQzNOwsutstG!oze?}v@9VY-QC^Y4evbP_uu>O
zb?u&+ojLdE`xjvn;JR5*d?YkiV3{{M>OlL1V0Fh8;1(_OW
ztN*VhN*4f}AVEF-Tz`o?AXGX6h=9}Ndpf-C)#tz_6?$!eIu2NY;BBKL#Hs;3?=^Y|
zfF5Gtm-=tBS3m*)*g<{7AV4W5aBv|ji~`J*#4iv5GkN4Cs6Z3~kos1d4pH|yU+Y?8KFwN=mQ*$IDGJvBrd_4Dgb8`yGsT<*!
zTdQq}J}{>RSS{ZgaLl9S=)D^8x{ZMHK8R9g<*M`uQjotwM?ZXHk_%^Bj}U(5San>9
z@Vf$^KFxcb0AJEXQ()>B|W0Wdtd_P!OCq)buVlC?b`f~5D+7q0GX47a5~H1Py$s*SC4Z6Vu_ed
zoPiAO**j+fzNk;4cd_$5)a>C8;my(?624ZeB26H=i8dBfk%s@vaT#^x#5M>>%Cj9M
zt)UbOsLLxFmEUo51$@J!O#?VR_U{#U=_q`n(^#uG~p=%0gd30(833zf`<|3QOKeO4zQgcdG2TQUh
zP)kGkv-M={bKF%})k8|~Io;CU45q&p?=N_lKRJ?SZE1aN#k%X-gtiz+DVaQyJZ8J|
za(8<-@WKNFGZ-_lTgK!qi7kmYNfB{932UlCA=M1G9kn!FcAp
z=33@V4xJBQAMVc`yE$cwX0l{n^4s%YyP-94H>oy#Z<==--V+`--G}a?UG`mK?JkZ=
zU)o>fUlQ#dPaTg^@vw2Ca>tU`ec&}c8HhFAu}!gV7(EGEEynG)YKrT$${&@@XK0{5
z3b9=0@+1LBgl#UbC7@zt9v`>SD!
z0dhQwDFIgjA7@v$0e3s6=KX?I&W@7Kl8#+%OESw`m&|i6qC)xDe!OJ&WFmF@qVOVz
zY_I;rP12*LvD)#={PaKR3$R)HiNyh-F5j|y#rNrCy#^gZ#~&|Fu65SF46J7sr5F7@
zkKJZ9bT`yByoXaN|IVP#nqsr>neWtJf%khG@e2!?-wo{|3I7vD%=XF-X(?=hEDN>J
z`c3(z`n`O}x)-?Xe~h_lU4@>#K2<_7LTE?WM1X%K{rVfp7KsWu75V2&E|e&gKQCA@
zgbBqk&@gqWoZ}tah|b7vO*s}US<*%c`%p6p^~mnn2Iv(ir*NZ#hf$Yt{2U8!6?O;e
zP(xRw?Lr%6bf5+LEN@W<@Awij5;F4DFs+knn5Wt5
z-_|QzFf+$vZa}x}`^e%$`mQ%-S89r|#(rq*e0IYuX5PB9HeU4Ah#;Jgra0eEY5XtCJp8
z)^yY~&upLzXVGebV?Te2Gk??OOe{(0?RTiS&}%d$G)^?rJhn-m)YtN7B1-&R?o?h}
zaq~;6VQWoW>GPz|CDeXyZb`1b?{HsczhK{?{93oQQVwqOadIU11QSMf8nsFG*f1s{>{w$X}|H>=V-=JUX5mr{;GD|-<)w<~6eU24KK`5|v
zoN&-w%j~Ib3zzi0bQnAOwirWdLe|u}<+s=%?V;S2732NW`{kA7@xG0#quoMfYil){
zy&88Er3}B*W_p0*T>4>}N3D|{6V~64o9ztCg+AWvDm%GD9Sa?U%c=hLm#F(PQ06iI
zR=4tZ`Sv+Nx$xti$oxzSLkj7Z`e(o659_9G-CQx+Svi?+g-&lbzQwN>nH6PaDv1b9
zq%J_O6&_M;4O9)_mg2)%B1Eo73iY3_d(W4)V+>B3oh~5PCi8FVK%a4YHx|<9`Qa^$zrl)bayOhR_3eY`lgjz%ooRQ`i1M}Yw$IZ8>vhoi{hP6D
z4v=k9o64vu0)W>W00;;KfZJ!V+ywwPHUQW&0s#IL03dSw@wrP1ED>cTK5BT*9c21z
zBuv*{i=2B08yktL_`S^&9g)GX=c+oM?^J6nkxg?`_nx=lB6W29@voC*Q115}og-~j
z*EF3MJ}gbckwy05bgAskr1_)-&D2{frs9vK)ZbnOLfF_ui~??Jmqh}KeiY*3MT+w;
z*IK|*&jq&yxA~S`uK4aV;Mc_x1z5x=H)!ipV(2rP5XfpBDn;?vIwy`GjJb(bZd5of
zo>WRK+uxudG&EWTng4T%OHZdDLo6QbUZ7EvoBP$YJNnDTjw%x~GoSyHVDy!HQ&ZE<
z&H*y@nB6}!cH^ip@X6EJ}O#fjNDrXNhd|8+3^e&a?#Oe$-*~B8}Z7Wnv_q<;8
z_`;tl=OreFxNB`q_lCi=N{5)sh5&H?Ls36J`17CC(B#zE#6-bHSah_-PzK*XI!{bS
zMuy5}J|=FJJZihe@7~|T!p!vXvH*hFRJm@o#ULr4rYen)kWk?JvZ5ksB~6W(QI&N+)AXUV_
zrKYCl?X-=p?dE3k=cLX4Fa&v$O0W8S;?3TNBR*C{5wH(&yv
zsf_ja^?|*V1fZu~M9+<=jow#w2xg;sl1QBjets?BnAhdYx}zxs=tM;P18($dtl|LY
z>x0?PzG7ZpUUas71PEsLb@p;zA3uIA*RMYc?hSL>pOOXd&b-0@5oM+l7{&D_quz=&
z8&(!Xk}(7%BquXlvo@c6)6mphJejR2EtNC$Ttzwcf$YU3DHPBHP+)Tq(Oo=@!6=9hX=bl(PtM
z_0;Mo7Sd%dn8#9IOAWJc)Hf)6n&TOAO$=*_Rk6-2dU^2hXk%}*9=h917S^j?h*K-%E
zS;6|Ahm5j3KHN}*9rd=dC9K&$d&k4EnT9gPvJ-#p)N{kbHzFRNK?#-mnd+*>@(!
ziRhi7f}S4f?Ck8#qu&ArMT$Zpncs*}{rs24PklwTwTXFokhS&o+sjIS-@ym^cr*{`
zmwv6;2M2azV`Dlso*!Za@2qyRO8h-fs0|DZnxF3Wf%4C-SzQsYG5P%^B~dUjF~P__
z!c^@>IIYXdUS9k*Zk0m8?MBr6{AosPyel_#RECvAe-@Q&ZJ7rL2YJb{?6}Dm7Z-Um
zmFDK>t*xzdo0`)4`sDnscJf`l1$QuCOpp#*PC1_DN&eT+cXXn&yZy@Z#bdu@Y;4Mn
zp;Qj>_*lzqZ>&VQE(4E66j3lPKip3y7^6R#wfoQ=9~F6I|8mEbpXD|B=;$ah#)}CX
zE-!c%vv$QV%VA2T4Bn6K?%cp1kK@9M3apThT%56)nI5niD=Vv;3Clt{hS8}xF}nw=
z$Z6tUet6c@looK*wEtd%iEvo>njG9ZxzeG_+v`1Xw1Q_R9RL9#;pvDtHf9w84aYgX
z-}C*qkK1fyQ5UWr`8>pL-!U2w&eb+79x1~;7G}&W6tah=Tay&~X1)|C%1FXSM-ESM
zFkZa4Y!0FH3I*4~*~JBVBk~l;!{g9-e4abGD;5!Zz2r|7@LgGWmbpdvA$DP5fhI97
zJ-vV0q49ecI%WQ!{=mVh(4sQ*L_J9l5A1ioaP{x6CYUseaav<4hOk`uDbx!U+7I2&
z+pwRbM5#qBy!Z=I@fmx2bzoX1?zAe}A8<2$t=C7b~6Y&(<_)(|4tBfk_-pr
ztX}!oGjTIfyHU<{@~flPXFr=VB~wQS`jzW^wKCPR$>K@U2~sy!Ts;b}e1yRE&~~d%
zBX(o9w2|GH%N^_M>xsSa=f#kSSmWRPVNqc;LbQRc51Xvyf}XLe`BHI-iCmmt-+UY1
z?%KY@JGq}V)al87s!2b7-fv|;9}=*#u@RGyQIbr}D=`!)X{)NcB=Nay#l~etOO;8M9a7McMp*=C_8Y0_>V_>kSb~_b;687kQC5-*
zgZt`~D+PBYOzdBd9XYXq$r0%5?+;OsY9)y=bZjHHZZ`_g$!(W1Frbi?m34LV_NxJpdT{M%g6Xq{pf`+>8tS
zcnVU!=i*%1-Ju@C`!03gWeD$IZ(l!kx<9OEwm*ULue#9`G&PY13=AlMQ29Yx%Fp=C
zQH=isEtqfD|9(erzMAJ!NMZY^JH$O>S5Q!(VrZ!J_MTLs2acWyf8YL$JE
zlo%Ns`>V*a$XW1!6%q*c0G#m>LAUT{uJv?$M=-OEQX!WLx-B&L{A5*
z77N^&(3l4dz*3*gAvwOeS6iBYMoywv7i=3U1&8;Xb`+lMz5?V%bxBxjS=pf{XKcpY$z8)
z{&UIVUIXRyY_tDUiw{&(@9JIs$f~n(oz4r`N>_Kc%Uz{EifHBZv$*+1FwWTQ>@ANN
zB0{9LVrif+-!4bWk*kNwvP$KJhI|5wzlw~40?1{;!tm9@s`X?Q6@TXCi7}eCn~HQS
z`Jh;OlSNQ{7G)!rkhobHeyLl1@O%|jA>H6R=c-{tcvZ(OXs}qU!N$b@DtY{CyBR4b
zjZ{9aKXne+>UKUJUJ#g|AN*RY$4>8k#IvaH(Fk$%()6vYt#>CQyNkwlcg4o*PkH$L
zIJacbdwYA4P)Bz!C3H3dJC^nCnC-Q$h)5El;DLekkv&(C<`u+n
zW*Wxm!6?V@W{%Gp$+G-fsRG+^_=6;*B}d1{$Mtix1dI?VF*tq6f*K^Xoj3jCfW=MRcJJYTl8v4L=LCB`!-
z`@OH4j&byP+<&;CG8?|!N$%id?UoRdk)S?%S%NiT6a`g-N-f?zyb!G6mNU46(
zEH#YVbK>Q5zVNjeNI7k!a651p0C`0C{KAa!nK5}1EdI&d;jqyKJS1o#9UnH>Bc*6h
zPfs~#>?U|Pof;cS{MkagNEM~F6zz1L1(f_;gJ@rdGqUnsq?%I*c_m~DcvL$s3xf{u
z%NtTe0{;_)#Kc6wr<-+MluV7QeCTaVG%3}=#!yrB6YMz5TO(RRAtee13?6dNq;
z>s>FLjUCwDc9vMl?q-84oE_5p->Yq3Rb-pka4c1d+9a
zgT>?BCBfmca+^rQNEgVe`jUE|@*o#pkFhZ>-bneTvkSDi)i>E!~b^3gKg*1
zwZT9qFF%Yey>eF3*47qOhlMRI9w^uAFDX%l#6Vc?=)e|H`1lb43WfG^@^C@UD
zfiwnWz8i}Me{PhDLLwtET)ilUDku+*u)w8bN_>?)+zo1|l7soW9fn>r_Orck!cUD8
zp~y%n0~RLhYrAOdPO_zHx#MJ^AJRbn=rd06(K3WAp3y`liQzGxPxL=2b3`K@I%#7r
zX2Vpb4TG?<#!x-1yllSRN*A2<`1h3R6{@EQ!WQ7>=0?sW1ZrgbUpg#E7xCb+@88TT
zEqyi2F!Tu|qf;})-w;GVmR-DKm1kmNW$)p%i|-XD_fyP~EoZbdhlh0cXiuzc(u6&|
zvIOS1)SN0!2}KT=EYtcx!{w%uc_hz(yfV$&(gO;}lG0L>u>#rf-T@a0rrZw(eSfRFhJb%7-bo6ika?QCB
zod1DTju7T#JLCrhkO(u??zzDApZ|by!LtP2D=8v&$EdHWo!%!DJvwxm`0{75+Us)9
zb_*T(VQg{o4TVE;1Nq^al`Xz>x^G+2pE7v~32@gIw6yq%N(6%mWw&Y1%XpQ_?D^?Y
zO<5kScG_{TJMo@)3WEG4cYN2%
z*0$sL+U3d)v$3&}7$evaJ-)#c`D6Yl7L*|ulyqP)2rVthZ882_AJ91V;<4nIJegpRP%Q=~V#=-<4IV`Z1e|a)Q1E^(R%l&j
z<<<|>(9q~f<*?j1+mx>cBT5F<*4OtZGHZ9PI?1T3$KUw0u#&TygEhd45)yQ*tjYoV
zBsp7mUaRkQeu)BnUKhoX?d!w&>&AZc7lCW%w*{c-gZMQ7)SHqOpRo_->y^Kl3xIC{
z)kf{#;tq@a&Nq4yW%TvaLofbu@j*2SvQ)Zy@fi|s80_us5qk#X8DbvV5Z))t1F{MV
z8e>xx24xq`DiYS#CFKTxv~Dv~Qr-qYnxr^E?m9U+iP)3CsGd**)35IXO$V{)$ang6
zJyS6K#AWrnJR(hPZGV!O=d*9a8N7aH4cs2S
zv%c&9f^PPYj*>Dm$PvtznxSKhi?C1Lpn}r{@y#u9793hN2y~#P`-k9`#^aa;{D_a|
z$Wi(X6}vm_W3GH_them94?~z)L8N>2r=EX0v<;RU=N_%nI11bdEdk8
z6TF(w`j~l~N&lRFO(-iZP2q8*G&eT~33v6G$a6f^CpScGb#-x7Ra~W__|bgn^C(B3
zz=44Q+c|6x4-a(%14VGtfv$jrv~+xWy1bNBu(R^&pX0NEVgDgelBaOnlcsaqt0^Hl
z;>?NBS6_k-lum;aP4asK2t9r9c9wT^qsumv<@V!0YeGW8iHV67C*Gai-H&CXZ{r{!
zK%Trr1o>sT7BdM6i9#m7o2b)Q&?Wc{J-0|={jB8eEda_j(65TVKW@#&5!LTOSzoTT
zC6Z5KQ7xx(Y}|ZJ>2vb&2!3tYC0^3lnA+6DKa?pTEiaGWBK$0zX{hYuBe=V}tB}E~
zI;&A$UT$`Mv}iY5{aw9C2?4(JR~|gyE%!&XK_joLpQD}bF04VL2?VZopuYoDc7;lr
zeFXyw|DwsHU0pdRQqCf*m51j5E>W%T0
z^II!N$D(_D{~Z&~)X=c7#`hu8#$r#mI8TB1D&n9cOk)h1!}$NtC;s1;|KBneCemOZ
uP;xf*`C#?+yzOlpl|wYy(D%n@Y=8w#^;CvXwGed90a-~Ui3%~JZ~p@)5(Gm4
diff --git a/pom.xml b/pom.xml
index 458d0cf..a8d0223 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,9 +29,14 @@
org.springframework.boot
spring-boot-starter-tomcat
+
- org.springframework.boot
- spring-boot-starter-data-jpa
+ org.springframework
+ spring-aop
+
+
+ org.springframework
+ spring-aspects
mysql
diff --git a/sql/nettychat.sql b/sql/nettychat.sql
deleted file mode 100644
index 452c711..0000000
--- a/sql/nettychat.sql
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Navicat MySQL Data Transfer
-
-Source Server : mypc
-Source Server Version : 50717
-Source Host : localhost:3306
-Source Database : nettychat
-
-Target Server Type : MYSQL
-Target Server Version : 50717
-File Encoding : 65001
-
-Date: 2018-08-23 10:32:48
-*/
-
-SET FOREIGN_KEY_CHECKS=0;
-
--- ----------------------------
--- Table structure for user
--- ----------------------------
-DROP TABLE IF EXISTS `user`;
-CREATE TABLE `user` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `user_name` varchar(255) DEFAULT NULL,
- `pass_word` varchar(255) DEFAULT NULL,
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
- `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;
-
--- ----------------------------
--- Records of user
--- ----------------------------
-INSERT INTO `user` VALUES ('2', 'Myself', '123456', '2018-08-14 19:47:49', '2018-08-14 19:47:49');
-INSERT INTO `user` VALUES ('3', 'Chen', '123456abc', '2018-08-20 16:31:49', '2018-08-20 16:31:49');
-
--- ----------------------------
--- Table structure for user_msg
--- ----------------------------
-DROP TABLE IF EXISTS `user_msg`;
-CREATE TABLE `user_msg` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(255) DEFAULT NULL,
- `msg` varchar(255) DEFAULT NULL,
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=43 DEFAULT CHARSET=utf8mb4;
-
--- ----------------------------
--- Records of user_msg
--- ----------------------------
-INSERT INTO `user_msg` VALUES ('8', 'Myself', '你好呀', '2018-08-20 17:26:58', '2018-08-20 17:26:58');
-INSERT INTO `user_msg` VALUES ('9', 'Myself', '你是谁?', '2018-08-20 17:27:13', '2018-08-20 17:27:13');
-INSERT INTO `user_msg` VALUES ('10', 'Myself', '在吗?', '2018-08-21 17:54:12', '2018-08-21 17:54:12');
-INSERT INTO `user_msg` VALUES ('11', 'Chen', '嗯呢', '2018-08-21 17:54:12', '2018-08-21 17:54:12');
-INSERT INTO `user_msg` VALUES ('13', 'Myself', 'yo', '2018-08-21 18:01:26', '2018-08-21 18:01:26');
-INSERT INTO `user_msg` VALUES ('14', 'Myself', '你好', '2018-08-22 16:24:22', '2018-08-22 16:24:22');
-INSERT INTO `user_msg` VALUES ('30', 'Myself', '你好呀!', '2018-08-22 17:03:42', '2018-08-22 17:03:42');
-INSERT INTO `user_msg` VALUES ('31', 'Myself', '我很好!', '2018-08-22 17:03:42', '2018-08-22 17:03:42');
-INSERT INTO `user_msg` VALUES ('32', 'Myself', '哈哈哈哈', '2018-08-22 17:11:56', '2018-08-22 17:11:56');
-INSERT INTO `user_msg` VALUES ('33', 'Myself', '哇哈哈哈哈', '2018-08-22 17:11:56', '2018-08-22 17:11:56');
-INSERT INTO `user_msg` VALUES ('34', 'Myself', 'asdf', '2018-08-22 17:22:19', '2018-08-22 17:22:19');
-INSERT INTO `user_msg` VALUES ('35', 'Myself', '厉害 厉害', '2018-08-22 17:23:20', '2018-08-22 17:23:20');
-INSERT INTO `user_msg` VALUES ('36', 'Myself', '哈哈', '2018-08-22 17:23:43', '2018-08-22 17:23:43');
-INSERT INTO `user_msg` VALUES ('37', 'Myself', '你好!', '2018-08-23 10:19:14', '2018-08-23 10:19:14');
-INSERT INTO `user_msg` VALUES ('38', 'Chen', '收到。', '2018-08-23 10:19:15', '2018-08-23 10:19:15');
-INSERT INTO `user_msg` VALUES ('39', 'Myself', '前端框架用Vue?', '2018-08-23 10:19:15', '2018-08-23 10:19:15');
-INSERT INTO `user_msg` VALUES ('40', 'Chen', '可以试试', '2018-08-23 10:19:15', '2018-08-23 10:19:15');
-INSERT INTO `user_msg` VALUES ('41', 'Myself', '下版再加一些新的功能', '2018-08-23 10:19:15', '2018-08-23 10:19:15');
-INSERT INTO `user_msg` VALUES ('42', 'Chen', 'okay', '2018-08-23 10:19:15', '2018-08-23 10:19:15');
diff --git a/src/main/java/com/myself/nettychat/DefaultAutoService.java b/src/main/java/com/myself/nettychat/DefaultAutoService.java
deleted file mode 100644
index e4b4cc7..0000000
--- a/src/main/java/com/myself/nettychat/DefaultAutoService.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.myself.nettychat;
-
-import com.myself.nettychat.bootstrap.BaseAuthService;
-import org.springframework.stereotype.Service;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 默认权限
- **/
-@Service
-public class DefaultAutoService implements BaseAuthService {
-
- @Override
- public boolean authorized(String username, String password) {
- return true;
- }
-}
diff --git a/src/main/java/com/myself/nettychat/NettychatApplication.java b/src/main/java/com/myself/nettychat/NettychatApplication.java
index 9c679bd..8eabd26 100644
--- a/src/main/java/com/myself/nettychat/NettychatApplication.java
+++ b/src/main/java/com/myself/nettychat/NettychatApplication.java
@@ -1,6 +1,6 @@
package com.myself.nettychat;
-import com.myself.nettychat.config.NettyConfig;
+
import com.myself.nettychat.config.NettyTcpConfig;
import com.myself.nettychat.config.TCPServer;
import org.springframework.boot.SpringApplication;
@@ -19,20 +19,8 @@ public class NettychatApplication {
public static void main(String[] args) throws Exception{
// SpringApplication.run(NettychatApplication.class, args);
ConfigurableApplicationContext context = SpringApplication.run(NettychatApplication.class, args);
- NettyConfig nettyConfig = context.getBean(NettyConfig.class);
NettyTcpConfig nettyTcpConfig = context.getBean(NettyTcpConfig.class);
TCPServer tcpServer = context.getBean(TCPServer.class);
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("Web端Netty通信服务端启动成功!端口:8090");
- tcpServer.startWeb();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }).start();
new Thread(new Runnable() {
@Override
public void run() {
diff --git a/src/main/java/com/myself/nettychat/auto/InitServer.java b/src/main/java/com/myself/nettychat/auto/InitServer.java
deleted file mode 100644
index 3729fc3..0000000
--- a/src/main/java/com/myself/nettychat/auto/InitServer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.myself.nettychat.auto;
-
-
-import com.myself.nettychat.bootstrap.BootstrapServer;
-import com.myself.nettychat.bootstrap.NettyBootstrapServer;
-import com.myself.nettychat.common.properties.InitNetty;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 初始化服务
- **/
-public class InitServer {
-
- private InitNetty serverBean;
-
- public InitServer(InitNetty serverBean) {
- this.serverBean = serverBean;
- }
-
- BootstrapServer bootstrapServer;
-
- public void open(){
- if(serverBean!=null){
- bootstrapServer = new NettyBootstrapServer();
- bootstrapServer.setServerBean(serverBean);
- bootstrapServer.start();
- }
- }
-
-
- public void close(){
- if(bootstrapServer!=null){
- bootstrapServer.shutdown();
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java b/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java
deleted file mode 100644
index 961e458..0000000
--- a/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.myself.nettychat.auto;
-
-import com.myself.nettychat.bootstrap.scan.SacnScheduled;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.enums.ProtocolEnum;
-import com.myself.nettychat.common.properties.InitNetty;
-import org.apache.commons.lang3.ObjectUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 自动化配置初始化服务
- **/
-@Configuration
-@ConditionalOnClass
-@EnableConfigurationProperties({InitNetty.class})
-public class ServerAutoConfigure {
-
- private static final int _BLACKLOG = 1024;
-
- private static final int CPU =Runtime.getRuntime().availableProcessors();
-
- private static final int SEDU_DAY =10;
-
- private static final int TIMEOUT =120;
-
- private static final int BUF_SIZE=10*1024*1024;
-
-
- public ServerAutoConfigure(){
-
- }
-
- @Bean
- @ConditionalOnMissingBean(name = "sacnScheduled")
- public ScanRunnable initRunable(@Autowired InitNetty serverBean){
- long time =(serverBean==null || serverBean.getPeriod()<5)?10:serverBean.getPeriod();
- ScanRunnable sacnScheduled = new SacnScheduled(time);
- Thread scanRunnable = new Thread(sacnScheduled);
- scanRunnable.setDaemon(true);
- scanRunnable.start();
- return sacnScheduled;
- }
-
-
- @Bean(initMethod = "open", destroyMethod = "close")
- @ConditionalOnMissingBean
- public InitServer initServer(InitNetty serverBean){
- if(!ObjectUtils.allNotNull(serverBean.getMqttport(),serverBean.getServerName())){
- throw new NullPointerException("not set port");
- }
- if(serverBean.getBacklog()<1){
- serverBean.setBacklog(_BLACKLOG);
- }
- if(serverBean.getBossThread()<1){
- serverBean.setBossThread(CPU);
- }
- if(serverBean.getInitalDelay()<0){
- serverBean.setInitalDelay(SEDU_DAY);
- }
- if(serverBean.getPeriod()<1){
- serverBean.setPeriod(SEDU_DAY);
- }
- if(serverBean.getHeart()<1){
- serverBean.setHeart(TIMEOUT);
- }
- if(serverBean.getRevbuf()<1){
- serverBean.setRevbuf(BUF_SIZE);
- }
- if(serverBean.getWorkerThread()<1){
- serverBean.setWorkerThread(CPU*2);
- }
- if(serverBean.getProtocol()==null){
- serverBean.setProtocol(ProtocolEnum.MQTT);
- }
- return new InitServer(serverBean);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java b/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java
deleted file mode 100644
index a647848..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-
-import com.myself.nettychat.bootstrap.coder.ByteBufToWebSocketFrameEncoder;
-import com.myself.nettychat.bootstrap.coder.WebSocketFrameToByteBufDecoder;
-import com.myself.nettychat.common.properties.InitNetty;
-import com.myself.nettychat.common.ssl.SecureSocketSslContextFactory;
-import com.myself.nettychat.common.utils.SpringBeanUtils;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttEncoder;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.timeout.IdleStateHandler;
-import org.apache.commons.lang3.ObjectUtils;
-import org.jboss.netty.util.internal.SystemPropertyUtil;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import java.security.KeyStore;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 抽象类,负责加载edec handler
- **/
-public abstract class AbstractBootstrapServer implements BootstrapServer {
-
- private String PROTOCOL = "TLS";
-
- private SSLContext SERVER_CONTEXT;
-
- private static final String MQTT_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
-
-
- /**
- *
- * @param channelPipeline channelPipeline
- * @param serverBean 服务配置参数
- */
- protected void initHandler(ChannelPipeline channelPipeline, InitNetty serverBean){
- if(serverBean.isSsl()){
- if(!ObjectUtils.allNotNull(serverBean.getJksCertificatePassword(),serverBean.getJksFile(),serverBean.getJksStorePassword())){
- throw new NullPointerException("SSL file and password is null");
- }
- initSsl(serverBean);
- SSLEngine engine =
- SERVER_CONTEXT.createSSLEngine();
- engine.setUseClientMode(false);
- channelPipeline.addLast("ssl", new SslHandler(engine));
- }
-
- intProtocolHandler(channelPipeline,serverBean);
- channelPipeline.addLast(new IdleStateHandler(serverBean.getHeart(),0,0));
- channelPipeline.addLast( SpringBeanUtils.getBean(serverBean.getMqttHander()));
-
- }
-
- private void intProtocolHandler(ChannelPipeline channelPipeline,InitNetty serverBean){
- switch (serverBean.getProtocol()){
- case MQTT:
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- channelPipeline.addLast("decoder", new MqttDecoder());
- break;
- case MQTT_WS_MQTT:
- channelPipeline.addLast("httpCode", new HttpServerCodec());
- channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- channelPipeline.addLast("webSocketHandler",
- new WebSocketServerProtocolHandler("/", MQTT_CSV_LIST));
- channelPipeline.addLast("wsDecoder", new WebSocketFrameToByteBufDecoder());
- channelPipeline.addLast("wsEncoder", new ByteBufToWebSocketFrameEncoder());
- channelPipeline.addLast("decoder", new MqttDecoder());
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- break;
- case MQTT_WS_PAHO:
- channelPipeline.addLast("httpCode", new HttpServerCodec());
- channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- channelPipeline.addLast("webSocketHandler",
- new WebSocketServerProtocolHandler("/mqtt", MQTT_CSV_LIST));
- channelPipeline.addLast("wsDecoder", new WebSocketFrameToByteBufDecoder());
- channelPipeline.addLast("wsEncoder", new ByteBufToWebSocketFrameEncoder());
- channelPipeline.addLast("decoder", new MqttDecoder());
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- break;
- }
- }
-
- private void initSsl(InitNetty serverBean){
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.submit(() -> {});
- String algorithm = SystemPropertyUtil.get("ssl.KeyManagerFactory.algorithm");
- if (algorithm == null) {
- algorithm = "SunX509";
- }
- SSLContext serverContext;
- try {
- //
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load( SecureSocketSslContextFactory.class.getResourceAsStream(serverBean.getJksFile()),
- serverBean.getJksStorePassword().toCharArray());
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
- kmf.init(ks,serverBean.getJksCertificatePassword().toCharArray());
- serverContext = SSLContext.getInstance(PROTOCOL);
- serverContext.init(kmf.getKeyManagers(), null, null);
- } catch (Exception e) {
- throw new Error(
- "Failed to initialize the server-side SSLContext", e);
- }
- SERVER_CONTEXT = serverContext;
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java b/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java
deleted file mode 100644
index edd8720..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-import javax.validation.constraints.NotNull;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 逻辑操作封装
- **/
-public interface BaseApi {
-
- default void doIfElse(T t, Predicate predicate, Consumer consumer){
- if(t!=null){
- if(predicate.test(t)){
- consumer.accept(t);
- }
- }
- }
-
-
- default void doIfElse(T t, Predicate predicate, Consumer consumer, Consumer consumer2){
- if(t!=null){
- if(predicate.test(t)){
- consumer.accept(t);
- }
- else{
- consumer2.accept(t);
- }
- }
- }
- default boolean doIf(T t, Predicate... predicates){
- if(t!=null){
- for(Predicate p:predicates){
- if(!p.test(t)){
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- default void doIfAnd(T t, Consumer consumer2, Predicate... predicates){
- boolean flag =true;
- if(t!=null){
- for(Predicate p:predicates){
- if(!p.test(t)){
- flag= false;
- break;
- }
- }
- }
- if(flag){
- consumer2.accept(t);
- }
- }
-
- default void doIfAnd1(@NotNull T t, @NotNull Consumer consumer2, @NotNull Predicate... predicates){
- Predicate one = predicates[0];
- int l;
- if((l=predicates.length)>1){
- for(int i=1;i topics);
-
- void loginSuccess(Channel channel, String deviceId, MqttConnectMessage mqttConnectMessage);
-
- void publishSuccess(Channel channel, MqttPublishMessage mqttPublishMessage);
-
- void closeSuccess(String deviceId,boolean isDisconnect);
-
- void sendWillMsg(WillMeaasge willMeaasge);
-
- String getDeviceId(Channel channel);
-
- void unsubscribe(String deviceId, List topics1);
-
- void doPubrel(Channel channel, int mqttMessage);
-
- void doPubrec(Channel channel, int mqttMessage);
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java b/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java
deleted file mode 100644
index 816eea7..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-import com.myself.nettychat.common.ip.IpUtils;
-import com.myself.nettychat.common.properties.InitNetty;
-import com.myself.nettychat.common.utils.RemotingUtil;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc mtqq netty启动服务类
- **/
-@Slf4j
-@Data
-public class NettyBootstrapServer extends AbstractBootstrapServer {
-
- private InitNetty serverBean;
-
- public InitNetty getServerBean() {
- return serverBean;
- }
-
- public void setServerBean(InitNetty serverBean) {
- this.serverBean = serverBean;
- }
-
- private EventLoopGroup bossGroup;
-
- private EventLoopGroup workGroup;
-
- ServerBootstrap bootstrap=null ;// 启动辅助类
-
- /**
- * 服务开启
- */
- public void start() {
- initEventPool();
- bootstrap.group(bossGroup, workGroup)
- .channel(useEpoll()?EpollServerSocketChannel.class:NioServerSocketChannel.class)
- .option(ChannelOption.SO_REUSEADDR, serverBean.isReuseaddr())
- .option(ChannelOption.SO_BACKLOG, serverBean.getBacklog())
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.SO_RCVBUF, serverBean.getRevbuf())
- .childHandler(new ChannelInitializer() {
- protected void initChannel(SocketChannel ch) throws Exception {
- initHandler(ch.pipeline(),serverBean);
- }
- })
- .childOption(ChannelOption.TCP_NODELAY, serverBean.isNodelay())
- .childOption(ChannelOption.SO_KEEPALIVE, serverBean.isKeepalive())
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.bind(IpUtils.getHost(),serverBean.getMqttport()).addListener((ChannelFutureListener) channelFuture -> {
- if (channelFuture.isSuccess())
- log.info("服务端启动成功【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- else
- log.info("服务端启动失败【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- });
- }
- /**
- * 初始化EnentPool 参数
- */
- private void initEventPool(){
- bootstrap= new ServerBootstrap();
- if(useEpoll()){
- bossGroup = new EpollEventLoopGroup(serverBean.getBossThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "LINUX_BOSS_" + index.incrementAndGet());
- }
- });
- workGroup = new EpollEventLoopGroup(serverBean.getWorkerThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "LINUX_WORK_" + index.incrementAndGet());
- }
- });
-
- }
- else {
- bossGroup = new NioEventLoopGroup(serverBean.getBossThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "BOSS_" + index.incrementAndGet());
- }
- });
- workGroup = new NioEventLoopGroup(serverBean.getWorkerThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "WORK_" + index.incrementAndGet());
- }
- });
- }
- }
-
- /**
- * 关闭资源
- */
- public void shutdown() {
- if(workGroup!=null && bossGroup!=null ){
- try {
- bossGroup.shutdownGracefully().sync();// 优雅关闭
- workGroup.shutdownGracefully().sync();
- } catch (InterruptedException e) {
- log.info("服务端关闭资源失败【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- }
- }
- }
-
- private boolean useEpoll() {
- return RemotingUtil.isLinuxPlatform()
- && Epoll.isAvailable();
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java b/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java
deleted file mode 100644
index 9c830ce..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import io.netty.channel.Channel;
-import io.netty.util.AttributeKey;
-import lombok.Builder;
-import lombok.Data;
-
-import com.myself.nettychat.common.enums.SubStatus;
-import com.myself.nettychat.common.enums.SessionStatus;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc channel 封装类
- **/
-@Builder
-@Data
-public class MqttChannel {
-
- private transient volatile Channel channel;
-
-
- private String deviceId;
-
-
- private boolean isWill;
-
-
- private volatile SubStatus subStatus; // 是否订阅过主题
-
-
- private Set topic ;
-
-
-
- private volatile SessionStatus sessionStatus; // 在线 - 离线
-
-
-
- private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除 此channel
-
-
-
-
- private ConcurrentHashMap message ; // messageId - message(qos1) // 待确认消息
-
-
- private Set receive;
-
- public void addRecevice(int messageId){
- receive.add(messageId);
- }
-
- public boolean checkRecevice(int messageId){
- return receive.contains(messageId);
- }
-
- public boolean removeRecevice(int messageId){
- return receive.remove(messageId);
- }
-
-
- public void addSendMqttMessage(int messageId,SendMqttMessage msg){
- message.put(messageId,msg);
- }
-
-
- public SendMqttMessage getSendMqttMessage(int messageId){
- return message.get(messageId);
- }
-
-
- public void removeSendMqttMessage(int messageId){
- message.remove(messageId);
- }
-
-
- /**
- * 判断当前channel 是否登录过
- * @return
- */
- public boolean isLogin(){
- return Optional.ofNullable(this.channel).map(channel1 -> {
- AttributeKey _login = AttributeKey.valueOf("login");
- return channel1.isActive() && channel1.hasAttr(_login);
- }).orElse(false);
- }
-
- /**
- * 非正常关闭
- */
- public void close(){
- Optional.ofNullable(this.channel).ifPresent(channel1 -> channel1.close());
- }
-
- /**
- * 通道是否活跃
- * @return
- */
- public boolean isActive(){
- return channel!=null&&this.channel.isActive();
- }
-
-
-
- public boolean addTopic(Set topics){
- return topic.addAll(topics);
- }
-
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java
deleted file mode 100644
index 814696a..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-@Builder
-@Data
-public class RetainMessage {
-
- private byte[] byteBuf;
-
- private MqttQoS qoS;
- public String getString(){
- return new String(byteBuf);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java
deleted file mode 100644
index ac5faef..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-import com.myself.nettychat.common.enums.ConfirmStatus;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc mqtts 消息
- **/
-@Builder
-@Data
-public class SendMqttMessage {
-
-
- private int messageId;
-
- private Channel channel;
-
- private volatile ConfirmStatus confirmStatus;
-
- private long time;
-
- private byte[] byteBuf;
-
- private boolean isRetain;
-
- private MqttQoS qos;
-
- private String topic;
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java
deleted file mode 100644
index 2613cd6..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc Session会话数据保存
- **/
-@Builder
-@Data
-public class SessionMessage {
-
- private byte[] byteBuf;
-
- private MqttQoS qoS;
-
- private String topic;
-
-
- public String getString(){
- return new String(byteBuf);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java b/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java
deleted file mode 100644
index 4022d28..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import lombok.Builder;
-import lombok.Data;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 遗嘱消息
- **/
-@Builder
-@Data
-public class WillMeaasge {
-
- private String willTopic;
-
- private String willMessage;
-
- private boolean isRetain;
-
- private int qos;
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java
deleted file mode 100644
index c0e51f2..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.myself.nettychat.bootstrap.bean.MqttChannel;
-import com.myself.nettychat.bootstrap.bean.RetainMessage;
-import com.myself.nettychat.bootstrap.channel.cache.CacheMap;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import io.netty.channel.Channel;
-import io.netty.util.AttributeKey;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Collection;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 抽象类
- **/
-@Slf4j
-public abstract class AbstractChannelService extends PublishApiSevice implements ChannelService , BaseApi {
-
- protected AttributeKey _login = AttributeKey.valueOf("login");
-
- protected AttributeKey _deviceId = AttributeKey.valueOf("deviceId");
-
- protected static char SPLITOR = '/';
-
- protected ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
-
-
- protected static CacheMap cacheMap= new CacheMap<>();
-
-
- protected static ConcurrentHashMap mqttChannels = new ConcurrentHashMap<>(); // deviceId - mqChannel 登录
-
-
- protected static ConcurrentHashMap> retain = new ConcurrentHashMap<>(); // topic - 保留消息
-
-
-
- protected static Cache> mqttChannelCache = CacheBuilder.newBuilder().maximumSize(100).build();
-
- public AbstractChannelService(ScanRunnable scanRunnable) {
- super(scanRunnable);
- }
-
-
- protected Collection getChannels(String topic,TopicFilter topicFilter){
- try {
- return mqttChannelCache.get(topic, () -> topicFilter.filter(topic));
- } catch (Exception e) {
- log.info(String.format("guava cache key topic【%s】 channel value== null ",topic));
- }
- return null;
- }
-
-
- @FunctionalInterface
- interface TopicFilter{
- Collection filter(String topic);
- }
-
- protected boolean deleteChannel(String topic,MqttChannel mqttChannel){
- return Optional.ofNullable(topic).map(s -> {
- mqttChannelCache.invalidate(s);
- return cacheMap.delete(getTopic(s),mqttChannel);
- }).orElse(false);
- }
-
- protected boolean addChannel(String topic,MqttChannel mqttChannel)
- {
- return Optional.ofNullable(topic).map(s -> {
- mqttChannelCache.invalidate(s);
- return cacheMap.putData(getTopic(s),mqttChannel);
- }).orElse(false);
- }
-
- /**
- * 获取channel
- */
- public MqttChannel getMqttChannel(String deviceId){
- return Optional.ofNullable(deviceId).map(s -> mqttChannels.get(s))
- .orElse(null);
-
- }
-
- /**
- * 获取channelId
- */
- public String getDeviceId(Channel channel){
- return Optional.ofNullable(channel).map( channel1->channel1.attr(_deviceId).get())
- .orElse(null);
- }
-
-
-
- protected String[] getTopic(String topic) {
- return Optional.ofNullable(topic).map(s ->
- StringUtils.split(topic,SPLITOR)
- ).orElse(null);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java
deleted file mode 100644
index 9a23c8b..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.SessionMessage;
-import org.springframework.stereotype.Service;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 会话保留处理
- **/
-@Service
-public class ClientSessionService {
-
- private static ConcurrentHashMap> queueSession = new ConcurrentHashMap<>(); // 连接关闭后 保留此session 数据 deviceId
-
-
- public void saveSessionMsg(String deviceId, SessionMessage sessionMessage) {
- ConcurrentLinkedQueue sessionMessages = queueSession.getOrDefault(deviceId, new ConcurrentLinkedQueue<>());
- boolean flag;
- do{
- flag = sessionMessages.add(sessionMessage);
- }
- while (!flag);
- queueSession.put(deviceId,sessionMessages);
- }
-
- public ConcurrentLinkedQueue getByteBuf(String deviceId){
- return Optional.ofNullable(deviceId).map(s -> queueSession.get(s))
- .orElse(null);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java
deleted file mode 100644
index fa5ec21..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java
+++ /dev/null
@@ -1,440 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.*;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import com.myself.nettychat.common.enums.SessionStatus;
-import com.myself.nettychat.common.enums.SubStatus;
-import com.myself.nettychat.common.exception.ConnectionException;
-import com.myself.nettychat.common.utils.ByteBufUtil;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc Channel事件处理service
- **/
-@Slf4j
-@Component
-public class MqttChannelService extends AbstractChannelService {
-
- @Autowired
- private ClientSessionService clientSessionService;
-
- @Autowired
- private WillService willService;
-
- private final ScanRunnable scanRunnable;
-
- public MqttChannelService(ScanRunnable scanRunnable) {
- super(scanRunnable);
- this.scanRunnable = scanRunnable;
- }
-
-
- /**
- * 取消订阅
- */
- @Override
- public void unsubscribe(String deviceId, List topics1) {
- Optional.ofNullable(mqttChannels.get(deviceId)).ifPresent(mqttChannel -> {
- topics1.forEach(topic -> {
- deleteChannel(topic,mqttChannel);
- });
- });
- }
-
- /**
- * 登录成功后 回复
- */
- private void replyLogin(Channel channel, MqttConnectMessage mqttConnectMessage) {
- MqttFixedHeader mqttFixedHeader1 = mqttConnectMessage.fixedHeader();
- MqttConnectVariableHeader mqttConnectVariableHeader = mqttConnectMessage.variableHeader();
- final MqttConnectPayload payload = mqttConnectMessage.payload();
- String deviceId = getDeviceId(channel);
- MqttChannel build = MqttChannel.builder().channel(channel).cleanSession(mqttConnectVariableHeader.isCleanSession())
- .deviceId(payload.clientIdentifier())
- .sessionStatus(SessionStatus.OPEN)
- .isWill(mqttConnectVariableHeader.isWillFlag())
- .subStatus(SubStatus.NO)
- .topic(new CopyOnWriteArraySet<>())
- .message(new ConcurrentHashMap<>())
- .receive(new CopyOnWriteArraySet<>())
- .build();
- if (connectSuccess(deviceId, build)) { // 初始化存储mqttchannel
- if (mqttConnectVariableHeader.isWillFlag()) { // 遗嘱消息标志
- boolean b = doIf(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (payload.willMessage() != null)
- , mqttConnectVariableHeader1 -> (payload.willTopic() != null));
- if (!b) {
- throw new ConnectionException("will message and will topic is not null");
- }
- // 处理遗嘱消息
- final WillMeaasge buildWill = WillMeaasge.builder().
- qos(mqttConnectVariableHeader.willQos())
- .willMessage(deviceId)
- .willTopic(payload.willTopic())
- .isRetain(mqttConnectVariableHeader.isWillRetain())
- .build();
- willService.save(payload.clientIdentifier(), buildWill);
- } else {
- willService.del(payload.clientIdentifier());
- boolean b = doIf(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (!mqttConnectVariableHeader1.isWillRetain()),
- mqttConnectVariableHeader1 -> (mqttConnectVariableHeader1.willQos() == 0));
- if (!b) {
- throw new ConnectionException("will retain should be null and will QOS equal 0");
- }
- }
- doIfElse(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (mqttConnectVariableHeader1.isCleanSession()), mqttConnectVariableHeader1 -> {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK, mqttFixedHeader1.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader1.isRetain(), 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);// 清理会话
- }, mqttConnectVariableHeader1 -> {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, true);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK, mqttFixedHeader1.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader1.isRetain(), 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);// 非清理会话
-
- }); //发送 session 数据
- ConcurrentLinkedQueue sessionMessages = clientSessionService.getByteBuf(payload.clientIdentifier());
- doIfElse(sessionMessages, messages -> messages != null && !messages.isEmpty(), byteBufs -> {
- SessionMessage sessionMessage;
- while ((sessionMessage = byteBufs.poll()) != null) {
- switch (sessionMessage.getQoS()) {
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,getMqttChannel(deviceId), sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- case AT_MOST_ONCE:
- sendQos0Msg(channel, sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,getMqttChannel(deviceId), sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- }
- }
-
- });
- }
- }
-
-
-
- /**
- * qos2 第二步
- */
- @Override
- public void doPubrel(Channel channel, int messageId) {
- MqttChannel mqttChannel = getMqttChannel(getDeviceId(channel));
- doIfElse(mqttChannel,mqttChannel1 ->mqttChannel1.isLogin(),mqttChannel1 -> {
- mqttChannel1.removeRecevice(messageId);
- sendToPubComp(channel,messageId);
- });
- }
-
-
-
- /**
- * qos2 第三步
- */
- @Override
- public void doPubrec(Channel channel, int mqttMessage) {
- sendPubRel(channel,false,mqttMessage);
- }
-
- /**
- * 连接成功后
- * @param deviceId
- * @param build
- */
- @Override
- public boolean connectSuccess(String deviceId, MqttChannel build) {
- return Optional.ofNullable(mqttChannels.get(deviceId))
- .map(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case OPEN:
- return false;
- case CLOSE:
- switch (mqttChannel.getSubStatus()){
- case YES: // 清除订阅 topic
- deleteSubTopic(mqttChannel).stream()
- .forEach(s -> cacheMap.putData(getTopic(s),build));
- break;
- }
- }
- mqttChannels.put(deviceId,build);
- return true;
- }).orElseGet(() -> {
- mqttChannels.put(deviceId,build);
- return true;
- });
- }
-
-
- /**
- * 订阅成功后 (发送保留消息)
- */
- public void suscribeSuccess(String deviceId, Set topics){
- doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
- MqttChannel mqttChannel = mqttChannels.get(deviceId);
- mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
- mqttChannel.addTopic(strings);
- executorService.execute(() -> {
- Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
- if(mqttChannel1.isLogin()){
- strings.parallelStream().forEach(topic -> {
- addChannel(topic,mqttChannel);
- sendRetain(topic,mqttChannel); // 发送保留消息
- });
- }
- });
- });
- });
- }
-
-
- /**
- *成功登陆 (发送会话消息)
- * @param channel
- * @param deviceId
- * @param mqttConnectMessage
- */
- @Override
- public void loginSuccess(Channel channel, String deviceId, MqttConnectMessage mqttConnectMessage) {
- channel.attr(_login).set(true);
- channel.attr(_deviceId).set(deviceId);
- replyLogin(channel, mqttConnectMessage);
- }
-
-
- /**
- * 发布消息成功 ()
- * @param channel
- * @param mqttPublishMessage
- */
- @Override
- public void publishSuccess(Channel channel, MqttPublishMessage mqttPublishMessage) {
- MqttFixedHeader mqttFixedHeader = mqttPublishMessage.fixedHeader();
- MqttPublishVariableHeader mqttPublishVariableHeader = mqttPublishMessage.variableHeader();
- MqttChannel mqttChannel = getMqttChannel(getDeviceId(channel));
- ByteBuf payload = mqttPublishMessage.payload();
- byte[] bytes = ByteBufUtil.copyByteBuf(payload); //
- int messageId = mqttPublishVariableHeader.messageId();
- executorService.execute(() -> {
- if (channel.hasAttr(_login) && mqttChannel != null) {
- boolean isRetain;
- switch (mqttFixedHeader.qosLevel()) {
- case AT_MOST_ONCE: // 至多一次
- break;
- case AT_LEAST_ONCE:
- sendPubBack(channel, messageId);
- break;
- case EXACTLY_ONCE:
- sendPubRec(mqttChannel, messageId);
- break;
- }
- if ((isRetain=mqttFixedHeader.isRetain()) && mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) { //是保留消息 qos >0
- saveRetain(mqttPublishVariableHeader.topicName(),
- RetainMessage.builder()
- .byteBuf(bytes)
- .qoS(mqttFixedHeader.qosLevel())
- .build(), false);
- } else if (mqttFixedHeader.isRetain() && mqttFixedHeader.qosLevel() == MqttQoS.AT_MOST_ONCE) { // 是保留消息 qos=0 清除之前保留消息 保留现在
- saveRetain(mqttPublishVariableHeader.topicName(),
- RetainMessage.builder()
- .byteBuf(bytes)
- .qoS(mqttFixedHeader.qosLevel())
- .build(), true);
- }
- if (!mqttChannel.checkRecevice(messageId)) {
- push(mqttPublishVariableHeader.topicName(), mqttFixedHeader.qosLevel(), bytes,isRetain);
- mqttChannel.addRecevice(messageId);
- }
- }
- });
-
- }
- /**
- * 推送消息给订阅者
- */
- private void push(String topic, MqttQoS qos, byte[] bytes, boolean isRetain){
- Collection subChannels = getChannels(topic, topic1 -> cacheMap.getData(getTopic(topic1)));
- if(!CollectionUtils.isEmpty(subChannels)){
- subChannels.parallelStream().forEach(subChannel -> {
- switch (subChannel.getSessionStatus()){
- case OPEN: // 在线
- if(subChannel.isActive()){ // 防止channel失效 但是离线状态没更改
- switch (qos){
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,subChannel,topic,bytes);
- break;
- case AT_MOST_ONCE:
- sendQos0Msg(subChannel.getChannel(),topic,bytes);
- break;
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,subChannel,topic,bytes);
- break;
- }
- }
- else{
- if(!subChannel.isCleanSession() & !isRetain){
- clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
- SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
- break;
- }
- }
- break;
- case CLOSE: // 连接 设置了 clean session =false
- clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
- SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
- break;
- }
- });
- }
- }
-
- /**
- * 关闭channel 操作
- * @param deviceId
- */
- @Override
- public void closeSuccess(String deviceId,boolean isDisconnect) {
- if(StringUtils.isNotBlank(deviceId)){
- executorService.execute(() -> {
- MqttChannel mqttChannel = mqttChannels.get(deviceId);
- Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
- mqttChannel1.setSessionStatus(SessionStatus.CLOSE); // 设置关闭
- mqttChannel1.close(); // 关闭channel
- mqttChannel1.setChannel(null);
- if(!mqttChannel1.isCleanSession()){ // 保持会话
- // 处理 qos1 未确认数据
- ConcurrentHashMap message = mqttChannel1.getMessage();
- Optional.ofNullable(message).ifPresent(integerConfirmMessageConcurrentHashMap -> {
- integerConfirmMessageConcurrentHashMap.forEach((integer, confirmMessage) -> doIfElse(confirmMessage, sendMqttMessage ->sendMqttMessage.getConfirmStatus()== ConfirmStatus.PUB, sendMqttMessage ->{
- clientSessionService.saveSessionMsg(mqttChannel.getDeviceId(), SessionMessage.builder()
- .byteBuf(sendMqttMessage.getByteBuf())
- .qoS(sendMqttMessage.getQos())
- .topic(sendMqttMessage.getTopic())
- .build()); // 把待确认数据转入session中
- }
- ));
-
- });
- }
- else{ // 删除sub topic-消息
- mqttChannels.remove(deviceId); // 移除channelId 不保持会话 直接删除 保持会话 旧的在重新connect时替换
- switch (mqttChannel1.getSubStatus()){
- case YES:
- deleteSubTopic(mqttChannel1);
- break;
- }
- }
- if(mqttChannel1.isWill()){ // 发送遗言
- if(!isDisconnect){ // 不是disconnection操作
- willService.doSend(deviceId);
- }
- }
- });
- });
- }
- }
-
- /**
- * 清除channel 订阅主题
- * @param mqttChannel
- */
- public Set deleteSubTopic(MqttChannel mqttChannel){
- Set topics = mqttChannel.getTopic();
- topics.parallelStream().forEach(topic -> cacheMap.delete(getTopic(topic),mqttChannel));
- return topics;
- }
-
- /**
- * 发送 遗嘱消息(有的channel 已经关闭 但是保持了 session 此时加入session 数据中 )
- * @param willMeaasge 遗嘱消息
- */
- public void sendWillMsg(WillMeaasge willMeaasge){
- Collection mqttChannels = getChannels(willMeaasge.getWillTopic(), topic -> cacheMap.getData(getTopic(topic)));
- if(!CollectionUtils.isEmpty(mqttChannels)){
- mqttChannels.forEach(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case CLOSE:
- clientSessionService.saveSessionMsg(mqttChannel.getDeviceId(),
- SessionMessage.builder()
- .topic(willMeaasge.getWillTopic())
- .qoS(MqttQoS.valueOf(willMeaasge.getQos()))
- .byteBuf(willMeaasge.getWillMessage().getBytes()).build());
- break;
- case OPEN:
- writeWillMsg(mqttChannel,willMeaasge);
- break;
- }
- });
- }
- }
-
- /**
- * 保存保留消息
- * @param topic 主题
- * @param retainMessage 信息
- */
- private void saveRetain(String topic, RetainMessage retainMessage, boolean isClean){
- ConcurrentLinkedQueue retainMessages = retain.getOrDefault(topic, new ConcurrentLinkedQueue<>());
- if(!retainMessages.isEmpty() && isClean){
- retainMessages.clear();
- }
- boolean flag;
- do{
- flag = retainMessages.add(retainMessage);
- }
- while (!flag);
- retain.put(topic, retainMessages);
- }
-
- /**
- * 发送保留消息
- */
- public void sendRetain(String topic,MqttChannel mqttChannel){
- retain.forEach((_topic, retainMessages) -> {
- if(StringUtils.startsWith(_topic,topic)){
- Optional.ofNullable(retainMessages).ifPresent(pubMessages1 -> {
- retainMessages.parallelStream().forEach(retainMessage -> {
- log.info("【发送保留消息】"+mqttChannel.getChannel().remoteAddress()+":"+retainMessage.getString()+"【成功】");
- switch (retainMessage.getQoS()){
- case AT_MOST_ONCE:
- sendQos0Msg(mqttChannel.getChannel(),_topic,retainMessage.getByteBuf());
- break;
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,mqttChannel,_topic,retainMessage.getByteBuf());
- break;
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,mqttChannel,_topic,retainMessage.getByteBuf());
- break;
- }
- });
- });
- }
- });
-
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java
deleted file mode 100644
index abc4d56..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.BaseAuthService;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.myself.nettychat.bootstrap.bean.SendMqttMessage;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import com.myself.nettychat.common.mqtts.ServerMqttHandlerService;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import io.netty.handler.timeout.IdleStateEvent;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Component
-public class MqttHandlerService extends ServerMqttHandlerService implements BaseApi {
-
- @Autowired
- ChannelService mqttChannelService;
-
- private final BaseAuthService baseAuthService;
-
- public MqttHandlerService(BaseAuthService baseAuthService) {
- this.baseAuthService = baseAuthService;
- }
-
- /**
- * 登录
- *
- */
- @Override
- public boolean login(Channel channel, MqttConnectMessage mqttConnectMessage) {
-// 校验规则 自定义校验规则
- MqttConnectPayload payload = mqttConnectMessage.payload();
- String deviceId = payload.clientIdentifier();
- if (StringUtils.isBlank(deviceId)) {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
- connectBack(channel,connectReturnCode);
- return false;
- }
-
- if(mqttConnectMessage.variableHeader().hasPassword() && mqttConnectMessage.variableHeader().hasUserName()
- && !baseAuthService.authorized(payload.userName(),payload.password())){
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
- connectBack(channel,connectReturnCode);
- return false;
- }
- return Optional.ofNullable(mqttChannelService.getMqttChannel(deviceId))
- .map(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case OPEN:
- return false;
- }
- mqttChannelService.loginSuccess(channel, deviceId, mqttConnectMessage);
- return true;
- }).orElseGet(() -> {
- mqttChannelService.loginSuccess(channel, deviceId, mqttConnectMessage);
- return true;
- });
-
- }
-
- private void connectBack(Channel channel, MqttConnectReturnCode connectReturnCode){
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, true);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);
- }
-
-
- /**
- * 发布
- */
- @Override
- public void publish(Channel channel, MqttPublishMessage mqttPublishMessage) {
- mqttChannelService.publishSuccess(channel, mqttPublishMessage);
- }
-
- /**
- * 订阅
- */
- @Override
- public void subscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
- Set topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription ->
- mqttTopicSubscription.topicName()
- ).collect(Collectors.toSet());
- mqttChannelService.suscribeSuccess(mqttChannelService.getDeviceId(channel), topics);
- subBack(channel, mqttSubscribeMessage, topics.size());
- }
-
- private void subBack(Channel channel, MqttSubscribeMessage mqttSubscribeMessage, int num) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
- MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId());
- List grantedQoSLevels = new ArrayList<>(num);
- for (int i = 0; i < num; i++) {
- grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
- }
- MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
- MqttSubAckMessage mqttSubAckMessage = new MqttSubAckMessage(mqttFixedHeader, variableHeader, payload);
- channel.writeAndFlush(mqttSubAckMessage);
- }
-
-
- /**
- * 关闭通道
- */
- @Override
- public void close(Channel channel) {
- mqttChannelService.closeSuccess(mqttChannelService.getDeviceId(channel), false);
- channel.close();
- }
-
- /**
- * 回复pong消息
- */
- @Override
- public void pong(Channel channel) {
- if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
- log.info("收到来自:【" + channel.remoteAddress().toString() + "】心跳");
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
- channel.writeAndFlush(new MqttMessage(fixedHeader));
- }
- }
-
- /**
- * 取消订阅
- */
- @Override
- public void unsubscribe(Channel channel, MqttUnsubscribeMessage mqttMessage) {
- List topics1 = mqttMessage.payload().topics();
- mqttChannelService.unsubscribe(mqttChannelService.getDeviceId(channel), topics1);
- unSubBack(channel, mqttMessage.variableHeader().messageId());
- }
-
- /**
- * 回复取消订阅
- */
- private void unSubBack(Channel channel, int messageId) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
- MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
- MqttUnsubAckMessage mqttUnsubAckMessage = new MqttUnsubAckMessage(mqttFixedHeader, variableHeader);
- channel.writeAndFlush(mqttUnsubAckMessage);
- }
-
-
- /**
- * 消息回复确认(qos1 级别 保证收到消息 但是可能会重复)
- */
- @Override
- public void puback(Channel channel, MqttMessage mqttMessage) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = messageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- }
-
-
- /**
- * disconnect 主动断线
- */
- @Override
- public void disconnect(Channel channel) {
- mqttChannelService.closeSuccess(mqttChannelService.getDeviceId(channel), true);
- }
-
-
- /**
- * qos2 发布收到
- */
- @Override
- public void pubrec(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = messageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.PUBREL); // 复制为空
- mqttChannelService.doPubrec(channel, messageId);
- }
-
- /**
- * qos2 发布释放
- */
- @Override
- public void pubrel(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = mqttMessageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- mqttChannelService.doPubrel(channel, messageId);
-
- }
-
- /**
- * qos2 发布完成
- */
- @Override
- public void pubcomp(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = mqttMessageIdVariableHeader.messageId();
- SendMqttMessage sendMqttMessage = mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId);
- sendMqttMessage.setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- }
-
- @Override
- public void doTimeOut(Channel channel, IdleStateEvent evt) {
- log.info("【PingPongService:doTimeOut 心跳超时】" + channel.remoteAddress() + "【channel 关闭】");
- switch (evt.state()) {
- case READER_IDLE:
- close(channel);
- case WRITER_IDLE:
- close(channel);
- case ALL_IDLE:
- close(channel);
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java b/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java
deleted file mode 100644
index 4af733f..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.MqttChannel;
-import com.myself.nettychat.bootstrap.bean.SendMqttMessage;
-import com.myself.nettychat.bootstrap.bean.WillMeaasge;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.utils.MessageId;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 发送消息以及确认
- **/
-@Slf4j
-public class PublishApiSevice {
-
- private final ScanRunnable scanRunnable;
-
- public PublishApiSevice(ScanRunnable scanRunnable) {
- this.scanRunnable = scanRunnable;
- }
-
-
- /**
- * 写入遗嘱消息
- */
- protected void writeWillMsg(MqttChannel mqttChannel, WillMeaasge willMeaasge) {
-// dup保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等
- switch (willMeaasge.getQos()){
- case 0: // qos0
- sendQos0Msg(mqttChannel.getChannel(),willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- case 1: // qos1
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,mqttChannel,willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- case 2: // qos2
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,mqttChannel,willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- }
-
-
- }
-
- protected void sendQosConfirmMsg(MqttQoS qos, MqttChannel mqttChannel, String topic, byte[] bytes) {
- if(mqttChannel.isLogin()){
- int messageId = MessageId.messageId();
- switch (qos){
- case AT_LEAST_ONCE:
- mqttChannel.addSendMqttMessage(messageId,sendQos1Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
- break;
- case EXACTLY_ONCE:
- mqttChannel.addSendMqttMessage(messageId,sendQos2Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
- break;
- }
- }
-
- }
-
-
- /**
- * 发送 qos1 类的消息
- */
- private SendMqttMessage sendQos1Msg(Channel channel, String topic, boolean isDup, byte[] byteBuf, int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,isDup, MqttQoS.AT_LEAST_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader, Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- return addQueue(channel,messageId,topic,byteBuf,MqttQoS.AT_LEAST_ONCE, ConfirmStatus.PUB);
- }
-
-
-
- /**
- * 发送 qos0 类的消息 byte
- */
- protected void sendQos0Msg(Channel channel, String topic, byte[] byteBuf){
- if(channel!=null){
- sendQos0Msg(channel,topic,byteBuf,0);
- }
- }
- private void sendQos0Msg(Channel channel, String topic, byte[] byteBuf,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader,Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- }
-
-
-
-
- private SendMqttMessage sendQos2Msg(Channel channel, String topic,boolean isDup, byte[] byteBuf, int messageId) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,isDup, MqttQoS.EXACTLY_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader, Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- return addQueue(channel,messageId,topic,byteBuf,MqttQoS.EXACTLY_ONCE,ConfirmStatus.PUB);
- }
-
-
- /**
- * 发送qos1 publish 确认消息
- */
- protected void sendPubBack(Channel channel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,false, MqttQoS.AT_MOST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
-
- /**
- * 发送qos2 publish 确认消息 第一步
- */
- protected void sendPubRec( MqttChannel mqttChannel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- Channel channel = mqttChannel.getChannel();
- channel.writeAndFlush(mqttPubAckMessage);
- SendMqttMessage sendMqttMessage = addQueue(channel, messageId, null, null, null, ConfirmStatus.PUBREC);
- mqttChannel.addSendMqttMessage(messageId,sendMqttMessage);
- }
-
- /**
- * 发送qos2 publish 确认消息 第二步
- */
- protected void sendPubRel(Channel channel,boolean isDup,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL,isDup, MqttQoS.AT_LEAST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
- /**
- * 发送qos2 publish 确认消息 第三步
- */
- protected void sendToPubComp(Channel channel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
- private SendMqttMessage addQueue(Channel channel,int messageId,String topic,byte[] datas,MqttQoS mqttQoS,ConfirmStatus confirmStatus){
- SendMqttMessage build = SendMqttMessage.builder().
- channel(channel).
- confirmStatus(confirmStatus).
- messageId(messageId)
- .topic(topic)
- .qos(mqttQoS)
- .byteBuf(datas)
- .time(System.currentTimeMillis()).build();
- scanRunnable.addQueue(build);
- return build;
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java
deleted file mode 100644
index 3aa4213..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.myself.nettychat.bootstrap.bean.WillMeaasge;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-@Slf4j
-@Component
-@Data
-@NoArgsConstructor
-public class WillService implements BaseApi {
-
- @Autowired
- ChannelService channelService;
-
- private static ConcurrentHashMap willMeaasges = new ConcurrentHashMap<>(); // deviceid -WillMeaasge
-
-
-
- /**
- * 保存遗嘱消息
- */
- public void save(String deviceid, WillMeaasge build) {
- willMeaasges.put(deviceid,build); // 替换旧的
- }
-
-
- public void doSend( String deviceId) { // 客户端断开连接后 开启遗嘱消息发送
- if(StringUtils.isNotBlank(deviceId)&&(willMeaasges.get(deviceId))!=null){
- WillMeaasge willMeaasge = willMeaasges.get(deviceId);
- channelService.sendWillMsg(willMeaasge); // 发送遗嘱消息
- if(!willMeaasge.isRetain()){ // 移除
- willMeaasges.remove(deviceId);
- log.info("deviceId will message["+willMeaasge.getWillMessage()+"] is removed");
- }
- }
- }
-
- /**
- * 删除遗嘱消息
- */
- public void del(String deviceid ) {willMeaasges.remove(deviceid);}
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java b/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java
deleted file mode 100644
index 68f805e..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package com.myself.nettychat.bootstrap.channel.cache;
-
-import lombok.extern.slf4j.Slf4j;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 缓存操作
- **/
-@Slf4j
-public class CacheMap {
-
- private ConcurrentHashMap> datas = new ConcurrentHashMap<>();
-
- public boolean putData(K[] topic, V v){
- if(topic.length==1){
- Node kvNode = buildOne(topic[0], v);
- if(kvNode!=null && kvNode.topic.equals(topic[0])){
- return true;
- }
- }
- else{
- Node kvNode = buildOne(topic[0], null);
- for(int i=1;i kvNode = datas.get(ks[0]);
- for(int i=1;i getData(K[] ks){
- if(ks.length==1){
- return datas.get(ks[0]).get();
- }
- else{
- Node node = datas.get(ks[0]);
- if(node!=null){
- List all = new ArrayList<>();
- all.addAll(node.get());
- for(int i=1;i buildOne(K k,V v){
-
- Node node = this.datas.computeIfAbsent(k, key -> {
- Node kObjectNode = new Node<>(k);
- return kObjectNode;
- });
- if(v!=null){
- node.put(v);
- }
- return node;
- }
-
-
-
- class Node{
-
- private final K topic;
-
-
- private volatile ConcurrentHashMap> map =new ConcurrentHashMap<>() ;
-
-
- List vs = new CopyOnWriteArrayList<>();
-
-
- public K getTopic() {return topic;}
-
- Node(K topic) {
- this.topic = topic;
- }
-
- public boolean delValue(V v){
- return vs.remove(v);
- }
-
- public Node putNextValue(K k,V v){
- Node kvNode = map.computeIfAbsent(k, key -> {
- Node node = new Node<>(k);
- return node;
- });
- if(v!=null){
- kvNode.put(v);
- }
- return kvNode;
- }
-
-
- public Node getNext(K k){
- return map.get(k);
- }
-
-
- public boolean put(V v){
- return vs.add(v);
- }
-
-
- public List get(){
- return vs;
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java b/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java
deleted file mode 100644
index 6c34606..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.myself.nettychat.bootstrap.coder;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageEncoder;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-
-import java.util.List;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 转换
- **/
-public class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder {
-
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf byteBuf, List