[paho-dev] Publish message from MQTT broker topics to Google cloud PubSub topics

Hi Team,
I am working with paho-mqtt (MQTT version 3.1.1) on an Ubuntu machine.
Project requirements:
1. Subscribe to several MQTT broker topics.
2. Publish each message from an MQTT topic to a Google Cloud Pub/Sub topic, where Pub/Sub topics and MQTT topics have a 1-1 relationship.
Please have a look at the attached script.

Question:
Can I send messages to different Pub/Sub topics in parallel, based on which broker topic each message arrived on?
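
To make the intent concrete, here is a minimal sketch of the routing I have in mind. It uses only the standard library, so it runs without a broker or Google Cloud credentials. The names build_topic_map and route are hypothetical helpers for illustration, and the publish argument is a stand-in for the real Pub/Sub publish call.

```python
from typing import Callable, Dict, List, Optional

PROJECT_ID = "project_sample"  # same project ID as in the attached script
TOPIC_NAMES = ["topic-01", "topic-02", "topic-03", "topic-04"]


def build_topic_map(project_id: str, names: List[str]) -> Dict[str, str]:
    """Pair each MQTT topic 'mqtt/<name>' with one Pub/Sub topic path."""
    # The value has the same shape that PublisherClient.topic_path() returns:
    # projects/<project_id>/topics/<name>
    return {f"mqtt/{n}": f"projects/{project_id}/topics/{n}" for n in names}


def route(
    mqtt_topic: str,
    payload: bytes,
    topic_map: Dict[str, str],
    publish: Callable[[str, bytes], object],
) -> Optional[str]:
    """Publish payload to the single Pub/Sub topic paired with mqtt_topic."""
    topic_path = topic_map.get(mqtt_topic)
    if topic_path is None:
        return None  # message from an MQTT topic with no paired Pub/Sub topic
    publish(topic_path, payload)
    return topic_path


if __name__ == "__main__":
    sent = []  # records what the stand-in publisher was asked to send
    topic_map = build_topic_map(PROJECT_ID, TOPIC_NAMES)
    route("mqtt/topic-02", b"hello", topic_map,
          lambda path, data: sent.append((path, data)))
    print(sent)
```

With the real client, publish would presumably be something like lambda path, data: publisher.publish(path, data=data). Since publish() returns a future straight away and sends in the background, my understanding is that messages for different Pub/Sub topics would not have to wait on each other as long as the callback does not block on future.result().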

Thanks,
Urvashi Chaudhary
import paho.mqtt.client as mqtt

from google.cloud import pubsub_v1

project_id = "project_sample"
topic_name1 = "topic-01"
topic_name2 = "topic-02"
topic_name3 = "topic-03"
topic_name4 = "topic-04"
publisher = pubsub_v1.PublisherClient()
topic_path1 = publisher.topic_path(project_id, topic_name1)
topic_path2 = publisher.topic_path(project_id, topic_name2)
topic_path3 = publisher.topic_path(project_id, topic_name3)
topic_path4 = publisher.topic_path(project_id, topic_name4)

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("mqtt/topic-01")
    client.subscribe("mqtt/topic-02")
    client.subscribe("mqtt/topic-03")
    client.subscribe("mqtt/topic-04")

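# Map each MQTT topic to its paired Pub/Sub topic path (1-1 relationship).
TOPIC_MAP = {
    "mqtt/topic-01": topic_path1,
    "mqtt/topic-02": topic_path2,
    "mqtt/topic-03": topic_path3,
    "mqtt/topic-04": topic_path4,
}

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    # print(msg.topic+" "+str(msg.payload))
    topic_path = TOPIC_MAP.get(msg.topic)
    if topic_path is None:
        return  # no Pub/Sub topic is paired with this MQTT topic
    # publish() is asynchronous and returns a future immediately.
    future = publisher.publish(topic_path, data=msg.payload)
    # Print the message ID from a callback instead of blocking the MQTT
    # network loop on future.result().
    future.add_done_callback(lambda f: print(f.result()))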

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

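# "IP_of_mqtt", Port and interval are placeholders for the broker host,
# its port, and the keepalive interval in seconds.
client.connect("IP_of_mqtt", Port, interval)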

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
