You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

528 lines
15 KiB

5 years ago
var requestLib = require('./request');
var wxTunnel = require('./wxTunnel');
/**
* 当前打开的信道同一时间只能有一个信道打开
*/
var currentTunnel = null;
// 信道状态枚举
var STATUS_CLOSED = Tunnel.STATUS_CLOSED = 'CLOSED';
var STATUS_CONNECTING = Tunnel.STATUS_CONNECTING = 'CONNECTING';
var STATUS_ACTIVE = Tunnel.STATUS_ACTIVE = 'ACTIVE';
var STATUS_RECONNECTING = Tunnel.STATUS_RECONNECTING = 'RECONNECTING';
// 错误类型枚举
var ERR_CONNECT_SERVICE = Tunnel.ERR_CONNECT_SERVICE = 1001;
var ERR_CONNECT_SOCKET = Tunnel.ERR_CONNECT_SOCKET = 1002;
var ERR_RECONNECT = Tunnel.ERR_RECONNECT = 2001;
var ERR_SOCKET_ERROR = Tunnel.ERR_SOCKET_ERROR = 3001;
// 包类型枚举
var PACKET_TYPE_MESSAGE = 'message';
var PACKET_TYPE_PING = 'ping';
var PACKET_TYPE_PONG = 'pong';
var PACKET_TYPE_TIMEOUT = 'timeout';
var PACKET_TYPE_CLOSE = 'close';
// 断线重连最多尝试 5 次
var DEFAULT_MAX_RECONNECT_TRY_TIMES = 5;
// 每次重连前,等待时间的增量值
var DEFAULT_RECONNECT_TIME_INCREASE = 1000;
function Tunnel(serviceUrl) {
if (currentTunnel && currentTunnel.status !== STATUS_CLOSED) {
throw new Error('当前有未关闭的信道,请先关闭之前的信道,再打开新信道');
}
currentTunnel = this;
// 等确认微信小程序全面支持 ES6 就不用那么麻烦了
var me = this;
//=========================================================================
// 暴露实例状态以及方法
//=========================================================================
this.serviceUrl = serviceUrl;
this.socketUrl = null;
this.status = null;
this.open = openConnect;
this.on = registerEventHandler;
this.emit = emitMessagePacket;
this.close = close;
this.isClosed = isClosed;
this.isConnecting = isConnecting;
this.isActive = isActive;
this.isReconnecting = isReconnecting;
//=========================================================================
// 信道状态处理,状态说明:
// closed - 已关闭
// connecting - 首次连接
// active - 当前信道已经在工作
// reconnecting - 断线重连中
//=========================================================================
function isClosed() { return me.status === STATUS_CLOSED; }
function isConnecting() { return me.status === STATUS_CONNECTING; }
function isActive() { return me.status === STATUS_ACTIVE; }
function isReconnecting() { return me.status === STATUS_RECONNECTING; }
function setStatus(status) {
var lastStatus = me.status;
if (lastStatus !== status) {
me.status = status;
}
}
// 初始为关闭状态
setStatus(STATUS_CLOSED);
//=========================================================================
// 信道事件处理机制
// 信道事件包括:
// connect - 连接已建立
// close - 连接被关闭(包括主动关闭和被动关闭)
// reconnecting - 开始重连
// reconnect - 重连成功
// error - 发生错误,其中包括连接失败、重连失败、解包失败等等
// [message] - 信道服务器发送过来的其它事件类型,如果事件类型和上面内置的事件类型冲突,将在事件类型前面添加前缀 `@`
//=========================================================================
var preservedEventTypes = 'connect,close,reconnecting,reconnect,error'.split(',');
var eventHandlers = [];
/**
* 注册消息处理函数
* @param {string} messageType 支持内置消息类型"connect"|"close"|"reconnecting"|"reconnect"|"error"以及业务消息类型
*/
function registerEventHandler(eventType, eventHandler) {
if (typeof eventHandler === 'function') {
eventHandlers.push([eventType, eventHandler]);
}
}
/**
* 派发事件通知所有处理函数进行处理
*/
function dispatchEvent(eventType, eventPayload) {
eventHandlers.forEach(function (handler) {
var handleType = handler[0];
var handleFn = handler[1];
if (handleType === '*') {
handleFn(eventType, eventPayload);
} else if (handleType === eventType) {
handleFn(eventPayload);
}
});
}
/**
* 派发事件事件类型和系统保留冲突的事件名会自动加上 '@' 前缀
*/
function dispatchEscapedEvent(eventType, eventPayload) {
if (preservedEventTypes.indexOf(eventType) > -1) {
eventType = '@' + eventType;
}
dispatchEvent(eventType, eventPayload);
}
//=========================================================================
// 信道连接控制
//=========================================================================
var isFirstConnection = true;
var isOpening = false;
/**
* 连接信道服务器获取 WebSocket 连接地址获取地址成功后开始进行 WebSocket 连接
*/
function openConnect() {
if (isOpening) return;
isOpening = true;
// 只有关闭状态才会重新进入准备中
setStatus(isFirstConnection ? STATUS_CONNECTING : STATUS_RECONNECTING);
requestLib.request({
url: serviceUrl,
method: 'GET',
success: function (response) {
if (+response.statusCode === 200 && response.data && response.data.url) {
openSocket(me.socketUrl = response.data.url);
} else {
dispatchConnectServiceError(response);
}
},
fail: dispatchConnectServiceError,
complete: () => isOpening = false,
});
function dispatchConnectServiceError(detail) {
if (isFirstConnection) {
setStatus(STATUS_CLOSED);
dispatchEvent('error', {
code: ERR_CONNECT_SERVICE,
message: '连接信道服务失败,网络错误或者信道服务没有正确响应',
detail: detail || null,
});
} else {
startReconnect(detail);
}
}
}
/**
* 打开 WebSocket 连接打开后注册微信的 Socket 处理方法
*/
function openSocket(url) {
wxTunnel.listen({
onOpen: handleSocketOpen,
onMessage: handleSocketMessage,
onClose: handleSocketClose,
onError: handleSocketError,
});
wx.connectSocket({ url: url });
isFirstConnection = false;
}
//=========================================================================
// 处理消息通讯
//
// packet - 数据包,序列化形式为 `${type}` 或者 `${type}:${content}`
// packet.type - 包类型,包括 message, ping, pong, close
// packet.content? - 当包类型为 message 的时候,会附带 message 数据
//
// message - 消息体,会使用 JSON 序列化后作为 packet.content
// message.type - 消息类型,表示业务消息类型
// message.content? - 消息实体,可以为任意类型,表示消息的附带数据,也可以为空
//
// 数据包示例:
// - 'ping' 表示 Ping 数据包
// - 'message:{"type":"speak","content":"hello"}' 表示一个打招呼的数据包
//=========================================================================
// 连接还没成功建立的时候,需要发送的包会先存放到队列里
var queuedPackets = [];
/**
* WebSocket 打开之后更新状态同时发送所有遗留的数据包
*/
function handleSocketOpen() {
/* istanbul ignore else */
if (isConnecting()) {
dispatchEvent('connect');
}
else if (isReconnecting()) {
dispatchEvent('reconnect');
resetReconnectionContext();
}
setStatus(STATUS_ACTIVE);
emitQueuedPackets();
nextPing();
}
/**
* 收到 WebSocket 数据包交给处理函数
*/
function handleSocketMessage(message) {
resolvePacket(message.data);
}
/**
* 发送数据包如果信道没有激活将先存放队列
*/
function emitPacket(packet) {
if (isActive()) {
sendPacket(packet);
} else {
queuedPackets.push(packet);
}
}
/**
* 数据包推送到信道
*/
function sendPacket(packet) {
var encodedPacket = [packet.type];
if (packet.content) {
encodedPacket.push(JSON.stringify(packet.content));
}
wx.sendSocketMessage({
data: encodedPacket.join(':'),
fail: handleSocketError,
});
}
function emitQueuedPackets() {
queuedPackets.forEach(emitPacket);
// empty queued packets
queuedPackets.length = 0;
}
/**
* 发送消息包
*/
function emitMessagePacket(messageType, messageContent) {
var packet = {
type: PACKET_TYPE_MESSAGE,
content: {
type: messageType,
content: messageContent,
},
};
emitPacket(packet);
}
/**
* 发送 Ping
*/
function emitPingPacket() {
emitPacket({ type: PACKET_TYPE_PING });
}
/**
* 发送关闭包
*/
function emitClosePacket() {
emitPacket({ type: PACKET_TYPE_CLOSE });
}
/**
* 解析并处理从信道接收到的包
*/
function resolvePacket(raw) {
var packetParts = raw.split(':');
var packetType = packetParts.shift();
var packetContent = packetParts.join(':') || null;
var packet = { type: packetType };
if (packetContent) {
try {
packet.content = JSON.parse(packetContent);
} catch (e) {}
}
switch (packet.type) {
case PACKET_TYPE_MESSAGE:
handleMessagePacket(packet);
break;
case PACKET_TYPE_PONG:
handlePongPacket(packet);
break;
case PACKET_TYPE_TIMEOUT:
handleTimeoutPacket(packet);
break;
case PACKET_TYPE_CLOSE:
handleClosePacket(packet);
break;
default:
handleUnknownPacket(packet);
break;
}
}
/**
* 收到消息包直接 dispatch 给处理函数
*/
function handleMessagePacket(packet) {
var message = packet.content;
dispatchEscapedEvent(message.type, message.content);
}
//=========================================================================
// 心跳、断开与重连处理
//=========================================================================
/**
* Ping-Pong 心跳检测超时控制这个值有两个作用
* 1. 表示收到服务器的 Pong 相应之后过多久再发下一次 Ping
* 2. 如果 Ping 发送之后超过这个时间还没收到 Pong断开与服务器的连接
* 该值将在与信道服务器建立连接后被更新
*/
let pingPongTimeout = 15000;
let pingTimer = 0;
let pongTimer = 0;
/**
* 信道服务器返回 Ping-Pong 控制超时时间
*/
function handleTimeoutPacket(packet) {
var timeout = packet.content * 1000;
/* istanbul ignore else */
if (!isNaN(timeout)) {
pingPongTimeout = timeout;
ping();
}
}
/**
* 收到服务器 Pong 响应定时发送下一个 Ping
*/
function handlePongPacket(packet) {
nextPing();
}
/**
* 发送下一个 Ping
*/
function nextPing() {
clearTimeout(pingTimer);
clearTimeout(pongTimer);
pingTimer = setTimeout(ping, pingPongTimeout);
}
/**
* 发送 Ping等待 Pong
*/
function ping() {
/* istanbul ignore else */
if (isActive()) {
emitPingPacket();
// 超时没有响应,关闭信道
pongTimer = setTimeout(handlePongTimeout, pingPongTimeout);
}
}
/**
* Pong 超时没有响应信道可能已经不可用需要断开重连
*/
function handlePongTimeout() {
startReconnect('服务器已失去响应');
}
// 已经重连失败的次数
var reconnectTryTimes = 0;
// 最多允许失败次数
var maxReconnectTryTimes = Tunnel.MAX_RECONNECT_TRY_TIMES || DEFAULT_MAX_RECONNECT_TRY_TIMES;
// 重连前等待的时间
var waitBeforeReconnect = 0;
// 重连前等待时间增量
var reconnectTimeIncrease = Tunnel.RECONNECT_TIME_INCREASE || DEFAULT_RECONNECT_TIME_INCREASE;
var reconnectTimer = 0;
function startReconnect(lastError) {
if (reconnectTryTimes >= maxReconnectTryTimes) {
close();
dispatchEvent('error', {
code: ERR_RECONNECT,
message: '重连失败',
detail: lastError,
});
}
else {
wx.closeSocket();
waitBeforeReconnect += reconnectTimeIncrease;
setStatus(STATUS_RECONNECTING);
reconnectTimer = setTimeout(doReconnect, waitBeforeReconnect);
}
if (reconnectTryTimes === 0) {
dispatchEvent('reconnecting');
}
reconnectTryTimes += 1;
}
function doReconnect() {
openConnect();
}
function resetReconnectionContext() {
reconnectTryTimes = 0;
waitBeforeReconnect = 0;
}
/**
* 收到服务器的关闭请求
*/
function handleClosePacket(packet) {
close();
}
function handleUnknownPacket(packet) {
// throw away
}
var isClosing = false;
/**
* 收到 WebSocket 断开的消息处理断开逻辑
*/
function handleSocketClose() {
/* istanbul ignore if */
if (isClosing) return;
/* istanbul ignore else */
if (isActive()) {
// 意外断开的情况,进行重连
startReconnect('链接已断开');
}
}
function close() {
isClosing = true;
closeSocket();
setStatus(STATUS_CLOSED);
resetReconnectionContext();
isFirstConnection = false;
clearTimeout(pingTimer);
clearTimeout(pongTimer);
clearTimeout(reconnectTimer);
dispatchEvent('close');
isClosing = false;
}
function closeSocket(emitClose) {
if (isActive() && emitClose !== false) {
emitClosePacket();
}
wx.closeSocket();
}
//=========================================================================
// 错误处理
//=========================================================================
/**
* 错误处理
*/
function handleSocketError(detail) {
switch (me.status) {
case Tunnel.STATUS_CONNECTING:
dispatchEvent('error', {
code: ERR_SOCKET_ERROR,
message: '连接信道失败,网络错误或者信道服务不可用',
detail: detail,
});
break;
}
}
}
module.exports = Tunnel;