With Shovel

1 One queue for all incoming msg, root

2 One queue for duplicating all incoming msg, root-worker

3 One queue to remote host for sending all incoming msg, root-worker-remote

Publish to:

ch.basic_publish(exchange="amq.topic", routing_key="root-key", body="root message", 

Both 1 and 2 are bind to root-key, 3 is bind to root-worker-remote-key

2 is the source shovel queue for 3 destination queue.

When publish, it goes like this:

1 Root gets and keeps msg for further processing

2 Root-worker gets and send msg to root-remote-worker

3 Root-worker-remote (simulated on localhost here) gets and keep msg for further processing

Publish msg with Python:

All msg published

Where is it stored?

Information about dynamic shovels is stored in RabbitMQ’s schema database, along with users, permissions, queues, etc.
They therefore can be exported together with other schema definitions.






Stop RabbitMQ service and start it again

Visit management

2023-02-09 13:09:47.264000+01:00 [info] <0.715.0> connection <0.715.0> ( -> – Shovel root_worker_to_remote): user ‘guest’ authenticated and granted access to vhost ‘/’

Dynamic shovel settings are stored in the internal database.
Dynamic shovels can be added at any time. Static shovels are started on plugin

Configuring Static Shovels — RabbitMQ

[…] Most users should prefer dynamic shovels to static ones for their flexibility and ease of automation. Generating a dynamic shovel definition (a JSON document) is usually easier compared to a static shovel definition (which uses Erlang terms).


Also running TLS on server here with self signed.

# to disable 5672
listeners.tcp  = none

# ssl only
listeners.ssl.default = 5671

ssl_options.cacertfile = C:\testca\ca_certificate.pem
ssl_options.certfile   = C:\testca\server\server_certificate.pem
ssl_options.keyfile    = C:\testca\server\private_key.pem
ssl_options.verify     = verify_none
ssl_options.fail_if_no_peer_cert = false

# management.tcp.port = 15672
management.ssl.port       = 15671
management.ssl.cacertfile = C:\testca\ca_certificate.pem
management.ssl.certfile   = C:\testca\server\server_certificate.pem
management.ssl.keyfile    = C:\testca\server\private_key.pem

Python code for TLS with the ca cert same cert as server

import ssl
import pika
import logging

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

context.verify_mode = ssl.CERT_REQUIRED

credential = pika.PlainCredentials("baduser", "A-PASSWORD")

conn_parameters = pika.ConnectionParameters(host="HOSTNAME",virtual_host="/", ssl_options=pika.SSLOptions(context), credentials=credential)

conn = pika.BlockingConnection(conn_parameters)
print("\n"+ str(conn))
ch = conn.channel()
ch.queue_declare(queue="root", durable=True)
ch.queue_bind("root", exchange="amq.topic", routing_key="root-key")

for i in range(20):
    ch.basic_publish(exchange="amq.topic", routing_key="root-key", body="root message",  properties=pika.BasicProperties(content_type='text/plain',delivery_mode=pika.DeliveryMode.Transient))
    get_queue_info = ch.basic_get("root")
    print("\n"+ str(get_queue_info))

Python code for TLS with AMQPS

import pika
import urllib
import time

connection = pika.BlockingConnection(pika.URLParameters("amqps://baduser:a-password@hostname:5671/"))
channel = connection.channel()

# channel.queue_declare(queue='root', durable=True)
for x in range(0,200):
    channel.basic_publish(exchange='amq.topic',routing_key='root-key',body='Hello root World!')

print(" Sent 'Hello World!'")
# time.sleep(2)
# works 

Shovel with tls and no tls on same host

# to disable 5672
# listeners.tcp  = none
# enable it
listeners.tcp.default = 5672

# ssl only
listeners.ssl.default = 5671