Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[mosquitto-dev] Ping requests not sent when using my own loop handler

Hi,

 

I am new to this mailing list. I am developing an MQTT client application that uses a large number of file descriptors.

As referenced in GitLab issue #1299, this is causing the MQTT library to return MOSQ_ERR_INVAL errors.

 

Following the advice in that GitLab issue, I am now trying to use my own event loop, but I am having some issues with it. I found that the client is not sending the PING command, causing the MQTT server to disconnect the client. The server log shows:

1632236557: Client mosq-dDa3ooaeKWKrdxSdQa has exceeded timeout, disconnecting.

 

I can reproduce the issue with the sample code below (also attached).

The on_disconnect callback is called before the message is published.

When I replace my custom loop with a call to mosquitto_loop_start, this all works fine.

This is with libmosquitto version 1.6.9, on Ubuntu 20.04

 

Would you have any pointers indicating what I’m doing wrong?

 

Many thanks,

Jeremy

 

#include <mosquittopp.h>

 

#include <atomic>

#include <cassert>

#include <iostream>

#include <sys/epoll.h>

#include <thread>

 

/**
 * Connect callback registered in main(): writes "on_connect <rc>" to stderr.
 *
 * Only the integer result code is used; the client handle and the opaque
 * pointer are ignored.
 */
static void on_connect(struct mosquitto* /*mosq*/, void* /*userdata*/, int rc)
{
      const std::string line = "on_connect " + std::to_string(rc);
      std::cerr << line << std::endl;
}

 

/**
 * Disconnect callback registered in main(): writes "on_disconnect <rc>" to
 * stderr.
 *
 * Only the integer reason code is used; the other two arguments are ignored.
 */
static void on_disconnect(struct mosquitto* /*mosq*/, void* /*userdata*/, int rc)
{
      const std::string line = "on_disconnect " + std::to_string(rc);
      std::cerr << line << std::endl;
}

 

/**
 * Publish callback registered in main(): writes "on_publish <mid>" to stderr.
 *
 * Only the integer message id is used; the other two arguments are ignored.
 */
static void on_publish(struct mosquitto* /*mosq*/, void* /*userdata*/, int mid)
{
      const std::string line = "on_publish " + std::to_string(mid);
      std::cerr << line << std::endl;
}

 

/**
 * Message callback registered in main(): writes "on_message" to stderr.
 *
 * None of the arguments are inspected.
 */
static void on_message(struct mosquitto* /*mosq*/, void* /*userdata*/, const mosquitto_message* /*message*/)
{
      static const char kLine[] = "on_message";
      std::cerr << kLine << std::endl;
}

 

/**
 * Subscribe callback registered in main(): writes "on_subscribe" to stderr.
 *
 * None of the arguments are inspected.
 */
static void on_subscribe(struct mosquitto* /*mosq*/, void* /*userdata*/, int /*mid*/, int /*qos_count*/, const int* /*granted_qos*/)
{
      static const char kLine[] = "on_subscribe";
      std::cerr << kLine << std::endl;
}

 

int main()

{

      auto ret = mosquitto_lib_init();

      assert(ret == MOSQ_ERR_SUCCESS);

 

      struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);

      assert(mosq != nullptr);

 

      mosquitto_connect_callback_set(mosq, on_connect);

      mosquitto_disconnect_callback_set(mosq, on_disconnect);

      mosquitto_publish_callback_set(mosq, on_publish);

      mosquitto_message_callback_set(mosq, on_message);

      mosquitto_subscribe_callback_set(mosq, on_subscribe);

 

      ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);

      assert(ret == MOSQ_ERR_SUCCESS);

 

      /**

      * Use our own thread with a polling logic.

      */

      std::atomic_bool ended(false);

      auto worker = std::thread([mosq, &ended]

      {

            /**

            * Tell the library we are using threads, but not mosquitto_loop_start.

            */

            mosquitto_threaded_set(mosq, true);

 

            struct epoll_event ev;

            auto epfd = epoll_create1(0);

            assert(epfd >= 0);

 

            int sock = mosquitto_socket(mosq);

            ev.data.fd = sock;

            ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;

            auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);

            assert(err == 0);

 

            while (!ended)

            {

                  auto err = epoll_wait(epfd, &ev, 1, 250);

 

                  if (err > 0)

                  {

                        if (ev.events & EPOLLIN)

                             mosquitto_loop_read(mosq, 1);

                        if (ev.events & EPOLLOUT)

                        {

                             if (mosquitto_want_write(mosq))

                                   mosquitto_loop_write(mosq, 1);

                        }

                  }

 

                  mosquitto_loop_misc(mosq);

            }

      });

 

      /**

      * Wait for keepalive * 2 to see if Ping requests are working.

      */

      std::this_thread::sleep_for(std::chrono::seconds{10});

 

      std::string msg = "test message";

      ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);

      std::cerr << "publish result=" + std::to_string(ret) << std::endl;

 

      std::this_thread::sleep_for(std::chrono::milliseconds{500});

 

      mosquitto_disconnect(mosq);

 

      ended = true;

      worker.join();

 

      mosquitto_lib_cleanup();

}

#include <mosquittopp.h>

#include <atomic>
#include <cassert>
#include <iostream>
#include <sys/epoll.h>
#include <thread>

/**
 * Connect callback registered in main(): writes "on_connect <rc>" to stderr.
 *
 * Only the integer result code is used; the client handle and the opaque
 * pointer are ignored.
 */
static void on_connect(struct mosquitto* /*mosq*/, void* /*userdata*/, int rc)
{
	std::string text("on_connect ");
	text += std::to_string(rc);
	std::cerr << text << std::endl;
}

/**
 * Disconnect callback registered in main(): writes "on_disconnect <rc>" to
 * stderr.
 *
 * Only the integer reason code is used; the other two arguments are ignored.
 */
static void on_disconnect(struct mosquitto* /*mosq*/, void* /*userdata*/, int rc)
{
	std::string text("on_disconnect ");
	text += std::to_string(rc);
	std::cerr << text << std::endl;
}

/**
 * Publish callback registered in main(): writes "on_publish <mid>" to stderr.
 *
 * Only the integer message id is used; the other two arguments are ignored.
 */
static void on_publish(struct mosquitto* /*mosq*/, void* /*userdata*/, int mid)
{
	std::string text("on_publish ");
	text += std::to_string(mid);
	std::cerr << text << std::endl;
}

/**
 * Message callback registered in main(): writes "on_message" to stderr.
 *
 * None of the arguments are inspected.
 */
static void on_message(struct mosquitto* /*mosq*/, void* /*userdata*/, const mosquitto_message* /*message*/)
{
	const char* const label = "on_message";
	std::cerr << label << std::endl;
}

/**
 * Subscribe callback registered in main(): writes "on_subscribe" to stderr.
 *
 * None of the arguments are inspected.
 */
static void on_subscribe(struct mosquitto* /*mosq*/, void* /*userdata*/, int /*mid*/, int /*qos_count*/, const int* /*granted_qos*/)
{
	const char* const label = "on_subscribe";
	std::cerr << label << std::endl;
}

int main()
{
	auto ret = mosquitto_lib_init();
	assert(ret == MOSQ_ERR_SUCCESS);

	struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);
	assert(mosq != nullptr);

	mosquitto_connect_callback_set(mosq, on_connect);
	mosquitto_disconnect_callback_set(mosq, on_disconnect);
	mosquitto_publish_callback_set(mosq, on_publish);
	mosquitto_message_callback_set(mosq, on_message);
	mosquitto_subscribe_callback_set(mosq, on_subscribe);

	ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);
	assert(ret == MOSQ_ERR_SUCCESS);

	/**
	 * Use our own thread with a polling logic.
	 */
	std::atomic_bool ended(false);
	auto worker = std::thread([mosq, &ended]
	{
		/**
		 * Tell the library we are using threads, but not mosquitto_loop_start.
		 */
		mosquitto_threaded_set(mosq, true);

		struct epoll_event ev;
		auto epfd = epoll_create1(0);
		assert(epfd >= 0);

		int sock = mosquitto_socket(mosq);
		ev.data.fd = sock;
		ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
		auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
		assert(err == 0);

		while (!ended)
		{
			auto err = epoll_wait(epfd, &ev, 1, 250);

			if (err > 0)
			{
				if (ev.events & EPOLLIN)
					mosquitto_loop_read(mosq, 1);
				if (ev.events & EPOLLOUT)
				{
					if (mosquitto_want_write(mosq))
						mosquitto_loop_write(mosq, 1);
				}
			}

			mosquitto_loop_misc(mosq);
		}
	});

	/**
	 * Wait for keepalive * 2 to see if Ping requests are working.
	 */
	std::this_thread::sleep_for(std::chrono::seconds{10});

	std::string msg = "test message";
	ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);
	std::cerr << "publish result=" + std::to_string(ret) << std::endl;

	std::this_thread::sleep_for(std::chrono::milliseconds{500});

	mosquitto_disconnect(mosq);

	ended = true;
	worker.join();

	mosquitto_lib_cleanup();
}

Back to the top