mmq使用文档

1. 简介

​ mmq主要实现了事件发布订阅以及环形队列的部分功能,发布组件通过http, grpc, tcp, websocket通道发布事件到mmq,订阅组件通过grpc, tcp, websocket通道从mmq订阅事件。

2. Event

2.1 发布事件

通过http的方式, 发布事件到mmq,主要格式如下:

// 通过http://host:port/api接口,然后才能发布事件
{
    "action": "pub_event",
    "event": {
        "event": "statistics_realtime_result",
        "block": "statistics",
        "uniqueid": "PUB1395846800005",
        "datetime": "2022-03-31 16:12:00",
        "uid": "STA105313",
        "object_id": "OBJ1781373100297",
        "type": "statistics",
        "data": 17    
    }
}

通过grpc流的方式, 发布事件到mmq,主要格式如下:

{
    "event": "statistics_realtime_result",
    "block": "statistics",
    "uniqueid": "PUB1395846800005",
    "datetime": "2022-03-31 16:12:00",
    "uid": "STA105313",
    "object_id": "OBJ1781373100297",
    "type": "statistics",
    "data": 17
}

通过tcp流的方式, 发布事件到mmq,主要格式如下:

{
    "action": "pub_event",
    "event": {
        "event": "statistics_realtime_result",
        "block": "statistics",
        "uniqueid": "PUB1395846800005",
        "datetime": "2022-03-31 16:12:00",
        "uid": "STA105313",
        "object_id": "OBJ1781373100297",
        "type": "statistics",
        "data": 17    
    }
}

通过websocket流的方式, 发布事件到mmq,主要格式如下:

// 通过ws://host:port/api建立websocket连接,然后才能发布事件
{
    "action": "pub_event",
    "event": {
        "event": "statistics_realtime_result",
        "block": "statistics",
        "uniqueid": "PUB1395846800005",
        "datetime": "2022-03-31 16:12:00",
        "uid": "STA105313",
        "object_id": "OBJ1781373100297",
        "type": "statistics",
        "data": 17    
    }
}

2.2 订阅事件

通过grpc长连接方式, 从mmq订阅事件,主要格式如下:

{
    "action":"sub_event",
    "event": {
        "event":"statistics_realtime_result",
        "block":"statistics",
        "uid":"STA105313"
    }
}

通过tcp长连接方式, 从mmq订阅事件,主要格式如下:

{
    "action":"sub_event",
    "event": {
        "event":"statistics_realtime_result",
        "block":"statistics",
        "uid":"STA105313"
    }
}

通过websocket长连接方式, 从mmq订阅事件,主要格式如下:

// 通过ws://host:port/api建立websocket连接,然后才能订阅事件
{
    "action":"sub_event",
    "event": {
        "event":"statistics_realtime_result",
        "block":"statistics",
        "uid":"STA105313"
    }
}

2.3 取消订阅

取消事件订阅,通过grpc长连接的方式从mmq取消事件订阅,主要格式如下:

{
    "action":"unsub_event",
    "event": {
        "uid":"5a212cbc-c58f-42c9-9509-bd114026388d"
    }
}

取消事件订阅,通过tcp长连接的方式从mmq取消事件订阅,主要格式如下:

{
    "action":"unsub_event",
    "event": {
        "uid":"5a212cbc-c58f-42c9-9509-bd114026388d"
    }
}

取消事件订阅,通过websocket长连接的方式从mmq取消事件订阅,主要格式如下:

{
    "action":"unsub_event",
    "event": {
        "uid":"5a212cbc-c58f-42c9-9509-bd114026388d"
    }
}

3. Action

3.1 事件

3.1.1 发布事件

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
event object 事件

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息

result:

参数名称 数据类型 描述
uid string 返回终端配置唯一编号

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "pub_event",
    "event": {
        "event": "statistics_realtime_result",
        "block": "statistics",
        "uniqueid": "PUB1395846800005",
        "datetime": "2022-03-31 16:12:00",
        "uid": "STA105313",
        "object_id": "OBJ1781373100297",
        "type": "statistics",
        "data": 17
    }
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "pub_event",
    "code": 200,
    "msg": "成功"
}

3.1.2 发布用户事件

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
user_event object 事件
其他参数

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息

result:

参数名称 数据类型 描述
uid string 返回终端配置唯一编号

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "pub_user_event",
    "block": "mapping",
    "uniqueid": "PUB2530595500002",
    "datetime": "2022-05-13 09:50:05",
    "user_event": {
      "user_event": "update_control",
      "id": 80,
      "uid": "CTL2414319900001",
      "name": "cc1",
      "description": "",
      "script": [],
      "template": [],
      "reference": "",
      "created": "0001-01-01 00:00:00",
      "created_by": 1,
      "last_modified": "2022-05-13 09:50:05",
      "last_modified_by": 1,
      "is_available": 1,
      "object_list": null
    }
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "pub_user_event",
    "code": 200,
    "msg": "成功"
}

3.1.3 取消订阅

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
uid string 唯一编号

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息

result:

参数名称 数据类型 描述
uid string 返回终端配置唯一编号

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "unsub_event_register_by_uid",
    "uid": "132ed645-6a60-4c63-892f-a15e5c2fef4e"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "unsub_event_register_by_uid",
    "code": 200,
    "msg": "成功"
}

3.1.4 获取事件注册详情(通过事件名)

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
event string 事件名称

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_event_register_item",
    "event": "statistics_realtime_result"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_event_register_item",
    "code": 200,
    "msg": "成功",
    "result": [
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52532",
            "uid": "1e3f2807-5cb8-467e-9064-d79ea7e48889"
        },
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52530",
            "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
        },
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52534",
            "uid": "90c374f3-622b-4f28-bbf2-cf90e82956c7"
        }
    ]
}

3.1.5 获取事件注册详情(通过客户端地址)

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
addr string 客户端地址

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_event_register_item_by_addr",
    "addr": "192.168.88.202:52530"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_event_register_item_by_addr",
    "code": 200,
    "msg": "成功",
    "result": [
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52530",
            "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
        }
    ]
}

3.1.6 获取事件注册详情(通过ip地址)

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
ip string 客户端ip地址

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_event_register_item_by_ip",
    "ip": "192.168.88.202"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_event_register_item_by_ip",
    "code": 200,
    "msg": "成功",
    "result": [
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52532",
            "uid": "1e3f2807-5cb8-467e-9064-d79ea7e48889"
        },
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52530",
            "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
        },
        {
            "event": {
                "block": "statistics",
                "event": "statistics_realtime_result",
                "uid": "STA105313"
            },
            "address": "192.168.88.202:52534",
            "uid": "90c374f3-622b-4f28-bbf2-cf90e82956c7"
        }
    ]
}

3.1.7 获取事件注册详情(通过订阅号)

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
uid string 唯一编号

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_event_register_item_by_uid",
    "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_event_register_item_by_uid",
    "code": 200,
    "msg": "成功",
    "result": {
        "event": {
            "block": "statistics",
            "event": "statistics_realtime_result",
            "uid": "STA105313"
        },
        "address": "192.168.88.202:52530",
        "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
    }
}

3.1.8 获取事件注册列表

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_event_register_list"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_event_register_list",
    "code": 200,
    "msg": "成功",
    "result": {
        "statistics_realtime_result": [
            {
                "event": {
                    "block": "statistics",
                    "event": "statistics_realtime_result",
                    "uid": "STA105313"
                },
                "address": "192.168.88.202:52532",
                "uid": "1e3f2807-5cb8-467e-9064-d79ea7e48889"
            },
            {
                "event": {
                    "block": "statistics",
                    "event": "statistics_realtime_result",
                    "uid": "STA105313"
                },
                "address": "192.168.88.202:52530",
                "uid": "a58dfba7-fea5-4fd9-9c03-751c6d463eb9"
            },
            {
                "event": {
                    "block": "statistics",
                    "event": "statistics_realtime_result",
                    "uid": "STA105313"
                },
                "address": "192.168.88.202:52534",
                "uid": "90c374f3-622b-4f28-bbf2-cf90e82956c7"
            }
        ]
    }
}

3.1.9 定时事件

mmq服务自身会发布定时事件,即每分钟,每5分钟,每30分钟,每小时都会发布一个事件。Mixiot系统的其他服务可以通过订阅定时事件执行定时任务。

事件名称 描述
one_minute_event 每分钟事件
five_minute_event 每5分钟事件
thirty_minute_event 每30分钟事件
one_hour_event 每小时事件
  • 示例
{
    "action": "pub_event",
    "event": {
        "event": "timer_event",
        "block": "agent",
        "type": "one_minute_event",
        "uniqueid": "PUB1344800006032",
        "datetime": "2021-04-21 18:08:08"
    }
}

3.2 队列

3.2.1 创建队列

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名
size int 队列大小

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "create_queue",
    "name": "my_queue",
    "size": 100
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "create_queue",
    "code": 200,
    "msg": "成功"
}

3.2.2 删除队列

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "delete_queue",
    "name": "my_queue"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "create_queue",
    "code": 200,
    "msg": "成功"
}

3.2.3 推入队列

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名
value object

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "push_queue",
    "name": "my_queue",
    "value": {
        "key": "value",
        "k1": 2
    }
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "create_queue",
    "code": 200,
    "msg": "成功"
}

3.2.4 弹出队列

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名
size int 数量

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "pop_queue",
    "name": "my_queue",
    "size": 1
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "pop_queue",
    "code": 200,
    "msg": "成功",
    "result": {
        "values": [
            "{\"key\":\"value\",\"k1\":2}"
        ]
    }
}

3.2.5 弹出队列并删除

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名
size int 数量

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "pop_and_del_queue",
    "name": "my_queue",
    "size": 2
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "pop_and_del_queue",
    "code": 200,
    "msg": "成功",
    "result": {
        "values": [
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}"
        ]
    }
}

3.2.6 获取队列当前长度

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_size_queue",
    "name": "my_queue"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_size_queue",
    "code": 200,
    "msg": "成功",
    "result": {
        "size": 8
    }
}

3.2.7 获取队列最大长度

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_max_size_queue",
    "name": "my_queue"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_max_size_queue",
    "code": 200,
    "msg": "成功",
    "result": {
        "size": 10
    }
}

3.2.8 获取队列名列表

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_queue_name_list"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_queue_name_list",
    "code": 200,
    "msg": "成功",
    "result": {
        "names": [
            "my_queue"
        ]
    }
}

3.2.9 获取队列元素列表

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_queue_value_list",
    "name": "my_queue"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_queue_value_list",
    "code": 200,
    "msg": "成功",
    "result": {
        "values": [
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"k1\":2,\"key\":\"value\"}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}"
        ]
    }
}

3.2.10 获取队列列表

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_queue_list"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_queue_list",
    "code": 200,
    "msg": "成功",
    "result": {
        "my_queue": [
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"k1\":2,\"key\":\"value\"}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            "{\"key\":\"value\",\"k1\":2}",
            null,
            null
        ]
    }
}

3.2.11 获取队列信息列表

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_queue_info_list"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_queue_info_list",
    "code": 200,
    "msg": "成功",
    "result": {
        "my_queue": {
            "max_size": 10,
            "size": 8
        }
    }
}

3.2.12 获取队列信息详情

请求参数:

参数名称 是否必选 数据类型 描述
action string 指令
name string 队列名称

响应参数:

参数名称 数据类型 描述
resp string 返回指令
code int 返回码
msg string 提示消息
result object 返回结果

请求示例:

curl -H "Content-Type: application/json" -X POST -d '{
    "action": "get_queue_info_item",
    "name": "my_queue"
}' http://192.168.88.202:8791/api

响应示例:

{
    "resp": "get_queue_info_item",
    "code": 200,
    "msg": "成功",
    "result": {
        "max_size": 10,
        "size": 8
    }
}

4. 修订记录

  • 2022-04-27
    • 初始化提交,新增发布事件,订阅事件等
    • 添加事件相关接口,以及事件匹配机制
  • 2022-05-13
    • mmq添加队列及接口,并修复事件部分逻辑
    • 调整事件匹配机制,队列加锁
文档更新时间: 2024-05-27 10:15   作者:技术支持