服务器推送客户端方案 SSE 在规则引擎中的应用

服务器推送客户端方案 SSE 在规则引擎中的应用

应用场景:

  • 平台中约束规则约束类型包括中断 (驳回的提醒)/ 提示(通过的提醒) 两种类型 , 针对于 提示类型平台会发出消息,app 端需要接收消息实现

  • 其他服务器主动向客户端推送的场景

中断类型的提示可通过抛出异常的方式实现, 也会推送异常消息.

交互频繁的场景, 一般采用主动推送替代客户端轮询 (轮询只应用在: 并发用户量少,而且要求消息的实时性不高的场景,一般很少采用。)

客户端实时 / 准实时获取数据的实现方法有如下几种:

  1. 长 / 短轮询(客户端拉动)
  2. WebSockets(服务器推送)
  3. 服务器发送的事件(服务器推送)
  • 客户端拉取 - 客户端以一定的定期间隔向服务器请求更新
  • 服务器推送 - 服务器正在主动将更新推送到客户端(客户端拉取的反向操作)

后端主动推送的方式与两种, WebSocket & EventSource

WebSocket & EventSource 的区别

  1. WebSocket 基于 TCP 协议,EventSource 基于 http 协议。

  2. EventSource 是单向通信,而 websocket 是双向通信。

  3. EventSource 只能发送文本,而 websocket 支持发送二进制数据。

  4. 在实现上 EventSource 比 websocket 更简单。

  5. EventSource 有自动重连接(不借助第三方)以及发送随机事件的能力。

  6. websocket 的资源占用过大 EventSource 更轻量。

  7. websocket 可以跨域,EventSource 基于 http 跨域需要服务端设置请求头。

EventSource 优势

EventSource 客户端实现如下

const evtSource = new EventSource('/events');
evtSource.addEventListener('event', function(evt) {
const data = JSON.parse(evt.data);
// Use data here
},false);

我们从这种方法中获得的主要好处是:

  • 实施更简单,数据效率更高
  • 开箱即用地通过 HTTP / 2 自动多路复用
  • 将客户端上数据的连接数限制为一个

如何在 SSE,WebSocket 和 Polling 中进行选择?

经过漫长而详尽的客户端和服务器实施之后,SSE 似乎是我们解决数据交付问题的最终答案。也有一些问题,但是可以解决。

可以利用服务器发送事件的应用程序的一些简单示例:

  • 实时股价流图
  • 重要事件的实时新闻报道(发布链接,推文和图片)
  • 监视服务器统计信息(如正常运行时间,运行状况和正在运行的进程)的监视器。

但是,SSE 不仅是其他提供快速更新的方法的可行替代方案。在某些特定情况下,例如在 SSE 被证明是理想解决方案的情况下,每个人都可以胜过其他人。考虑一个像 MMO(大型多人在线)游戏这样的场景,该场景需要来自连接两端的大量消息。在这种情况下,WebSockets 将压制 SSE。

浏览器支持情况

**IE 不支持 **

Polyfills 可用于不支持 EventSource API 的浏览器。

平台设计

平台针对 SSE 有如下几点的考虑

  1. 使用 springboot 独立部署,减轻服务器压力

  2. 使用 redis 缓存,存储提示、中断消息内容、用户、类型(web/app)、事件类型、操作时间、是否推送、业务操作类型、业务操作对象、业务操作数据等字段,实现快速存储、读取

  3. EventSource 推送数据,是以广播方式推送,请求地址区分 web 端和 app 端

  4. 通过当前登录用户、类型(web/app)、事件类型获取 redis 缓存数据展示消息内容

  5. 消息判定为中断时,定义特殊异常抛出,所有 app 端后台捕获消息,不再提示

配置约束规则,服务器主动推送消息到客户端显示,使用 springBoot 独立部署服务,使用 redis 快速存储、读取提示、中断消息,HTML5 规范中提供了服务端事件 EventSource,浏览器在实现了该规范的前提下创建一个 EventSource 连接后,便可收到服务端的发送的消息。

MES 服务端修改

(注意修改 springboot 的 ip 和端口号,不要写 localhost,直接写 ip,避免影响 app 端访问)

1.application.properties(注意修改 Mestar-Home 下面)

请复制:

##是否启用规则约束通过springboot+eventSource服务端推送

server_push=true

#配置springBoot访问地址

springbootUrl=http://192.168.41.88:8081/

2. 服务端读取配置参数

(1)web 端访问平台代码 indexAction.java 修改 execute() 方法

        String server_push= EnvUtil.getMestarHomeApplicationProperties().getProperty("server_push");
        String springbootUrl= EnvUtil.getMestarHomeApplicationProperties().getProperty("springbootUrl");
        ServletActionContext.getRequest().setAttribute("server_push", server_push);
        ServletActionContext.getRequest().setAttribute("springbootUrl", springbootUrl);

(2)app 端访问产品代码 PadController.java 修改 login() 方法

            String server_push= EnvUtil.getMestarHomeApplicationProperties().getProperty("server_push");
            String springbootUrl= EnvUtil.getMestarHomeApplicationProperties().getProperty("springbootUrl");
            this.returnData.toSuccess(result + "," + pw + "," + multipleWorkCentersFlag+","+server_push+","+springbootUrl);

3. 不符合约束规则的消息存入 redis 缓存,中断类型消息抛出自定义异常 MestarRuleException,

(1) 修改 MestarRuleEngine.java 方法 public MessageResult sendToRuleCheck(String payload, String entityClass, String eventType) 中修改提示和中断信息,去掉选择信息

此处有捕获异常并抛出,抛出自定义的约束规则异常;

(2) 新增 TipsBO.java 用于存储 json 对象到 redis

package com.epichust.mestar.rule.bo;

import java.io.Serializable;
import java.util.Date;

public class TipsBO implements Serializable
{

    /**
     * serialVersionUID:TODO(用一句话描述这个变量表示什么).
     * 
     * @since JDK 1.6
     */
    private static final long serialVersionUID = 1L;
    private String msgContent;// 消息内容
    private String level;// 消息级别 info,error
    private Date noticeDate;// 通知日期
    private String userName;
    private String msgType = "message";// 事件类型,可以自定义事件event发送不同事件的监听,默认message
    private String operType;// 操作类型
    private String operObj;// 操作对象
    private String operData;// 操作数据
    private String type;//WEB/APP
    public String getMsgContent()
    {
        return msgContent;
    }

    public void setMsgContent(String msgContent)
    {
        this.msgContent = msgContent;
    }

    public String getLevel()
    {
        return level;
    }

    public void setLevel(String level)
    {
        this.level = level;
    }

    public Date getNoticeDate()
    {
        return noticeDate;
    }

    public void setNoticeDate(Date noticeDate)
    {
        this.noticeDate = noticeDate;
    }

    public String getUserName()
    {
        return userName;
    }

    public void setUserName(String userName)
    {
        this.userName = userName;
    }

    public String getMsgType()
    {
        return msgType;
    }

    public void setMsgType(String msgType)
    {
        this.msgType = msgType;
    }

    public String getOperType()
    {
        return operType;
    }

    public void setOperType(String operType)
    {
        this.operType = operType;
    }

    public String getOperObj()
    {
        return operObj;
    }

    public void setOperObj(String operObj)
    {
        this.operObj = operObj;
    }

    public String getOperData()
    {
        return operData;
    }

    public void setOperData(String operData)
    {
        this.operData = operData;
    }

    public String getType()
    {
        return type;
    }

    public void setType(String type)
    {
        this.type = type;
    }

}

(3)新增 MestarRuleException.java 自定义约束规则异常,app 端针对此类异常进行捕获异常不弹框,统一由服务端推送弹框

/**
 * CopyRight ©2012-2022 Epic-HUST Technology(Wuhan)Co.,Ltd. All Rights Reserved. 
 */

package com.epichust.mestar.utils.exception;


import java.io.PrintWriter;
import java.io.StringWriter;

import com.epichust.mestar.logging.MestarLogger;

/**
 * 
 * ClassName: MestarRuleException <br/>   
 * Function: 定义MestarRuleException用来处理约束规则异常. <br/>   
 * Reason: TODO ADD REASON(可选). <br/>   
 * date: 2019年4月12日 上午9:24:09 <br/>   
 *   
 * @author nicole   
 * @version    
 * @since JDK 1.6
 */
public class MestarRuleException extends RuntimeException
{

	protected MestarRuleException()
	{
		super();
	}

	/**
	 * 自动给出错误信息
	 * Creates a new instance of MestarException.   
	 *   
	 * @param cause
	 */
	public MestarRuleException(Throwable cause)
	{
	        super(MestarRuleException.getCoreMsg(cause), cause);
	       MestarLogger.error("ERROR:"+ cause+":"+MestarRuleException.getCoreMsg(cause), cause);
	}

	/**
	 * 不记录原始堆栈, 1.7之后的版本支持这种先进的写法
	 * @author rico 2016-01-08 增加Info日志输出
	 * @param message
	 */
	public MestarRuleException(final String message)
	{
		super(message, null, false, false);
		MestarLogger.info("INFO:"+ message);
	}

	/**
	 * 不记录原始堆栈,记录日志,1.7之后的版本支持这种先进的写法
	 * 
	 * @param message
	 * @param cause
	 */
	public MestarRuleException(String message, Throwable cause)
	{
		// 该方式主要是记录Message,不再Log日志
		super(message, null, false, false);
		MestarLogger.error(cause+":"+message);
	}

	/**
	 * 记录原始堆栈,记录详细日志
	 * 
	 * @param cause
	 * @param message
	 */
	public MestarRuleException(Throwable cause, String message)
	{
		super(message, cause);

		// 默认throw之后记录日志.
		MestarLogger.error(message, cause);	
	}

	private static final long serialVersionUID = 9189822891866606895L;

	public static String getStackTrace(Exception e)
	{
		StringWriter writer = new StringWriter();
		e.printStackTrace(new PrintWriter(writer, true));
		return writer.toString();
	}
	
	public static String getStackTrace(Throwable e)
	{
		StringWriter writer = new StringWriter();
		e.printStackTrace(new PrintWriter(writer, true));
		return writer.toString();
	}
	/**
	 * getCoreMsgInStackTrace:(得到原始堆栈信息). <br/>   
	 * TODO(这里描述这个方法适用条件 – 可选).<br/>   
	 * TODO(这里描述这个方法的执行流程 – 可选).<br/>   
	 * TODO(这里描述这个方法的使用方法 – 可选).<br/>   
	 * TODO(这里描述这个方法的注意事项 – 可选).<br/>   
	 *   
	 * @author kim.jin   
	 * @param cause
	 * @return   
	 * @since JDK 1.6
	 */
	public static String getCoreMsgInStackTrace(Throwable cause)
	{
		if (cause == null)
		{
			return "";
		} else
		{
			return getStackTrace(cause);
		}
	}
	
	/**
     * 得到原始报错信息
     * @param t
     * @return
     */
    public static String getCoreMsg(Throwable t)
    {
        Throwable next = t.getCause();  
        if (next == null) 
        {  
           return t.getMessage();  
        } else 
        {  
           return getCoreMsg(next);  
        }
    }  

}

(4)所有 app 端 controller 层,捕获异常增加捕获 MestarRuleException,此类异常不需弹出消息

锡柴生产报工示例

项目页面修改

index-desktop.jsp 在 $(function(){}) 中追加 eventSource 的 js 代码,跨域请求地址 springboot 地址 /sseMsg/pushWeb/ 用户,实现 web 端消息推送

请复制:

	if('${server_push}'=="true"){
		 if (window.EventSource) {
				console.log("Event source available");
				//var source = new EventSource("${ctx}"+"/sseMsg");
		        var source = new EventSource('${springbootUrl}'+"sseMsg/pushWeb/"+'${user.loginName}');
				source.addEventListener('message', function(e) {
					console.log(e.data);
					var tip=$.parseJSON(e.data);
						 if(tip.level=="info"){
							$.pCommon.message(tip.msgContent);
						}else if(tip.level=="error"){
							$.pCommon.alert(tip.msgContent);
						}
					//source.close(); // close connection, otherwisse browser will request the server constantly
				});
				source.addEventListener('open', function(e) {
					console.log("Connection was opened.");
				}, false);
		 
				source.addEventListener('error', function(e) {
					console.log(e);
					
				}, false);

			 } else {
					console.log("服务器不支持EvenSource对象");
			}
	 }

项目 App 端实现消息推送

(1)login.html 中修改

请复制:

localStorage.setItem('server_push', retData[3]);

localStorage.setItem('springbootUrl', retData[4]);

(2)app 中 mainJS.html 中追加 eventSource 的 js 代码,跨域请求地址 springboot 地址 /sseMsg/pushApp/ 用户,实现 app 端消息推送

请复制:

 var user=localStorage.getItem('un');

       var server_push=localStorage.getItem('server_push');

        var springbootUrl=localStorage.getItem('springbootUrl');

        if(server_push=="true"){

          var msgUri= springbootUrl+"sseMsg/pushApp/"+user;

          initSseMsg(msgUri);

        }

function initSseMsg(msgUrl){

      console.log(msgUrl);

    	if (window.EventSource) {

    		console.log("Event source available");

    		var source = new EventSource(msgUrl);

    		source.addEventListener('message', function(e) {

    			console.log(e.data);

          var tip=$.parseJSON(e.data);

            api.alert({

                title: '提示',

                msg: tip.msgContent,

            }, function(ret, err) {

            });

    		});

    		source.addEventListener('open', function(e) {

    			console.log("Connection was opened.");

    		}, false);

    		source.addEventListener('error', function(e) {

    			console.log(e);

    		}, false);

    	 } else {

    			console.log("服务器不支持EvenSource对象");

    	}

    }

(3)所有 app 端业务功能界面,针对 MestarRuleException 不提示

锡柴示例:锡柴所有的 ajax 请求封装在 http.js 中,只需更改针对 errcode=2 不处理,其他项目按照项目自己封装处理

Spingboot 中核心代码

(1)application.properties 配置检测对应的端口配置以及 redis 缓存地址

(2)针对 web 端请求

event-source 必须编码成 utf-8 的格式,消息的每个字段使用 "\n" 来做分割,并且需要下面 4 个规范定义好的字段:

  1. Event: 事件类型
  2. Data: 发送的数据
  3. ID: 每一条事件流的 ID
  4. Retry: 告知浏览器在所有的连接丢失之后重新开启新的连接等待的时间,在自动重新连接的过程中,之前收到的最后一个事件流 ID 会被发送到服务端。

EventSource 接收数据的固定格式:retry:${毫秒数}\ndata:${返回数据}\n\n,retry 是指每隔多久请求一次服务器,data 是指要接收的数据。注意这个 retry 参数不是必须的,如果不填写,对应的浏览器会有一个默认间隔时间,谷歌默认是 3000 毫秒,也就是 3 秒钟。

只有”:“时,这就会被服务端理解为”注释“,并不会被发送至浏览器端

注释行可以用来防止连接超时, 服务器可以定期发送一条消息注释行, 以保持连接不断。

(3)针对 app 端请求

(4)使用 bat 方式启动 springboot 服务,注意 springboot 启动必须使用 jdk1.8 环境

效果展示

WEB 端

APP 端

redis 缓存查看

redis 存储未推送的消息,以及存储一周类已推送的消息历史