====== 快速使用MQTT Java版 SDK实现消息收发 ====== 本文介绍快速使用 MQTT Java版SDK 实现MQTT客户端消息的自收自发。 ===== 1. 前提条件 ===== ==== 1.1 部署Java开发环境 ==== * 安装IDE。您可以使用[[https://www.jetbrains.com/idea/download/#section=mac|IntelliJ IDEA]]或者[[https://www.eclipse.org/downloads/|Eclipse]],本文以IntelliJ IDEA为例。 * 下载安装[[https://www.oracle.com/java/technologies/javase-downloads.html|JDK]]。 ==== 1.2 导入项目依赖 ==== 在IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。\\ commons-codec commons-codec 1.10 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.2 org.apache.httpcomponents httpclient 4.5.2 com.alibaba fastjson 1.2.76 ===== 2. 实现流程 ===== ==== 2.1 获取初始化信息 ==== 登录console后台 \\ 1.点击菜单栏【MQTT】->【服务概览】->【服务配置】,获取「连接地址」、「连接端口」、「AppID」以「及REST API地址」等信息。 \\ 注:clientID由两部分组成,组织形式为“deviceID@AppID”,deviceID由用户自定义,AppID见【服务配置】。 \\ 示例:正确的clientID格式为:“device001@aitbj0”; \\ 2.点击菜单栏【应用概览】->【应用详情】->【开发者ID】,获取「Client ID」与「ClientSecret」信息。 \\ 3.初始化代码 String endpoint = 'xxx.xxx.xxx.xxx' //环信MQTT服务器地址 通过console后台[MQTT]->[服务概览]->[服务配置]下[连接地址]获取 String port=‘xx’ // 协议服务端口 通过console后台[MQTT]->[服务概览]->[服务配置]下[连接端口]获取 String appID='xx' // appID 通过console后台[MQTT]->[服务概览]->[服务配置]下[AppID]获取 String deviceId = 'deviceId' // 自定义deviceID String restapi = 'xxx.xxx.xxx.xxx' //环信MQTT REST API地址 通过console后台[MQTT]->[服务概览]->[服务配置]下[REST API地址]获取 String appClientId = 'xxx' //开发者ID 通过console后台[应用概览]->[应用详情]->[开发者ID]下[ Client ID]获取 String appClientSecret = 'xxx' // 开发者密钥 通过console后台[应用概览]->[应用详情]->[开发者ID]下[ ClientSecret]获取 String clientID = deviceId + '@' + appID String username = 'test' //自定义用户名 长度不超过64位即可 {{:playground:message:setting.jpg|}} {{:playground:message:develop.jpg|}} ==== 2.2 获取token ==== // 获取token的URL //https://{restapi}/openapi/rm/app/token // 获取token String token = ""; // 取token try (final CloseableHttpClient httpClient = HttpClients.createDefault()) { final HttpPost httpPost = new HttpPost("https://{restapi}/openapi/rm/app/token"); Map params = new HashMap<>(); params.put("appClientId", appClientId); params.put("appClientSecret", appClientSecret); //设置请求体参数 StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8")); entity.setContentEncoding("utf-8"); httpPost.setEntity(entity); //设置请求头部 httpPost.setHeader("Content-Type", "application/json"); //执行请求,返回请求响应 try (final CloseableHttpResponse response = httpClient.execute(httpPost)) { //请求返回状态码 int statusCode = response.getStatusLine().getStatusCode(); //请求成功 if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) { //取出响应体 final HttpEntity entity2 = response.getEntity(); //从响应体中解析出token String responseBody = EntityUtils.toString(entity2, "utf-8"); JSONObject jsonObject = JSONObject.parseObject(responseBody); token = jsonObject.getJSONObject("body").getString("access_token"); } else { //请求失败 throw new ClientProtocolException("请求失败,响应码为:" + statusCode); } } } catch (IOException e) { e.printStackTrace(); } String mqtt_token = ""; // 取token try (final CloseableHttpClient httpClient = HttpClients.createDefault()) { final HttpPost httpPost = new HttpPost("https://{restapi}/openapi/rm/user/token"); Map params = new HashMap<>(); params.put("username", username); //username params.put("cid", clientID); //clientID //设置请求体参数 StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8")); entity.setContentEncoding("utf-8"); httpPost.setEntity(entity); //设置请求头部 httpPost.setHeader("Content-Type", "application/json"); httpPost.setHeader("Authorization", token); //请求响应 try (final CloseableHttpResponse response = httpClient.execute(httpPost)) { //请求返回状态码 int statusCode = response.getStatusLine().getStatusCode(); //请求成功 if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) { //取出响应体 final HttpEntity entity2 = response.getEntity(); //从响应体中解析出token String responseBody = EntityUtils.toString(entity2, "utf-8"); JSONObject jsonObject = JSONObject.parseObject(responseBody); mqtt_token = jsonObject.getJSONObject("body").getString("access_token"); } else { //请求失败 throw new ClientProtocolException("请求失败,响应码为:" + statusCode); } } //执行请求,返回请求响应 } catch (IOException e) { e.printStackTrace(); } ==== 2.3 连接服务器 ==== 配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至MQTT消息云。 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(mqtt_token.toCharArray()); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setKeepAliveInterval(45); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1); mqttConnectOptions.setConnectionTimeout(5000); mqttClient.connect(mqttConnectOptions); //暂停1秒钟,等待连接订阅完成 Thread.sleep(1000); ==== 2.4 订阅(subscribe) ==== === 2.4.1 订阅主题 === 当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。 mqttClient.setCallback(new MqttCallbackExtended() { /** * 连接完成回调方法 * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { /** * 客户端连接成功后就需要尽快订阅需要的Topic。 */ System.out.println("connect success"); executorService.submit(() -> { try { final String[] topicFilter = {myTopic}; final int[] qos = {qosLevel}; mqttClient.subscribe(topicFilter, qos); } catch (Exception e) { e.printStackTrace(); } }); } }); === 2.4.2 取消订阅 === mqttClient.unsubscribe(new String[]{myTopic}); === 2.4.3 接收消息 === 配置接收消息回调方法,从MQTT消息云接收订阅消息。 mqttClient.setCallback(new MqttCallbackExtended() { /** * 接收消息回调方法 * @param s * @param mqttMessage */ @Override public void messageArrived(String s, MqttMessage mqttMessage) { System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } }); ==== 2.5 发送(publish) ==== 配置发送消息回调方法,向指定topic发送消息。 for (int i = 0; i < 10; i++) { /** * 构建一个Mqtt消息 */ MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes()); //设置传输质量 message.setQos(qosLevel); /** * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。 */ mqttClient.publish(myTopic, message); } ==== 2.6 结果验证 ==== connect success send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic send msg succeed topic is : myTopic receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg receive msg from topic myTopic , body is hello world pub sub msg ===== 3. 更多信息 ===== * 完整demo示例,请参见[[https://github.com/caterpillar525/mqtt-demo.git|demo]]下载。 * 目前MQTT客户端支持多种语言,请参见 [[http://docs-im.easemob.com/playground/message/sdkdownload|SDK下载]]。 * 如果您在使用MQTT服务中,有任何疑问和建议,欢迎您[[http://docs-im.easemob.com/playground/message/msgcontact|联系我们]]。