Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [paho-dev] MQTT C++ Client options

Hi Frank,

 

I expect disconnect callback to be received in two ways as following.

  • When TCP socket is terminated from remote side
  • When keepalive timer expires on Client side

 

But none of them are helping. No disconnect callback is received in either of the cases above.

 

Attached is my application code using aync_client. Please help.

 

Regards

Pankaj

 

From: paho-dev-bounces@xxxxxxxxxxx [mailto:paho-dev-bounces@xxxxxxxxxxx] On Behalf Of Kumar, Pankaj
Sent: Monday, November 5, 2018 8:38 PM
To: Frank Pagliughi <fpagliughi@xxxxxxxxxxxxxx>; General development discussions for paho project <paho-dev@xxxxxxxxxxx>
Subject: Re: [paho-dev] MQTT C++ Client options

 

Hi Frank,

 

When running on ARM, I am facing an issue that disconnect callback is not being received to application when message broker goes offline. I am using async_client.

 (Note: This same is not an issue when running on x86, there it works fine as expected.)

 

Any known issue around this ? That can help. I will debug it further tomorrow and will provide more detail on it. 

  

Regards

Pankaj

 

From: Frank Pagliughi [mailto:fpagliughi@xxxxxxxxxxxxxx]
Sent: Tuesday, October 23, 2018 5:16 PM
To: Kumar, Pankaj <
pankaj_kumar2@xxxxxxxxxx>; General development discussions for paho project <paho-dev@xxxxxxxxxxx>
Subject: Re: [paho-dev] MQTT C++ Client options

 

There should be no trouble getting it working on an A7 with Yocto. I know of several people who have incorporated Paho C++ into a Yocto build, but unfortunately, no one has contributed those builds back to the project. I hope to do it eventually, but it's not very high on my list at the moment.

Frank

On 10/22/18 2:27 AM, Kumar, Pankaj wrote:

Hi Frank,

 

Could you please compile MQTT C++ code base and provide static library (one lib if possible) for below target platform.

 

OS: Digi Embedded Yocto Linux

CPU: ARM Cortex-A7 @900 MHz

Kernel Version: Linux ccimx6ulsbc 4.1.41-dey+gf1a146c #2 SMP PREEMPT Mon Apr 23 10:32:20 EDT 2018 armv7l GNU/Linux

 

My need from this library is as following:

  • It should compile/run for Ubuntu X86_64 (required for development purpose). I have compiled and tested for it and it works !!!
  • It should also compile/run for embedded environment (as above). This is where I need help.  

 

Only if it compiles and runs for embedded environment then only we can consider this as an option for our production.

 

Regarding hiring you for support, I am in discussion with management and will get back to you.

 

Regards

Pankaj  

 

From: Frank Pagliughi [mailto:fpagliughi@xxxxxxxxxxxxxx]
Sent: Thursday, October 18, 2018 8:20 PM
To: Kumar, Pankaj
<pankaj_kumar2@xxxxxxxxxx>; General development discussions for paho project <paho-dev@xxxxxxxxxxx>
Subject: Re: [paho-dev] MQTT C++ Client options

 

CMake is now the preferred method for compiling.

What are you trying with ARM that is failing? Please post specific error messages. If something appears to be a bug, please raise an Issue on the GitHub site:
https://github.com/eclipse/paho.mqtt.cpp/issues

That is preferred for specific problems, rather than e-mail to the group.

Are you compiling natively on an ARM platform, or trying to cross-compile?  If cross, what host? What target?

Frank

On 10/18/18 4:59 AM, Kumar, Pankaj wrote:

Hi Frank,

 

Today, I started compiling C++ MQTT source code and was able to successfully compile and test it for x86 architecture. (Using Cmake option)

However, for ARM architecture I am still not successful and need your help. Can you please help me with the steps on how to compile it for ARM ?

 

Also, please help me with what is the best way from following options if I am looking to compile it for multiple platforms (x86, ARM).

  • CMake
  • Autotools
  • GNU Make

Regards

Pankaj

 

From: paho-dev-bounces@xxxxxxxxxxx [mailto:paho-dev-bounces@xxxxxxxxxxx] On Behalf Of Kumar, Pankaj
Sent: Monday, October 8, 2018 11:15 AM
To: Frank Pagliughi
<fpagliughi@xxxxxxxxxxxxxx>; General development discussions for paho project <paho-dev@xxxxxxxxxxx>
Subject: Re: [paho-dev] MQTT C++ Client options

 

Thanks Frank.

Appreciate the quick response !!!

 

Regards

Pankaj

 

From: Frank Pagliughi [mailto:fpagliughi@xxxxxxxxxxxxxx]
Sent: Friday, October 5, 2018 6:12 PM
To: General development discussions for paho project <
paho-dev@xxxxxxxxxxx>; Kumar, Pankaj <pankaj_kumar2@xxxxxxxxxx>
Subject: Re: [paho-dev] MQTT C++ Client options

 

Answers inline...

On 10/05/2018 06:37 AM, Kumar, Pankaj wrote:

 

  • Can we use it for commercial purpose ?


Yes. The terms of the license are listed on the GitHub page.
https://github.com/eclipse/paho.mqtt.cpp


 

  • Does this library have any third party library dependency ?


It is a wrapper around the Paho C library.
https://github.com/eclipse/paho.mqtt.c

That library is also required.


  • If we run into issues in future (e.g., memory corruption within the library), do we get stuck waiting for the community to fix the issue? Is there is a commercial support available?


I'm the author, running a software consulting company, and available for hire. If there is any support that you require with particular time constraints, you can contact me directly at this e-mail address.


  • Currently, it supports MQTT 3.1.1, what is the roadmap for supporting MQTT 5.0 ?


MQTT 5.0 and WebSocket support should be complete by the end of the year.


 

  • Where can I find the list of outstanding issue, if there are any ?


On the GitHub page, under "Issues":
https://github.com/eclipse/paho.mqtt.cpp/issues


Frank Pagliughi
SoRo Systems, Inc

 

 

///////////////////////////////////////////////////////////////////////////////
///
/// Copyright 2018, Baxter Healthcare Corporation.  All rights reserved.
/// For use only by Baxter Healthcare Corporation.
/// This document contains proprietary information. It must not be reproduced
/// or disclosed to others without prior written approval.
///
///////////////////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////////////////////////
///
/// @file       MQTTTransport.h
/// @brief      This file provides MQTTTransport class declaration.
///
///             Wrapper over paho mqtt library to manage communication with MQTT
///             broker.
///
///////////////////////////////////////////////////////////////////////////////

#ifndef __MQTT_TRANSPORT_H__
#define __MQTT_TRANSPORT_H__

#include "mqtt/async_client.h"
#include "Transport.h"
#include "TransportUser.h"

class MQTTTransport : public Transport, public mqtt::callback
{
public:
    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Constructor
    ///
    ///////////////////////////////////////////////////////////////////////////
    MQTTTransport(TransportUser* receiver,
                    const std::string& remoteIp, const int remotePort);

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Destructor
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual ~MQTTTransport();

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Start transport
    ///
    /// @return True if start successful, False otherwise
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual bool start() override;

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Send byte stream to remote
    ///
    /// @param[in] byteStream Byte stream to be sent
    ///
    /// @return True if send successful, False otherwise
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual bool sendByteStream(std::
                            unique_ptr<std::string> byteStream) override;

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Shutdown transport
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual void shutdown() override;

private:
    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Process connect callback
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual void connected(const std::string& cause) override;

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Process incoming message
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual void message_arrived(mqtt::const_message_ptr msg) override;

    ///////////////////////////////////////////////////////////////////////////
    ///
    /// Process connection lost
    ///
    /// @param[in] cause Reason for disconnection
    ///
    ///////////////////////////////////////////////////////////////////////////
    virtual void connection_lost(const std::string& cause) override;

    /// MQTT async client for egress communication
    std::unique_ptr<mqtt::async_client> asyncClient;

    /// Transport user or receiver to handle ingress communication
    TransportUser* receiver_ = nullptr;

    /// Default QOS
    const unsigned int QOS = 1;

    /// MQTT client Id
    const std::string CLIENT_ID = "NPP_ECM";

    /// Communication topic
    const std::string TOPIC = "NPP_ECM";
};

#endif //__MQTT_TRANSPORT_H__
///////////////////////////////////////////////////////////////////////////////
///
/// Copyright 2018, Baxter Healthcare Corporation.  All rights reserved.
/// For use only by Baxter Healthcare Corporation.
/// This document contains proprietary information. It must not be reproduced
/// or disclosed to others without prior written approval.
///
///////////////////////////////////////////////////////////////////////////////

#include "CppUtils.h"

#include "MQTTTransport.h"
#include "MsgMacros.h"
#include "ExtComConfig.h"
#include "TransportUser.h"

///////////////////////////////////////////////////////////////////////////////
///
/// Constructor
///
///////////////////////////////////////////////////////////////////////////////
MQTTTransport::MQTTTransport(TransportUser* receiver,
                    const std::string& remoteIp, const int remotePort)
{
    if (!receiver)
    {
        throw std::runtime_error("MQTTTransport::\
                                        MQTTTransport receiver is NULL");
    }

    /// update receiver
    receiver_ = receiver;

    if (remoteIp.empty())
    {
        throw std::runtime_error("MQTTTransport::\
                                        MQTTTransport remote ip is NULL");
    }

    if (remotePort <= 0)
    {
        throw std::runtime_error("MQTTTransport:: MQTTTransport \
                                            remote port is invalid");
    }

    asyncClient.reset(new mqtt::async_client("tcp://" +
                    remoteIp + ":" + std::to_string(remotePort), CLIENT_ID));
    asyncClient->set_callback(*this);
}

///////////////////////////////////////////////////////////////////////////////
///
/// Destructor
///
///////////////////////////////////////////////////////////////////////////////
MQTTTransport::~MQTTTransport()
{
}

///////////////////////////////////////////////////////////////////////////////
///
/// Start transport
///
///////////////////////////////////////////////////////////////////////////////
bool
MQTTTransport::start()
{
    if (operationalStatus_ == OperationalStatus::RUNNING)
    {
        ERROR_MSG("MQTTTransport::start, transport is already running.");
        return false;
    }

    /// Update operation status as running
    operationalStatus_ = OperationalStatus::RUNNING;

    /// Configure connect call
    mqtt::connect_options connectOptions;
    connectOptions.set_automatic_reconnect(true);
    connectOptions.set_clean_session(true);
    connectOptions.set_keep_alive_interval(2);

    DEBUG_MSG("PANKAJ. keepalive=" << connectOptions.get_keep_alive_interval().count());

    /// Connect to remote
    asyncClient->connect(connectOptions);

    DEBUG_MSG("PANKAJ. patch set 1....");

    DEBUG_MSG("MQTTTransport::start, connecting to..."
              << "remoteAddress=" << asyncClient->get_server_uri().c_str());

    return true;
}

///////////////////////////////////////////////////////////////////////////////
///
/// Send byte stream to remote
///
///////////////////////////////////////////////////////////////////////////////
bool
MQTTTransport::sendByteStream(std::unique_ptr<std::string> message)
{
    if (operationalStatus_ == OperationalStatus::DOWN)
    {
        ERROR_MSG("MQTTTransport::sendEgressByteStream, "
                  << " transport is down.");
        return false;
    }

    asyncClient->publish(TOPIC, std::move(*(message.release())));

    return true;
}

///////////////////////////////////////////////////////////////////////////////
///
/// Process connect callback
///
///////////////////////////////////////////////////////////////////////////////
void
MQTTTransport::connected(const std::string& cause)
{
    DEBUG_MSG("MQTTTransport::connected, connection "
              << "established to remote!!!! cause=" << cause.c_str());

    /// Provide callback to applications that remote is connected
    receiver_->remoteConnected();

    /// Start processing the incoming traffic
    asyncClient->start_consuming();

    /// Subscribe to incoming topics
    asyncClient->subscribe(TOPIC, QOS);
}

///////////////////////////////////////////////////////////////////////////////
///
/// Process incoming message
///
///////////////////////////////////////////////////////////////////////////////
void
MQTTTransport::message_arrived(mqtt::const_message_ptr msg)
{
    receiver_->receiveByteStream(CppUtils::
            make_unique<std::string>(std::move(msg->get_payload())));
}

///////////////////////////////////////////////////////////////////////////////
///
/// Process incoming message
///
/// @param[in] cause Reason for disconnection
///
///////////////////////////////////////////////////////////////////////////////
void
MQTTTransport::connection_lost(const std::string& cause)
{
    DEBUG_MSG("MQTTTransport::message_arrived, "
              << "connection lost to remote !!!!, cause=" << cause.c_str());
    receiver_->remoteDisconnected();
}

///////////////////////////////////////////////////////////////////////////////
///
/// Shutdown transport
///
///////////////////////////////////////////////////////////////////////////////
void
MQTTTransport::shutdown()
{
    if (operationalStatus_ == OperationalStatus::DOWN)
    {
        ERROR_MSG("MQTTTransport::shutdown, transport is already down.");
    }

    /// Un-subscribe from subscribed topics
    asyncClient->unsubscribe(TOPIC);

    /// Stop receiving incoming traffic
    asyncClient->stop_consuming();

    /// Notify applications for remote disconnect
    asyncClient->disconnect();

    /// Update operation status as running
    operationalStatus_ = OperationalStatus::DOWN;
}

Back to the top