MQTT C client publish/subscribe con la libreria libmosquitto su Ubuntu 18.04

Negli ultimi anni il protocollo MQTT (Message Queue Telemetry Transport) è diventato uno standard per l’internet of things (IoT), sia per la sua semplicità d’utilizzo sia perché le risorse necessarie ai dispositivi per la comunicazione sono minime. Il protocollo segue il pattern publish-subscrive cioè quando un dispositivo (publish) vuole comunicare con un altro dispositivo (subscribe) non lo fa in modo sincrono come avviene con il protocollo HTTP del web ,cioè request-response, ma tramite un broker o server binario che si occupa di ricevere i messaggi del publisher e renderli disponibili al subscriber. Esistono molti broker MQTT, uno tra i più comuni è mosquitto, creato dall’Eclipse Foundation; in questo articolo creeremo un client in C per pubblicare un messaggio e un client C per la ricezione del messaggio utilizzando la libreria libmoquitto su Ubuntu 18.04.

Prima di tutto dobbiamo installare la libreria libmoquitto:

sudo apt-get update

sudo apt-get install libmosquitto-dev

Apriamo il nostro editor preferito e inseriamo il seguente codice per creare un client publish, cioè un client che posterà un messaggio sul broker mosquitto.

mqtt_publish.c

#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "mosquitto.h"

/*
 *  Indirizzo host del Broker
 */ 
#define mqtt_host "IP_HOST"

/*
 * Porta di connesssione del Broker standard 1883
 */ 
#define mqtt_port 1883

static int run = 1;


/* 
 * signal handler per terminare il programma 
 */
void handle_signal(int s) 
{
	run = 0;
	printf("Signal Handler\n");
}

/* 
 * callback per la notifica della connessione 
 * mosq   - l'istanza mosquitto che effettua la richiamata.
 * obj    - i dati dell'utente forniti in mosquitto_new
 * result - il codice di ritorno della risposta di connessione, uno di:
 *  0 - successo
 *  1 - connessione rifiutata (versione del protocollo inaccettabile)
 *  2 - connessione rifiutata (identificatore rifiutato)
 *  3 - connessione rifiutata (broker non disponibile)
 *  4-255 - riservato per uso futuro
 */
void connect_callback(struct mosquitto *mosq, void *obj, int result) 
{
	printf("callback, connected with return code  rc=%d\n", result);
}

/* 
 * callback per la notifica di messaggio pubblicato
 * mosq     - l'istanza mosquitto che effettua la richiamata.
 * userdata - i dati dell'utente forniti in mosquitto_new
 * mid      - l'ID del messaggio inviato.
 */
void publish_callback(struct mosquitto *mosq, void *userdata, int mid) 
{
	
	printf("message published\n");
	run = 0;
	/* 
	 * Dopo l'invio del messaggio ci disconnettiamo dal server
	 */
	mosquitto_disconnect(mosq); 
}

int main(int argc, char *argv[])
{
	char id[24];
	
	/*
	 * stringa terminata da null dell'argomento su cui pubblicare.
	 */ 
	char* topic = "device/telemetry";
	
	/*
	 * puntatore ai dati da inviare.
	 */ 
	char* payload = "Hello World !";
	
	/*
	 * Struttura istanza del client
	 */
	struct mosquitto *mosq;
	int rc = 0;

	signal(SIGINT, handle_signal);
	signal(SIGTERM, handle_signal);

	/* 
	 * Inizializzazione della libreria. DEVE essere chiamata prima 
	 * di qualsiasi altra funzione
	 */
	mosquitto_lib_init(); 

	memset(id, 0, 24);
	snprintf(id, 23, "%d", getpid());

	/* 
	 * Crea una nuova istanza del client
	 */
	mosq = mosquitto_new(id, true, 0); 
    
    if(mosq){

		/* 
		 * Imposta la callback di connessione. 
		 * Chiamata quando il broker invia il messaggio CONNACK
		 */
		mosquitto_connect_callback_set(mosq, connect_callback); 
		
		/* 
		 * Imposta la callback di pubblicazione. 
		 * Chiamata quando il messaggio è inviato al broker 
		 * con successo.
		 */
		mosquitto_publish_callback_set(mosq, publish_callback);

		/* 
		 * Connessione al broker.
		 */
	    rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 15);

		/*
		 * Pubblicazione del messaggio sul broker
		 * mosq            - l'istanza mosquitto che effettua la richiamata.
		 * NULL            - eventuale id del messaggio. 
		 *                   libmosquitto assegna gli ID dei messaggi in modo
		 *                   che possano essere tracciati con questo
		 *                   parametro.
		 * topic           - stringa terminata da null dell'argomento su
		 *                   cui pubblicare.
		 * strlen(payload) - lunghezza del messaggio da pubblicare
		 * payload         - puntatore ai dati da inviare.
		 * 1               - QoS Quality of service
		 * false           - se impostato su true il messaggio viene 
		 *                  mantenuto dal servizio fino al successivo
		 */ 
		mosquitto_publish(mosq,NULL,topic,strlen(payload),(void*)payload,1,false);	
		
       /* Il blocco di codice successivo
        * si rende necessario per assicurare
        * che il messaggio venga inviato
        */ 
		while(run){
			/*
			 * Il network loop principale per il client. 
			 * Questo deve essere chiamato frequentemente
			 * per mantenere attive le comunicazioni tra il client 
			 * e il broker.
			 * mosq -
			 * -1   - Numero massimo di millisecondi di attesa per 
			 *        l'attività di rete nella chiamata select ()
			 *        prima del timeout. 
			 *        Impostare a 0 per un ritorno immediato. 
			 *        Impostare negativo per utilizzare il valore
			 *        predefinito di 1000 ms.
			 * 1    - Parametro non utilizzato
			 */
			rc = mosquitto_loop(mosq, -1, 1);
			printf("LOOP, rc=%d\n", rc);
			/*
			 * Se rc è diverso da zero significa
			 * che si è verificato un errore quindi
			 * ci riconettiamo al broker dopo un timeout di 5 secondi
			 */ 
			if(run && rc){
				printf("connection error!\n");
				sleep(5);
				mosquitto_reconnect(mosq);
			}
		}
		
		/*
		 * Da utilizzare per liberare memoria associata a un'istanza
		 * client mosquitto.
		 */ 
		mosquitto_destroy(mosq);
    }
    /*
     * Libera le risorse di memoria collegate alla libreria
     */ 
	mosquitto_lib_cleanup();

	
	return rc;
}

Makefile

WORKDIR = `pwd`

CC = gcc
CXX = g++
AR = ar
LD = g++
WINDRES = windres

INC = 
CFLAGS = -Wall -fexceptions
RESINC = 
LIBDIR = 
LIB = 
LDFLAGS = 

INC_DEBUG = $(INC)
CFLAGS_DEBUG = $(CFLAGS) -g
RESINC_DEBUG = $(RESINC)
RCFLAGS_DEBUG = $(RCFLAGS)
LIBDIR_DEBUG = $(LIBDIR)
LIB_DEBUG = $(LIB) 
LDFLAGS_DEBUG = $(LDFLAGS) -lmosquitto
OBJDIR_DEBUG = obj/Debug
DEP_DEBUG = 
OUT_DEBUG = bin/Debug/mqtt_publish

INC_RELEASE = $(INC)
CFLAGS_RELEASE = $(CFLAGS) -O2  -g
RESINC_RELEASE = $(RESINC)
RCFLAGS_RELEASE = $(RCFLAGS)
LIBDIR_RELEASE = $(LIBDIR)
LIB_RELEASE = $(LIB) 
LDFLAGS_RELEASE = $(LDFLAGS) -s  -lmosquitto
OBJDIR_RELEASE = obj/Release
DEP_RELEASE = 
OUT_RELEASE = bin/Release/mqtt_publish

OBJ_DEBUG = $(OBJDIR_DEBUG)/mqtt_publish.o

OBJ_RELEASE = $(OBJDIR_RELEASE)/mqtt_publish.o

all: debug release

clean: clean_debug clean_release

before_debug: 
	test -d bin/Debug || mkdir -p bin/Debug
	test -d $(OBJDIR_DEBUG) || mkdir -p $(OBJDIR_DEBUG)
	test -d $(OBJDIR_DEBUG)/src || mkdir -p $(OBJDIR_DEBUG)/src

after_debug: 

debug: before_debug out_debug after_debug

out_debug: before_debug $(OBJ_DEBUG) $(DEP_DEBUG)
	$(LD) $(LIBDIR_DEBUG) -o $(OUT_DEBUG) $(OBJ_DEBUG)  $(LDFLAGS_DEBUG) $(LIB_DEBUG)

$(OBJDIR_DEBUG)/mqtt_publish.o: mqtt_publish.c
	$(CC) $(CFLAGS_DEBUG) $(INC_DEBUG) -c mqtt_publish.c -o $(OBJDIR_DEBUG)/mqtt_publish.o

clean_debug: 
	rm -f $(OBJ_DEBUG) $(OUT_DEBUG)
	rm -rf bin/Debug
	rm -rf $(OBJDIR_DEBUG)
	rm -rf $(OBJDIR_DEBUG)/src

before_release: 
	test -d bin/Release || mkdir -p bin/Release
	test -d $(OBJDIR_RELEASE) || mkdir -p $(OBJDIR_RELEASE)
	test -d $(OBJDIR_RELEASE)/src || mkdir -p $(OBJDIR_RELEASE)/src

after_release: 

release: before_release out_release after_release

out_release: before_release $(OBJ_RELEASE) $(DEP_RELEASE)
	$(LD) $(LIBDIR_RELEASE) -o $(OUT_RELEASE) $(OBJ_RELEASE)  $(LDFLAGS_RELEASE) $(LIB_RELEASE)

$(OBJDIR_RELEASE)/mqtt_publish.o: mqtt_publish.c
	$(CC) $(CFLAGS_RELEASE) $(INC_RELEASE) -c mqtt_publish.c -o $(OBJDIR_RELEASE)/mqtt_publish.o

clean_release: 
	rm -f $(OBJ_RELEASE) $(OUT_RELEASE)
	rm -rf bin/Release
	rm -rf $(OBJDIR_RELEASE)
	rm -rf $(OBJDIR_RELEASE)/src

.PHONY: before_debug after_debug clean_debug before_release after_release clean_release

Per compilare il client eseguire il comando :

make

nella stessa directory dove si trovano il Makefile e il source mqtt_publish.c

E con il seguente codice creeremo un client subscriber, cioè un client che riceverà il messaggio dal broker mosquitto inviato dal precedente client.

mqtt_subscribe.c

#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "mosquitto.h"

/*
 *  Indirizzo host del Broker
 */ 
#define mqtt_host "IP_HOST"

/*
 * Porta di connesssione del Broker standard 1883
 */ 
#define mqtt_port 1883

static int run = 1;

/* 
 * signal handler per terminare il programma 
 */
void handle_signal(int s)
{
	run = 0;
	printf("Signal Handler\n");
}

/* 
 * callback per la notifica della connessione 
 * mosq   - l'istanza mosquitto che effettua la richiamata.
 * obj    - i dati dell'utente forniti in mosquitto_new
 * result - il codice di ritorno della risposta di connessione, uno di:
 *  0 - successo
 *  1 - connessione rifiutata (versione del protocollo inaccettabile)
 *  2 - connessione rifiutata (identificatore rifiutato)
 *  3 - connessione rifiutata (broker non disponibile)
 *  4-255 - riservato per uso futuro
 */
void connect_callback(struct mosquitto *mosq, void *obj, int result)
{
	printf("callback, connected with return code rc=%d\n", result);
}


/* 
 * callback per la ricezione del messaggio pubblicato dal publisher
 * mosq     - l'istanza mosquitto che effettua la richiamata.
 * obj      - i dati dell'utente forniti in mosquitto_new
 * message  - i dati del messaggio. 
 * 			  Questa variabile e la memoria associata verranno liberate
 *            dalla libreria al termine del callback.
 * 			  Il cliente dovrebbe fare copie di tutti i dati che richiede.
 */
void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
	
	bool match = 0;

	printf("received message '%.*s' for topic '%s'\n", message->payloadlen, (char*) message->payload, message->topic);

	/*
	 * Controlla se un argomento corrisponde a una sottoscrizione.
	 */ 
	mosquitto_topic_matches_sub("devices/telemetry", message->topic, &match);
	if (match) {
		printf("received message for Telemetry topic\n");	
	}
}

int main(int argc, char *argv[])
{
	
	char id[24];
	
	/*
	 * stringa terminata da null dell'argomento a cui sottoscrivere.
	 */ 
	char* topic = "device/telemetry";	
	
	/*
	 * Struttura istanza del client
	 */	
	struct mosquitto *mosq;
	
	int rc = 0;

	signal(SIGINT, handle_signal);
	signal(SIGTERM, handle_signal);

	/* 
	 * Inizializzazione della libreria. DEVE essere chiamata prima
	 *  di qualsiasi altra funzione
	 */
	mosquitto_lib_init();

	memset(id, 0, 24);
	snprintf(id, 23, "%d", getpid());
	
	/* 
	 * Crea una nuova istanza del client
	 */	
	mosq = mosquitto_new(id, true, 0);
    
    if(mosq){
		/* 
		 * Imposta la callback di connessione. 
		 * Chiamata quando il broker invia il messaggio CONNACK
		 */        
		mosquitto_connect_callback_set(mosq, connect_callback);

		/* 
		 * Imposta la callback di sottoscrizione. 
		 * Chiamata quando il messaggio è inviato dal broker con successo.
		 */		
		mosquitto_message_callback_set(mosq, message_callback);

		/* 
		 * Connessione al broker.
		 */
	    rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 15);

		/*
		 * Sottoscrizione ad un argomento 
		 * mosq   - l'istanza mosquitto che effettua la richiamata.
		 * NULL   - eventuale id del messaggio. libmosquitto assegna
		 *          gli ID dei messaggi in modo che possano essere
		 *          tracciati con questo parametro.
		 * topic  - stringa terminata da null dell'argomento a
		 *          cui sottoscrivere.
		 * 0      - QoS Quality of service
		 */ 
		mosquitto_subscribe(mosq, NULL, topic, 0);
       
       /* Il blocco di codice successivo
        * si rende necessario per assicurare
        * che il messaggio venga ricevuto
        */        
		while(run){
			/*
			 * Il network loop principale per il client. 
			 * Questo deve essere chiamato frequentemente
			 * per mantenere attive le comunicazioni tra il client e 
			 * il broker.
			 * mosq - l'istanza mosquitto che effettua la richiamata.
			 * -1   - Numero massimo di millisecondi di attesa per 
			 *        l'attività di rete nella chiamata select () 
			 *        prima del timeout. Impostare a 0 per un 
			 *        ritorno immediato. Impostare negativo per
			 *        utilizzare il valore predefinito di 1000 ms.
			 * 1    - Parametro non utilizzato
			 */			
			rc = mosquitto_loop(mosq, -1, 1);
			/*
			 * Se rc è diverso da zero significa
			 * che si è verificato un errore quindi
			 * ci riconettiamo al broker dopo un timeout di 5 secondi
			 */ 			
			if(run && rc){
				printf("connection error!\n");
				sleep(5);
				mosquitto_reconnect(mosq);
			}
		}
		/*
		 * Da utilizzare per liberare memoria associata a un'istanza 
		 * client mosquitto.
		 */ 		
		mosquitto_destroy(mosq);
    }
    /*
     * Libera le risorse di memoria collegate alla libreria
     */     
	mosquitto_lib_cleanup();

	
	return rc;
}

Makefile

WORKDIR = `pwd`

CC = gcc
CXX = g++
AR = ar
LD = g++
WINDRES = windres

INC = 
CFLAGS = -Wall -fexceptions
RESINC = 
LIBDIR = 
LIB = 
LDFLAGS = 

INC_DEBUG = $(INC)
CFLAGS_DEBUG = $(CFLAGS) -g
RESINC_DEBUG = $(RESINC)
RCFLAGS_DEBUG = $(RCFLAGS)
LIBDIR_DEBUG = $(LIBDIR)
LIB_DEBUG = $(LIB) 
LDFLAGS_DEBUG = $(LDFLAGS) -lmosquitto
OBJDIR_DEBUG = obj/Debug
DEP_DEBUG = 
OUT_DEBUG = bin/Debug/mqtt_subscribe

INC_RELEASE = $(INC)
CFLAGS_RELEASE = $(CFLAGS) -O2  -g
RESINC_RELEASE = $(RESINC)
RCFLAGS_RELEASE = $(RCFLAGS)
LIBDIR_RELEASE = $(LIBDIR)
LIB_RELEASE = $(LIB) 
LDFLAGS_RELEASE = $(LDFLAGS) -s  -lmosquitto
OBJDIR_RELEASE = obj/Release
DEP_RELEASE = 
OUT_RELEASE = bin/Release/mqtt_subscribe

OBJ_DEBUG = $(OBJDIR_DEBUG)/mqtt_subscribe.o

OBJ_RELEASE = $(OBJDIR_RELEASE)/mqtt_subscribe.o

all: debug release

clean: clean_debug clean_release

before_debug: 
	test -d bin/Debug || mkdir -p bin/Debug
	test -d $(OBJDIR_DEBUG) || mkdir -p $(OBJDIR_DEBUG)
	test -d $(OBJDIR_DEBUG)/src || mkdir -p $(OBJDIR_DEBUG)/src

after_debug: 

debug: before_debug out_debug after_debug

out_debug: before_debug $(OBJ_DEBUG) $(DEP_DEBUG)
	$(LD) $(LIBDIR_DEBUG) -o $(OUT_DEBUG) $(OBJ_DEBUG)  $(LDFLAGS_DEBUG) $(LIB_DEBUG)

$(OBJDIR_DEBUG)/mqtt_subscribe.o: mqtt_subscribe.c
	$(CC) $(CFLAGS_DEBUG) $(INC_DEBUG) -c mqtt_subscribe.c -o $(OBJDIR_DEBUG)/mqtt_subscribe.o

clean_debug: 
	rm -f $(OBJ_DEBUG) $(OUT_DEBUG)
	rm -rf bin/Debug
	rm -rf $(OBJDIR_DEBUG)
	rm -rf $(OBJDIR_DEBUG)/src

before_release: 
	test -d bin/Release || mkdir -p bin/Release
	test -d $(OBJDIR_RELEASE) || mkdir -p $(OBJDIR_RELEASE)
	test -d $(OBJDIR_RELEASE)/src || mkdir -p $(OBJDIR_RELEASE)/src

after_release: 

release: before_release out_release after_release

out_release: before_release $(OBJ_RELEASE) $(DEP_RELEASE)
	$(LD) $(LIBDIR_RELEASE) -o $(OUT_RELEASE) $(OBJ_RELEASE)  $(LDFLAGS_RELEASE) $(LIB_RELEASE)

$(OBJDIR_RELEASE)/mqtt_subscribe.o: mqtt_subscribe.c
	$(CC) $(CFLAGS_RELEASE) $(INC_RELEASE) -c mqtt_subscribe.c -o $(OBJDIR_RELEASE)/mqtt_subscribe.o

clean_release: 
	rm -f $(OBJ_RELEASE) $(OUT_RELEASE)
	rm -rf bin/Release
	rm -rf $(OBJDIR_RELEASE)
	rm -rf $(OBJDIR_RELEASE)/src

.PHONY: before_debug after_debug clean_debug before_release after_release clean_release

Per compilare eseguire il comando :

make

nella stessa directory del Makefile e del source mqtt_subcribe.c

Ora per poter testare i nostri client dobbiamo installare il broker mosquitto:

sudo apt-get update

sudo apt-get install mosquitto mosquitto-clients

Per impostazione predefinita, Ubuntu avvierà il servizio Mosquitto dopo l’installazione. Verifichiamo che il broker sia attivo eseguendo systemctl status mosquitto, il risultato sarà:

Testeremo i clients con la configurazione predefinita del broker.Apriamo un terminale e spostiamoci nella directory dove abbiamo il nostro client mqtt_subscribe e lanciamolo:

Apriamo un’altro terminale e spostiamoci dove abbiamo il nostro client mqtt_publish e lanciamolo:

Nel terminale dove abbiamo il client subcriber mqtt_subscribe otterremo il messaggio postato:

La documentazioni della libreria libmosquitto si trova al link:

https://mosquitto.org/api/files/mosquitto-h.html

Questa voce è stata pubblicata in C/C++, Linux e contrassegnata con , , , . Contrassegna il permalink.