快速使用MQTT Java版 SDK实现消息收发
本文介绍快速使用 MQTT Java版SDK 实现MQTT客户端消息的自收自发。
1. 前提条件
1.1 部署Java开发环境
- 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
- 下载安装JDK。
1.2 导入项目依赖
在IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
2. 实现流程
2.1 获取初始化信息
登录console后台
1.点击菜单栏【MQTT】→【服务概览】→【服务配置】,获取「连接地址」、「连接端口」、「AppID」以「及REST API地址」等信息。
注:clientID由两部分组成,组织形式为“deviceID@AppID”,deviceID由用户自定义,AppID见【服务配置】。
示例:正确的clientID格式为:“device001@aitbj0”;
2.点击菜单栏【应用概览】→【应用详情】→【开发者ID】,获取「Client ID」与「ClientSecret」信息。
3.初始化代码
String endpoint = 'xxx.xxx.xxx.xxx' //环信MQTT服务器地址 通过console后台[MQTT]->[服务概览]->[服务配置]下[连接地址]获取
String port=‘xx’ // 协议服务端口 通过console后台[MQTT]->[服务概览]->[服务配置]下[连接端口]获取
String appID='xx' // appID 通过console后台[MQTT]->[服务概览]->[服务配置]下[AppID]获取
String deviceId = 'deviceId' // 自定义deviceID
String restapi = 'xxx.xxx.xxx.xxx' //环信MQTT REST API地址 通过console后台[MQTT]->[服务概览]->[服务配置]下[REST API地址]获取
String appClientId = 'xxx' //开发者ID 通过console后台[应用概览]->[应用详情]->[开发者ID]下[ Client ID]获取
String appClientSecret = 'xxx' // 开发者密钥 通过console后台[应用概览]->[应用详情]->[开发者ID]下[ ClientSecret]获取
String clientID = deviceId + '@' + appID
String username = 'test' //自定义用户名 长度不超过64位即可
2.2 获取token
// 获取token的URL
//https://{restapi}/openapi/rm/app/token
// 获取token
String token = "";
// 取token
try (final CloseableHttpClient httpClient = HttpClients.createDefault()) {
final HttpPost httpPost = new HttpPost("https://{restapi}/openapi/rm/app/token");
Map<String, String> params = new HashMap<>();
params.put("appClientId", appClientId);
params.put("appClientSecret", appClientSecret);
//设置请求体参数
StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
entity.setContentEncoding("utf-8");
httpPost.setEntity(entity);
//设置请求头部
httpPost.setHeader("Content-Type", "application/json");
//执行请求,返回请求响应
try (final CloseableHttpResponse response = httpClient.execute(httpPost)) {
//请求返回状态码
int statusCode = response.getStatusLine().getStatusCode();
//请求成功
if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) {
//取出响应体
final HttpEntity entity2 = response.getEntity();
//从响应体中解析出token
String responseBody = EntityUtils.toString(entity2, "utf-8");
JSONObject jsonObject = JSONObject.parseObject(responseBody);
token = jsonObject.getJSONObject("body").getString("access_token");
} else {
//请求失败
throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
}
}
} catch (IOException e) {
e.printStackTrace();
}
String mqtt_token = "";
// 取token
try (final CloseableHttpClient httpClient = HttpClients.createDefault()) {
final HttpPost httpPost = new HttpPost("https://{restapi}/openapi/rm/user/token");
Map<String, String> params = new HashMap<>();
params.put("username", username); //username
params.put("cid", clientID); //clientID
//设置请求体参数
StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
entity.setContentEncoding("utf-8");
httpPost.setEntity(entity);
//设置请求头部
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("Authorization", token);
//请求响应
try (final CloseableHttpResponse response = httpClient.execute(httpPost)) {
//请求返回状态码
int statusCode = response.getStatusLine().getStatusCode();
//请求成功
if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) {
//取出响应体
final HttpEntity entity2 = response.getEntity();
//从响应体中解析出token
String responseBody = EntityUtils.toString(entity2, "utf-8");
JSONObject jsonObject = JSONObject.parseObject(responseBody);
mqtt_token = jsonObject.getJSONObject("body").getString("access_token");
} else {
//请求失败
throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
}
}
//执行请求,返回请求响应
} catch (IOException e) {
e.printStackTrace();
}
2.3 连接服务器
配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至MQTT消息云。
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(mqtt_token.toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(45);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
mqttClient.connect(mqttConnectOptions);
//暂停1秒钟,等待连接订阅完成
Thread.sleep(1000);
2.4 订阅(subscribe)
2.4.1 订阅主题
当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。
mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 连接完成回调方法
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {myTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
});
}
});
2.4.2 取消订阅
mqttClient.unsubscribe(new String[]{myTopic});
2.4.3 接收消息
配置接收消息回调方法,从MQTT消息云接收订阅消息。
mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 接收消息回调方法
* @param s
* @param mqttMessage
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
});
2.5 发送(publish)
配置发送消息回调方法,向指定topic发送消息。
for (int i = 0; i < 10; i++) {
/**
* 构建一个Mqtt消息
*/
MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes());
//设置传输质量
message.setQos(qosLevel);
/**
* 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
*/
mqttClient.publish(myTopic, message);
}
2.6 结果验证
connect success
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg