Processing commands that interact with sensors in Java
Now that you have understood how to work with the Paho Java Client, its non-blocking API, and its callbacks, we can develop the Java application that will process the commands that interact with sensors, based on the previously explained requirements.
We can start another Maven project in our favorite IDE and make the same edits to the pom.xml file that we made in our previous example to use the Paho Java Client with the Bouncy Castle libraries. In addition, we must include the previously coded SecurityHelper class. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02 folder.
Now, we will create a new class named SensorsManager that implements the previously analyzed MqttCallback interface, specifically the org.eclipse.paho.client.mqttv3.MqttCallback interface. The class will process the messages that the MQTT client receives, provide several methods that process the commands included in these messages, and simulate the retrieval of values from two sensors.
The SensorsManager class will provide the following methods:
publishMessage: This method publishes the payload specified as a string to the indicated topic with the specified QoS level and retained flag value. The method uses the configured encoding (UTF-8 in this example) to transform the string into a byte array, uses the asynchronous client to publish the message, and passes the received IMqttActionListener instance as the callback for events related to the publication of the message. We will use this method to make it easier to publish messages to different topics with diverse QoS levels.
publishProcessedCommandMessage: This method publishes a message to the appropriate topic, indicating that a command has been successfully processed. The method ends up calling the previously explained publishMessage method to publish the message.
messageArrived: When a message has arrived from the MQTT server, this method will be executed. This method processes the TURN ON and TURN OFF commands, and based on these commands and the topics in which the commands were received, the code changes the value of the flags that determine whether the sensors must publish data every second or not.
loop: This method checks the Boolean values for the flags that determine whether the sensors must publish data every second or not. In order to keep the example simple, the code generates pseudo-random values to simulate that it is retrieving data from the sensors and, based on the values of the flags, publishes messages with the data for the sensors in the different topics that we described when we analyzed the requirements for the application.
We will split the code for the SensorsManager class into several code snippets to make it easier to understand each code section. The following lines declare the package and all the necessary imports. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02/src/SensorsManager.java file:
package com.packt.mqttessentials.Sensors02;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ThreadLocalRandom;
The following lines start the declaration of the SensorsManager class. The code declares the string constants, the private fields, and the two volatile Boolean flags that indicate whether the sunlight sensor and the earth humidity sensor are turned on: isSunlightSensorTurnedOn and isEarthHumiditySensorTurnedOn. The code also declares the constructor for this class. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02/src/SensorsManager.java file:
public class SensorsManager implements MqttCallback {
    private static final String SENSOR_EARTH_HUMIDITY = "earthhumidity";
    private static final String SENSOR_SUNLIGHT = "sunlight";
    private static final String TOPIC_SEPARATOR = "/";
    private final String boardCommandsTopic;
    private final String boardDataBaseTopic;
    private final String encoding;
    private final MqttAsyncClient asyncClient;
    private final String earthHumidityTopic;
    private final String visibleLightTopic;
    private final String infraredLightTopic;
    private final String ultraVioletIndexTopic;
    private volatile boolean isSunlightSensorTurnedOn = false;
    private volatile boolean isEarthHumiditySensorTurnedOn = false;

    public SensorsManager(final MqttAsyncClient asyncClient,
            final String boardCommandsTopic,
            final String boardDataBaseTopic,
            final String encoding) {
        this.boardCommandsTopic = boardCommandsTopic;
        this.boardDataBaseTopic = boardDataBaseTopic;
        this.encoding = encoding;
        this.asyncClient = asyncClient;
        // Build and save the topic names that we will use
        // to publish the data from the sensors
        this.earthHumidityTopic =
            this.boardDataBaseTopic.concat(SENSOR_EARTH_HUMIDITY);
        final String sunlightDataBaseTopic =
            boardDataBaseTopic.concat(SENSOR_SUNLIGHT);
        this.visibleLightTopic =
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "visiblelight");
        this.infraredLightTopic =
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "ir");
        this.ultraVioletIndexTopic =
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "uv");
    }
The constructor receives the asynchronous client (asyncClient), the topic that we will use for the commands (boardCommandsTopic), the prefix for the topic that we will use to publish the data for the sensors (boardDataBaseTopic), and the encoding we are using for strings in the payload (encoding). The constructor saves all these received arguments in fields with the same names. Then, the constructor builds and saves the topic names that we will use to publish the data from the sensors.
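To make the topic construction concrete, the following minimal sketch reproduces the same concat and String.join calls outside the class. The TopicNamesSketch class, the buildTopics method, the map keys, and the example/board01/data/ prefix are hypothetical names used only for illustration. Note that concat does not insert a separator, so this sketch assumes the prefix already ends with a forward slash.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TopicNamesSketch {
    private static final String TOPIC_SEPARATOR = "/";

    // Mirrors the constructor logic: concat appends the sensor name directly,
    // so the prefix is expected to end with "/"
    static Map<String, String> buildTopics(final String boardDataBaseTopic) {
        final Map<String, String> topics = new LinkedHashMap<>();
        topics.put("earthHumidity", boardDataBaseTopic.concat("earthhumidity"));
        final String sunlightDataBaseTopic = boardDataBaseTopic.concat("sunlight");
        topics.put("visibleLight",
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "visiblelight"));
        topics.put("infraredLight",
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "ir"));
        topics.put("ultraVioletIndex",
            String.join(TOPIC_SEPARATOR, sunlightDataBaseTopic, "uv"));
        return topics;
    }

    public static void main(String[] args) {
        // Hypothetical prefix, chosen only to show the resulting topic names
        buildTopics("example/board01/data/").forEach(
            (name, topic) -> System.out.println(name + " -> " + topic));
    }
}
```

With a prefix that lacks the trailing slash, the sensor name is glued to the last topic level, which is worth checking when configuring the prefix.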
The following lines declare the publishMessage and publishProcessedCommandMessage methods within the SensorsManager class. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02/src/SensorsManager.java file:
    public IMqttDeliveryToken publishMessage(final String topic,
            final String textForMessage,
            IMqttActionListener actionListener,
            final int qos,
            final boolean retained) {
        try {
            // Transform the string into a byte array with the configured encoding
            final byte[] bytesForPayload = textForMessage.getBytes(this.encoding);
            return asyncClient.publish(topic, bytesForPayload, qos,
                retained, null, actionListener);
        } catch (UnsupportedEncodingException | MqttException e) {
            e.printStackTrace();
            return null;
        }
    }

    public void publishProcessedCommandMessage(final String sensorName,
            final String command) {
        final String topic =
            String.format("%s/%s", boardCommandsTopic, sensorName);
        final String textForMessage = String.format(
            "%s successfully processed command: %s", sensorName, command);
        publishMessage(topic, textForMessage, null, 0, false);
    }
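The string-to-bytes conversion in publishMessage can be tried in isolation. The following minimal sketch, with hypothetical class and method names, shows the same getBytes call with a charset name, which is why the checked UnsupportedEncodingException must be handled, next to an alternative that passes the StandardCharsets.UTF_8 constant and therefore declares no checked exception. It is an illustration under the assumption that the configured encoding is UTF-8, not a replacement for the sample code.

```java
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

final class PayloadEncodingSketch {
    // Same conversion as publishMessage: the charset is looked up by name,
    // so the checked UnsupportedEncodingException must be handled or declared
    static byte[] encodeByName(final String text, final String encoding)
            throws UnsupportedEncodingException {
        return text.getBytes(encoding);
    }

    // Alternative: a Charset constant needs no lookup and no checked exception
    static byte[] encodeUtf8(final String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        final String text = "sunlight successfully processed command: TURN ON";
        final byte[] payload = encodeByName(text, "UTF-8");
        // Decoding with the same charset restores the original text
        System.out.println(new String(payload, StandardCharsets.UTF_8));
        System.out.println(payload.length + " bytes");
    }
}
```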
The following lines declare the three methods required by the MqttCallback interface within the SensorsManager class: connectionLost, deliveryComplete, and messageArrived. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02/src/SensorsManager.java file:
    @Override
    public void connectionLost(Throwable cause) {
        cause.printStackTrace();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageText = new String(message.getPayload(), encoding);
        System.out.println(
            String.format("Topic: %s. Payload: %s", topic, messageText));
        // A message has arrived from the MQTT broker
        // The MQTT client doesn't send back
        // an acknowledgment to the broker until
        // this method returns cleanly
        if (!topic.startsWith(boardCommandsTopic)) {
            // The topic for the arrived message
            // doesn't start with boardCommandsTopic
            return;
        }
        final boolean isTurnOnMessage = messageText.equals("TURN ON");
        final boolean isTurnOffMessage = messageText.equals("TURN OFF");
        boolean isInvalidCommand = false;
        boolean isInvalidTopic = false;
        // Extract the sensor name from the topic
        String sensorName = topic.replaceFirst(boardCommandsTopic, "")
            .replaceFirst(TOPIC_SEPARATOR, "");
        switch (sensorName) {
            case SENSOR_SUNLIGHT:
                if (isTurnOnMessage) {
                    isSunlightSensorTurnedOn = true;
                } else if (isTurnOffMessage) {
                    isSunlightSensorTurnedOn = false;
                } else {
                    isInvalidCommand = true;
                }
                break;
            case SENSOR_EARTH_HUMIDITY:
                if (isTurnOnMessage) {
                    isEarthHumiditySensorTurnedOn = true;
                } else if (isTurnOffMessage) {
                    isEarthHumiditySensorTurnedOn = false;
                } else {
                    isInvalidCommand = true;
                }
                break;
            default:
                isInvalidTopic = true;
        }
        if (!isInvalidCommand && !isInvalidTopic) {
            publishProcessedCommandMessage(sensorName, messageText);
        }
    }
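The messageArrived method extracts the sensor name with replaceFirst, which interprets its first argument as a regular expression. That works for topic names made of plain characters, but a commands topic that contains regular expression metacharacters would not be matched literally. The following minimal sketch compares the original approach with a substring-based alternative. The SensorNameSketch class, both method names, and the topic names are hypothetical and used only for illustration.

```java
final class SensorNameSketch {
    private static final String TOPIC_SEPARATOR = "/";

    // Approach used in messageArrived: the commands topic is treated as a regex
    static String extractWithReplaceFirst(final String commandsTopic,
            final String topic) {
        return topic.replaceFirst(commandsTopic, "")
            .replaceFirst(TOPIC_SEPARATOR, "");
    }

    // Alternative: plain string operations, so no character has a special meaning
    static String extractWithSubstring(final String commandsTopic,
            final String topic) {
        if (!topic.startsWith(commandsTopic)) {
            return "";
        }
        final String remainder = topic.substring(commandsTopic.length());
        return remainder.startsWith(TOPIC_SEPARATOR)
            ? remainder.substring(TOPIC_SEPARATOR.length())
            : remainder;
    }

    public static void main(String[] args) {
        // Hypothetical topics: the second prefix contains regex metacharacters
        final String plainPrefix = "example/board01/commands";
        final String trickyPrefix = "boards/(lab)/commands";
        System.out.println(
            extractWithReplaceFirst(plainPrefix, plainPrefix + "/sunlight"));
        System.out.println(
            extractWithReplaceFirst(trickyPrefix, trickyPrefix + "/sunlight"));
        System.out.println(
            extractWithSubstring(trickyPrefix, trickyPrefix + "/sunlight"));
    }
}
```

Another option that keeps replaceFirst is to wrap the prefix with java.util.regex.Pattern.quote, which makes the pattern match the prefix literally.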
The following lines declare the loop method and finish the declaration of the SensorsManager class. The code file for the sample is included in the mqtt_essentials_gaston_hillar_04 folder, in the Java02/src/SensorsManager.java file:
    public void loop() {
        if (isEarthHumiditySensorTurnedOn) {
            // Retrieve the humidity level from the sensor
            // In this case, we just generate a random number
            final int humidityLevel =
                ThreadLocalRandom.current().nextInt(1, 101);
            // Publish the message to the appropriate topic
            publishMessage(earthHumidityTopic,
                String.format("%d %%", humidityLevel), null, 0, false);
        }
        if (isSunlightSensorTurnedOn) {
            // Retrieve the visible light level from the sensor
            // In this case, we just generate a random number
            final int visibleLight =
                ThreadLocalRandom.current().nextInt(201, 301);
            // Publish the message to the appropriate topic
            publishMessage(visibleLightTopic,
                String.format("%d lm", visibleLight), null, 0, false);
            // Retrieve the infrared light level from the sensor
            // In this case, we just generate a random number
            final int infraredLight =
                ThreadLocalRandom.current().nextInt(251, 281);
            // Publish the message to the appropriate topic
            publishMessage(infraredLightTopic,
                String.format("%d lm", infraredLight), null, 0, false);
            // Retrieve the ultraviolet (UV) index from the sensor
            // In this case, we just generate a random number
            final int ultraVioletIndex =
                ThreadLocalRandom.current().nextInt(0, 16);
            // Publish the message to the appropriate topic
            publishMessage(ultraVioletIndexTopic,
                String.format("%d UV Index", ultraVioletIndex), null, 0, false);
        }
    }
}
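The loop method publishes one set of readings per call, so something must call it once per second. This section does not show that caller, so the following is only a sketch of one possible approach that uses a ScheduledExecutorService. The LoopSchedulerSketch class and its runPeriodically and humidityPayload methods are hypothetical, and the task passed in main is a stand-in that prints a simulated reading instead of calling loop on a real SensorsManager instance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

final class LoopSchedulerSketch {
    // Same format string that loop uses for the humidity payload
    static String humidityPayload(final int humidityLevel) {
        return String.format("%d %%", humidityLevel);
    }

    // Runs the task `times` times, once per period, blocks until the runs
    // finish, and returns how many runs completed
    static int runPeriodically(final Runnable task, final int times,
            final long periodMillis) {
        final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch remaining = new CountDownLatch(times);
        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() > 0) {
                task.run();
                remaining.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return times - (int) remaining.getCount();
    }

    public static void main(String[] args) {
        // Stand-in for sensorsManager.loop(): print a simulated humidity
        // reading between 1 and 100 once per second, three times
        runPeriodically(() -> System.out.println(
            humidityPayload(ThreadLocalRandom.current().nextInt(1, 101))),
            3, 1000);
    }
}
```

In a real application the task would run until the program stops rather than a fixed number of times. The bounded version here only keeps the sketch easy to run and observe.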