快速使用MQTT Java版 SDK实现消息收发

本文介绍快速使用 MQTT Java版SDK 实现MQTT客户端消息的自收自发。

1.1 部署Java开发环境

1.2 导入项目依赖

在IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.76</version>
</dependency>

2.1 获取鉴权信息

为保障客户安全性需求,环信MQTT云服务为客户提供token鉴权认证,获取token流程包含:创建用户ID获取token URL

2.1.1 创建用户ID

首先需要登录环信云console控制台,点击左侧菜单栏【应用概览】→【用户认证】页面,点击【创建IM用户】按钮,增添新的账户信息(包括用户名及密码)。

2.1.2 获取token URL

点击左侧菜单栏【即时通讯】→【服务概览】页面,查看下图中token域名、org_name、app_name。
获取token URL格式为:https:/ /{token域名}/{org_name}/{app_name}/token。

客户端获取token代码示例如下:

public static void main() {
        // 获取token的URL
        http://{token域名}/{org_name}/{app_name}/token
        // 获取token
        String token = "";
        // 取token
        try (final CloseableHttpClient httpClient = HttpClients.createDefault()) {
            final HttpPost httpPost = new HttpPost("http://{token域名}/{org_name}/{app_name}/token");
            Map<String, String> params = new HashMap<>();
            params.put("grant_type", "password");
            params.put("username", "test");
            params.put("password", "test123");
            //设置请求体参数
            StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
            entity.setContentEncoding("utf-8");
            httpPost.setEntity(entity);
            //设置请求头部
            httpPost.setHeader("Content-Type", "application/json");
            //执行请求,返回请求响应
            try (final CloseableHttpResponse response = httpClient.execute(httpPost)) {
                //请求返回状态码
                int statusCode = response.getStatusLine().getStatusCode();
                //请求成功
                if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) {
                    //取出响应体
                    final HttpEntity entity2 = response.getEntity();
                    //从响应体中解析出token
                    String responseBody = EntityUtils.toString(entity2, "utf-8");
                    JSONObject jsonObject = JSONObject.parseObject(responseBody);
                    token = jsonObject.getString("access_token");
                } else {
                    //请求失败
                    throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
     }
 
返回结果
 {
    "access_token": "YWMtN8a0oqV3EeuF0AmiqRgEh-grzF8zZk2Wp8GS3pF-orDW_F-gj3kR6os3h_oz3ROQAwMAAAF5BxhGlwBPGgAvTR8vDrdVsDPNZMQj0fFjv7EaohgZhzMHM9ncVLE30g",
    "expires_in": 5184000,
    "user": {
        "uuid": "d6fc5fa0-8f79-11ea-8b37-87fa33dd1390",
        "type": "user",
        "created": 1588756404898,
        "modified": 1588756404898,
        "username": "test",
        "activated": true
    }
 }
 access_token即为要获取的token

2.2 获取连接信息

登录console后台,点击左侧菜单栏【MQTT】→【服务概览】,在下图红色方框内获取连接地址、端口以及AppID等信息。
注:clientID由两部分组成,组织形式为“deviceID@AppID”,deviceID由用户自定义,AppID见【服务配置】。
示例:正确的clientID格式为:“device001@aitbj0”;

2.3 初始化

在IntelliJ IDEA工程中创建MQTT客户端,客户端初始配置包括创建clientID,topic名称,QoS质量,连接地址等信息。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;

public class MqttDemoStarter {
    public static void main(String[] args) throws MqttException, InterruptedException {
        /**
        * 用户指定
        * /
        String deviceId = "xxxxx-xxxx-xxxxx-xxxxx-xxxxx";
        /**
        * 从console控制台获取
        * /
        String appId = "1NQ1E9";
        /**
         * 设置接入点,进入console控制台获取
         */
        String endpoint = "1NQ1E9.sandbox.mqtt.chat";

        /**
         * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致之前的连接断开。
         * clientId由两部分组成,格式为DeviceID@appId,其中DeviceID由业务方自己设置,appId在console控制台创建,clientId总长度不得超过64个字符。
         */
        String clientId = deviceId + "@" + appId;

        /**
         * 需要订阅或发送消息的topic名称
         * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
         */
        final String myTopic = "myTopic";

        /**
         * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
         */
        final int qosLevel = 0;
        final MemoryPersistence memoryPersistence = new MemoryPersistence();

        /**
         * 客户端协议和端口。客户端使用的协议和端口必须匹配,ws或者wss;如果是mqtt或者mqtts,使用tcp://
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endpoint + ":1883", clientId, memoryPersistence);
        /**
         * 设置客户端发送超时时间,防止无限阻塞。
         */
        mqttClient.setTimeToWait(5000);

        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
}

2.4 连接服务器

配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至MQTT消息云。

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        /**
         * 用户名,在console中注册
         */
        mqttConnectOptions.setUserName("test");
        /**
         * 用户密码为第一步中申请的token
         */
        mqttConnectOptions.setPassword(token.toCharArray());
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
        mqttConnectOptions.setConnectionTimeout(5000);

        mqttClient.connect(mqttConnectOptions);
        //暂停1秒钟,等待连接订阅完成
        Thread.sleep(1000);
        

2.5 订阅(subscribe)

2.5.1 订阅主题

当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。

mqttClient.setCallback(new MqttCallbackExtended() {
        /**
        * 连接完成回调方法
        * @param b
        * @param s
        */
        @Override
        public void connectComplete(boolean b, String s) {
            /**
            * 客户端连接成功后就需要尽快订阅需要的Topic。
            */
            System.out.println("connect success");
            executorService.submit(() -> {
                try {
                    final String[] topicFilter = {myTopic};
                    final int[] qos = {qosLevel};
                    mqttClient.subscribe(topicFilter, qos);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
});
        

2.5.2 取消订阅

mqttClient.unsubscribe(new String[]{myTopic});

2.5.3 接收消息

配置接收消息回调方法,从MQTT消息云接收订阅消息。

mqttClient.setCallback(new MqttCallbackExtended() {
        /**
        * 接收消息回调方法
        * @param s
        * @param mqttMessage
        */
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) {
            System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
        }
});
        

2.6 发送(publish)

配置发送消息回调方法,向指定topic发送消息。

for (int i = 0; i < 10; i++) {
        /**
        * 构建一个Mqtt消息
        */
        MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes());
        //设置传输质量
        message.setQos(qosLevel);
        /**
        * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
        */
        mqttClient.publish(myTopic, message);
}
        

2.7 结果验证

connect success
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
  • 完整demo示例,请参见demo下载。
  • 目前MQTT客户端支持多种语言,请参见 SDK下载
  • 如果您在使用MQTT服务中,有任何疑问和建议,欢迎您联系我们