数据订阅服务使用说明
版本编号 | *变化状态 | 变更内容和范围 | 变更日期 | 变更人 | 批准日期 | 批准人 |
---|---|---|---|---|---|---|
1.0.0 | A | 新增 | 2022-7-8 | 高玉坤 | 程揭章 |
*变化状态:A——增加,M——修改,D——删除,N——正式发布
1 引言
1.1 执行摘要
- 为智慧时空大数据平台建立统一的数据订阅服务,不同的客户端均使用统一的方式对数据进行订阅。作为第三方应用程序开发的依据和输入。
2 公共说明
2.1 接入地址
接口分类 | 接入地址 |
---|---|
数据订阅 | ws://geovis.center.com/wsserver |
2.1 签名认证
说明
- 调用方需要对发送到ws的请求进行签名,执行签名计算的签名值需要赋值到URL的sign属性,以便服务端进行签名验证。
参数介绍
- 待签名字符串为: accesskeyId + accesskeySecret + timestamp;
- accesskeyId:授权账户,40位以内字符,智慧创新研究院智慧中台全局唯一;
- accesskeySecret:在智慧创新研究院智慧中台申请的accesskeySecret,不能明文发送;
- timestamp:Unix时间戳,精确到毫秒;
算法
- 签名算法就是对待签名字符串计算32位小写SHA-256值,算法示例见附录。
3 数据订阅服务接口
3.1 数据通信模型
客户端向服务端发起连接请求,服务端对连接请求参数进行鉴权,鉴权失败,则断开连接并返回客户端鉴权失败信息;鉴权成功,则建立通道。
鉴权成功后,客户端可多次向服务端发起订阅关系,多次订阅同一个topic的key组合,服务端则自动合并为一个订阅,默认为同一个客户端订阅;
针对同一个云服务,发起多次连接,服务端则认为是多个客户端发起请求,所有消息将平衡分发到多个客户端上,每个云服务最多可发起连接数为系统可配置参数。
取消订阅,则删除订阅关系,服务端返回客户端取消响应结果。
3.2 建立连接
- 提供建立连接的功能。客户端发起连接请求,数据订阅系统会对客户端连接请求进行鉴权后,建立与客户端的连接。
3.2.1 建立连接
功能描述
- 客户端发起连接请求,请求时携带对应应用的accesskeyId和accesskeySecret签名信息进行鉴权,鉴权成功返回鉴权结果,鉴权失败,则返回错误信息并断开连接。
接入地址: /websocket
输入参数
参数名 | 类型 | 位置 | 必填 | 说明 |
---|---|---|---|---|
accessKeyId | String | URL | 必填 | 授权ID,平台全局唯一。 |
sign | String | URL | 必填 | 对请求进行签名运算产生的签名,签名算法见2.5章节。 |
timestamp | String | URL | 必填 | Unix时间戳,精确到毫秒。 |
resetTime | Integer | URL | 选填 | 指定消息消费的时间,以分钟为单位。取值范围:可以最多指定到2个小时之前的数据(0-120);不传当前参数时,从最后一次消费的时间开始消费; 当传入时间时,从指定时间开始消费消息。如:传入0时,从当前时间开始消费;传入60,从当前时间1小时前开始消费。 |
输出参数
参数名 | 类型 | 位置 | 必填 | 说明 |
---|---|---|---|---|
cmd | String | Body | 必填 | 操作说明 |
data | String | Body | 必填 | 响应结果 |
请求样例
1 |
|
3.2.2 订阅接口
功能描述
- 订阅指定topic消息,消息订阅必须在建立连接成功的前提下进行,如果建立连接返回成功,才可以发送订阅,如果失败,则无法进行订阅。
接口定义
1 |
|
说明:
- (1) 客户端一个连接情况下只能发起一次订阅消息(服务器端做了限制,多发不起作用),订阅信息中的多个topic,其中如果有任何一个订阅验证失败,则本次请求全部订阅均失败,只有当全部topic的keys订阅成功,则本次订阅成功。
- (2) *代表统配所有topic的意思 错误码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20云端向客户端返回订阅结果的响应JSON字符串格式数据如下(红色部分为示例数据):
{
"cmd": "subscribe-ack",
"data": {
"code": "00000",
"result": "success",
"desc": "subscribed ok"
}
}
订阅成功之后,当topic有数据更新时,客户端会收到如下JSON字符串格式数据:
{
"partition": "1",
"data": "{\"msg\":\"1111111111\",\"gid\":\"1111111111\",\"t\":\"1111111111\",\"name\":\"土壤水TDS\",\"id\":\"1111111111\",\"gname\":\"田间气象端口1\",\"value\":\"327\"}",
"topic": "SocketData01",
"time": "2022-07-12 13:33:07"
}
- 34001,34002,34003,34004,34005,34006,34999
3.2.3 取消订阅接口
功能描述
- 取消订阅指定topic消息,取消订阅必须在建立连接成功的前提下进行,如果建立连接返回成功,才可以发取消订阅,如果失败,则无法进行取消订阅。
接口定义
1 |
|
错误码:
- 34001,34002,34003,34004,34005,34006,34999
3.2.4 心跳报文
功能描述
- 心跳间隔时间,具体根据实际情况设置,如nginx默认websocket超时时间是60s,则可以在连接无数据交互情况持续到快过期前发送一次心跳
接口定义
1 |
|
错误码:
- 34001,34002,34003,34004,34005,34006,34999
3.2.5 关闭连接功能
- 关闭连接具体无交互接口,只需客户端关闭session即可。
3.2.6 自动重连机制
- 由于网络闪断、Websocket Server服务端重启升级等原因,势必造成已有Websocket Client接入端连接中断,所以强烈建议Websocket Client接入端代码增加自动重连机制,可参照3.9示例中绿色标注代码或在此基础上优化。
注:
- (1) 自动重连尝试间隔可逐步递增,如5s尝试一次连接,如果不成功则2min后再尝试一次连接,如果还未成功则5min后再尝试连接一次。
- (2) 如果(1)未重连成功,则可尝试在以(1)为一个周期,持续循环重连。
- (3) 建议重连尝试间隔不易过短或频繁,如几秒钟一循环,以防止瞬间大量访问,对服务端造成连接压力。
4 附录
4.1 返回码列表公共错误码
错误码 | 描述 |
---|---|
34001 | Illegal parameters.(参数非法) |
34002 | The type information obtained is illegal. (获取到的类型信息非法) |
34003 | Add subscribe relationship fail. (添加订阅关系鉴权失败) |
34004 | Delete subscribe relationship fail. (取消订阅关系鉴权失败) |
34005 | Illegal result of search. (查询结果异常) |
34006 | Invalid connection. (连接异常) |
34999 | UnKnown error. (未知异常) |
4.2 签名算法示例
1 |
|
4.3 示例
- 以下提供Client端简单功能示例代码,仅方便引导本地开发使用,具体可根据本地实际情况进行二次开发。
1、 客户端Websocket实现包的pom依赖说明
1 |
|
2、 初始化客户端建立连接
1 |
|
3、 订阅详细消息
1 |
|
4、 取消订阅详细消息
1 |
|
5、 关闭当前客户端连接
1 |
|
注:如果每次建立连接后只是进行查询等一次性操作(即不需要连接长时间保留),则建议操作完后关闭当前链接,同时不建议使用心跳。
6、 整示例代码(可复用)
7、 代码功能:
- (1) 支持Client端订阅消息。
- (2) 支持心跳检测,心跳功能简单支持当客户端和服务器端连接处于无数据交互状态时才发送心跳检测消息,有数据交互时不发送。支持连接断开后指定时间进行重连尝试。说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
WebsocketSubscriberClient类:
import com.google.common.base.Throwables;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ClientEndpoint
public class WebsocketSubscriberClient {
private Logger logger = LoggerFactory.getLogger(WebsocketSubscriberClient.class);
// 最新收消息时间,心跳发消息时做时间间隔计算
public static AtomicLong lastReceiveTime = new AtomicLong();
// 是否继续心跳标志位
public static AtomicBoolean isHeartBeatContinue = null;
private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 创建容器对象(建立连接时要用)
private WebSocketContainer container = ContainerProvider.getWebSocketContainer();
// 一个连接对应一个session
private static Session session = null;
// 心跳线程池
private ExecutorService executorService = null;
// 关闭心跳线程要用到
private Future<?> future = null;
private static MsgProcessor msgProcessor;
private static String url;
private static List<String> cmdInfos;
public WebsocketSubscriberClient() {
}
public WebsocketSubscriberClient(String uri, List<String> cmdInfos, MsgProcessor msgProcessor) throws Exception {
WebsocketSubscriberClient.url = uri;
WebsocketSubscriberClient.msgProcessor = msgProcessor;
WebsocketSubscriberClient.cmdInfos = cmdInfos;
session = container.connectToServer(WebsocketSubscriberClient.class, URI.create(uri));
logger.info("WebsocketSubscriberClient init finished...");
if (session.isOpen()) {
executorService = new ThreadPoolExecutor(1, 1, 6000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>());
lastReceiveTime = new AtomicLong(System.currentTimeMillis());
isHeartBeatContinue = new AtomicBoolean(true);
// 值为0表示会话连接不会因为长时间无数据交互而超时
session.setMaxIdleTimeout(0);
// 1Mb
session.setMaxTextMessageBufferSize(1024 * 1024);
// 启动心跳
startHeartBeat();
} else {
container = null;
}
}
@OnMessage
public void onMessage(final String message) {
try {
long currentTime = System.currentTimeMillis();
// 更新最新收消息时间
lastReceiveTime.set(currentTime);
String currentDateStr = dateFormat.format(currentTime);
logger.debug(currentDateStr + " 收到 message:" + message);
msgProcessor.processorMsg(message);
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
@OnClose
public void onClose(CloseReason closeReason) {
stopHeartBeat();
stopContainer();
logger.error("Connection closed! CloseReason : code:{},reason:{}", closeReason.getCloseCode().getCode(),
closeReason.getReasonPhrase());
if (closeReason.getCloseCode().getCode() != CloseReason.CloseCodes.NORMAL_CLOSURE.getCode()) {
boolean isSuccess = false;
while (!isSuccess) {
try {
logger.error("Try restarting the connection after 5s...");
Thread.sleep(5000);
isSuccess = buildClient();
if (!isSuccess) {
logger.error("Try restarting the connection after 2min...");
Thread.sleep(120000);
isSuccess = buildClient();
if (!isSuccess) {
logger.error("Try restarting the connection after 5min...");
Thread.sleep(300000);
isSuccess = buildClient();
}
}
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
}
}
private boolean buildClient() {
try {
new WebsocketSubscriberClient(url, cmdInfos, msgProcessor);
cmdInfos.forEach(cmd -> {
addCmdInfo(cmd);
});
return true;
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
return false;
}
}
public void addCmdInfo(final String cmdInfo) {
try {
if (session.isOpen()) {
synchronized (session) {
session.getBasicRemote().sendText(cmdInfo);
}
}
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
public void close() {
try {
// 关闭心跳
stopHeartBeat();
// 关闭session连接
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "NORMAL_CLOSE"));
// 关闭容器
stopContainer();
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
private void stopContainer() {
if (container instanceof WsWebSocketContainer) {
try {
((WsWebSocketContainer) container).destroy();
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
}
private void startHeartBeat() {
future = executorService.submit(new HeartBeat());
}
private void stopHeartBeat() {
if (future != null) {
try {
isHeartBeatContinue.set(false);
future.cancel(true);
executorService.shutdown();
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
}
private class HeartBeat implements Runnable {
// 心跳间隔时间,具体根据实际情况设置,如nginx默认websocket超时时间是60s,则可以在连接无数据交互情况持续到快过期前发送一次心跳(这里设置55s时),
private long heartBeatInterval = 55000;
@Override
public void run() {
try {
while (isHeartBeatContinue.get()) {
long startTime = System.currentTimeMillis();
if (startTime - lastReceiveTime.get() > heartBeatInterval) {
String cmdInfo = "{\"cmd\":\"keepAlive\"}";
if (session.isOpen()) {
synchronized (session) {
session.getBasicRemote().sendText(cmdInfo);
}
// 更新最新收消息时间
lastReceiveTime.set(System.currentTimeMillis());
}
}
Thread.sleep(1000);
}
} catch (Exception e) {
logger.error("Exception = {}", Throwables.getStackTraceAsString(e));
}
}
}
}
MsgProcessor接口:
public interface MsgProcessor {
void processorMsg(String msg);
}
测试示例代码:
import java.util.ArrayList;
import java.util.List;
public class TestSubscribeMsg {
public static void main(String[] args) {
buildConnectionAndSubscribeMsg();
}
private static void buildConnectionAndSubscribeMsg() {
try {
String url = "ws://192.168.131.2:8091/websocket?accesskeyId=3o80mxreadyagomr×tamp=1651747253729&sign=b38528d5ef61118a1b72e47a4142f2def04fef3babc17894a54beaadd0a9691f";
String cmdInfo= "{\"cmd\":\"subscribe\", \"topics\":[\"*\"]}";
// cmdInfos 用于websocket非正常端开时重连订阅。
List<String> cmdInfos = new ArrayList<>();
cmdInfos.add(cmdInfo);
MsgProcessor processor = new MsgProcessor() {
@Override
public void processorMsg(String msg) {
System.out.println("receive msg: " + msg);
}
};
WebsocketSubscriberClient client = new WebsocketSubscriberClient(url,cmdInfos,processor);
//发送订阅请求
client.addCmdInfo(cmdInfo);
//String unSubscribeCmdInfo= "{\"cmd\":\"unsubscribe\", \"topics\":[\"DEV_STATUS\", \"DEV_BIGDATA\"]}";
//发送取消订阅请求并关闭当前客户端连接
//client.addCmdInfo(unSubscribeCmdInfo);
//关闭当前客户端连接
// client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
(1) 心跳的JSON字符串格式如下:
{"cmd": "keepAlive"}
(2) 服务端相应如下:
{
“cmd”:”keepAlive”,
“code”:”000000”,
“desc”:”success”
}
4.4 自动重连机制
由于网络闪断、Websocket Server服务端重启升级等原因,势必造成已有Websocket Client接入端连接中断,所以强烈建议Websocket Client接入端代码增加自动重连机制,可参照示例基础上优化。
说明:
- (1) 自动重连尝试间隔可逐步递增,如5s尝试一次连接,如果不成功则2min后再尝试一次连接,如果还未成功则5min后再尝试连接一次。
- (2) 如果(1)未重连成功,则可尝试在以(1)为一个周期,持续循环重连。
- (3) 建议重连尝试间隔不易过短或频繁,如几秒钟一循环,以防止瞬间大量访问,对服务端造成连接压力。