ProtoMQTT::four()

glue it all together

In the last post I already introduced class RobotCtlr. It holds the defined protocol buffer data structure [1] as a class member as well as a reference to the mqtt client handle.
As a reaction to an incoming message the robot can move to a given position.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class RobotCtrl
{
public:
 
 RobotCtrl(std::string robotName);
 ~RobotCtrl();
 
 void setClient(MQTTClient &client);
 
 int move(std::vector<double> position);
 
 int publishMessage();
 
 static int onMessageArrived(void* context, 
                             char* topic,
                             int tlen,
                             MQTTClient_message *msg);
 
 
private:
 RobotMsg mRobotMessage;
 MQTTClient mMQTTClient;
 std::vector<double> mPosition;
};

publish me

To push a message to the connected broker we have to serialize proto message to a byte sequence which is taken over by the MQTTClient_message.
Proto buffers offers a bunch of methods for this task, e.g. SerialzeToArray (line 13,14) or if you prefer to convert to std::string first just use SerializeAsString() (line 16,17). Beforhand we set the current timestamp as described in the previous post [2].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 
int publishMessage( )
{   
 int rc = 0;
 
 ptime t0 = microsec_clock::local_time();
 std::string  timeString  = to_simple_string(t0);
 
 mRobotMessage.set_timestamp(timeString);
 
 MQTTClient_message pubmsg = MQTTClient_message_initializer;         
 
 mRobotMessage.SerializeToArray(pubmsg.payload,mRobotMessage.ByteSize());
 pubmsg.payloadlen = mRobotMessage.ByteSize();
 
// shown as example to serialize to std::string
 pubmsg.payload = (void*)(mRobotMessage.SerializeAsString()).c_str(); 
 pubmsg.payloadlen =(int)(mRobotMessage.SerializeAsString()).size();
 
 
if(nullptr != mMQTTClient)
{
 MQTTClient_deliveryToken dt =0; 
 std::string topic = "Robo/data";
 rc = MQTTClient_publish(mMQTTClient,
                         topic.c_str(),
                         pubmsg.payloadlen,
                         pubmsg.payload,
                         pubmsg.qos,
                         pubmsg.retained,
                         &dt);
 
}
else
{
    rc = -1;
}
 
return (rc);
}

In this case the mqtt message is send in qos (quality of service leves) zero, which leads to a fire and forget behaviour that means the message won’t be acknowledged by the receiver or stored and redelivered by the client. A good summary of qos can be found at [3].

someone´s calling

The clas RoboCtrl provided a static method onMessageArrived which has to be registered to MQTT subscribe notification see [4].
By casting the context void* parameter to a RobotCtrl object we can effectively use the method of the actual object.
The arrived MQTTClient_message can be parsed directly into a RobotMsg object by using ParseFromArray member function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 
static int onMessageArrived(void* context, char* topic , int tlen, MQTTClient_message *msg)
{
 RobotCtrl *parent = (RobotCtrl*) context;
 
 std::string _topic(topic);
 RobotMsg _msg;
 
 _msg.ParseFromArray(msg->payload,msg->payloadlen);
 
 std::vector<double> p(_msg.position_size(),0);
 
 memcpy(p.data(),
       _msg.mutable_position()->mutable_data(),
       _msg.position_size()*sizeof(double));
 
 
// setter with index and value 
//_msg.set_position(0,double(0.0);
 
// dynamically add element 
//  _msg.add_position(double(0.0));
 
 parent->move(p);
 
 return(1);
}

In this simple demo an arrived message is interpreted as a command to move to the new position in the included message.

accessing repeated fields of protocol buffer

The protocol buffer API provides direct access to the memory of repeated field (dynamic array) . So it is possible to copy all elements to a std::vector. This is done by calling mutable_data() of the repeated field mutable data member (line 13). In a use case where index based access is prefered one can use the set_position() call and/or add_positon() to add elements at the end of the buffer.
The commanded position is now processed by a call to the object move function.

let’s loop it

In our main function the only thing left is to subcribe to a message and then start a little loop to keep the process running, where we decide to send out the actual message every 500ms.

1
2
3
4
5
6
7
8
 
int ec = MQTTClient_subscribe(mqttClient,"Robo/Input",0);
 
for(;;)
{
  Sleep(500);
  robotControler.publishMessage();
}

With this post the small blog series about protocol buffers in combination with mqtt ends. Full source code can be found at [5].

references / further reading

[1] http://techblog.boptics.de/protomqttone/
[2] http://techblog.boptics.de/protomqttthree/
[3] http://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels
[4] http://techblog.boptics.de/protomqttthree/
[5] https://github.com/vlovo/ProtoMQTT/

Leave a Reply