目录

Python服务端发送消息

本文介绍Python服务端通过调用环信MQTT消息云 REST API接口快速实现消息下发,使用时可参阅REST 发送消息接口介绍

1. 前提条件

1.1 获取服务器信息

调用环信MQTT消息云 REST API接口前,需要获取四个配置信息,包括:应用clientID应用clientSecretREST API地址应用ID
1、应用clientID:从环信console【应用概览】→【应用详情】→【开发者ID】下 “client ID”获取;
2、应用clientSecret:从环信console【应用概览】→【应用详情】→【开发者ID】下“clientSecret”获取;
3、RSET API地址:从环信console【MQTT】→【服务概览】→【服务配置】下“REST API地址”获取;
4、应用ID:从环信console【MQTT】→【服务概览】→【服务配置】下“AppId”获取;

2. 实现流程

注:本代码对消息体内容进行GBK转码,可支持语音播报(适用于扬声器播放中文内容),如不需要此场景使用,可根据需求设置转码格式。

import requests
import time
import json
import base64


# 填写服务参数
# 1、app_client_id:应用clientID
# 2、app_client_secret:应用clientSecret
# 3、api_url_base:RSET API地址
# 4、app_id:应用ID
app_client_id = ' XXXXX'

app_client_secret = 'XXXX'

api_url_base = 'XXXXX' 

app_id = 'XXXXXX'


# 播报文字
speak_text = '欢迎使用环信mqtt'


# 获取应用token
api_url_app_token = api_url_base + '/openapi/rm/app/token'
def get_app_token():
    data = {
        'appClientId':app_client_id,
        'appClientSecret':app_client_secret
    }
    
    header = {'Content-Type': 'application/json'}

    re = requests.post(api_url_app_token, headers=header, data=json.dumps(data))
    return (json.loads(re.text)['body']['access_token'])


# 发送mqtt消息
api_url_publish = api_url_base + '/openapi/v1/rm/chat/publish' 
def send_msg(app_token, txt):

    # 智能音箱的 msgid 每次都不一样才会播报声音
    # 这里用毫秒时间戳当作 msgid
    time_millis = int(round(time.time() * 1000))

    dat ={
    'type':'tts_dynamic',
    'msgid': time_millis, 
    'txt':txt , 
    }
    
    json_text = json.dumps(dat, ensure_ascii=False)
    json_h = json_text.encode(encoding="gbk") 
    base64_bytes = base64.b64encode(json_h)
    base64_utf8 = str(base64_bytes,'utf-8')

    #topics,要发送的主题
    #clientid,当前客户端ID,格式为“xxxx@appid”
    data = {
        'topics':['861714050059769'],
        'clientid':'12@ff6sc0',
        'payload':base64_utf8,
        "encoding":'base64',
        'qos':1,
        'retain':0,
        'expire':86400
    }

    header = {
        'Content-Type': 'application/json',
        'Authorization': app_token
    }

    re = requests.post(api_url_publish, headers=header, data=json.dumps(data))
    return (json.loads(re.text))


print('正在获取应用token...')
app_token = get_app_token()
print('获取应用token成功')

print(send_msg(app_token, speak_text))
print('发送消息成功')

三、更多信息