Shovel plugin

The Shovel plugin allows you to configure a number of shovels, which start automatically when the broker starts.

The high-level goal of a shovel is to reliably and continually take messages from a queue (a source) in one broker and publish them to exchanges in another broker (a destination).

The source queue and destination exchanges can be on the same broker or distinct brokers.

A shovel behaves like a well-written client application, which connects to its source and destination, reads and writes messages, and copes with connection failures.

The primary advantages of a shovel are:

Loose coupling
A shovel can move messages between brokers (or clusters) in different administrative domains:
  • they may have different users and virtual hosts;
  • they may run on different versions of RabbitMQ and Erlang.
WAN-friendly
The Shovel plugin uses AMQP to communicate between brokers, and is designed to tolerate intermittent connectivity without message loss.
Non-invasive
To define and start a shovel, you do not have to reconfigure either the source or the destination resources. You do not even have to restart the broker(s) upon which they reside: the shovels can run in a separate broker altogether.
Highly tailorable
When a shovel connects (either to the source or the destination) it can be configured to issue any number of explicit AMQP methods. For example, the source queue need not exist initially, and can be declared on connect.

A comparison between clustering, federated exchanges and shovels is given on the Distributed Messaging page.

What does it do?

The Shovel plugin defines (and runs) an Erlang client application for each shovel defined in its configuration.

In essence, a shovel is a simple pump. Each shovel:

  • connects to the source broker and the destination broker,
  • consumes messages from the queue,
  • re-publishes each message to the destination broker (using, by default, the original exchange name and routing_key).

The shovel configuration allows each of these processes to be tailored.

connects

After connection to a source or a destination broker a series of configured AMQP declarations can be issued. Queues, exchanges and bindings can be declared.

If a failure occurs, a shovel will attempt to reconnect to the broker. Multiple brokers can be specified for the source and for the destination, so that another broker may be selected (at random) to reconnect to. A reconnection delay can be specified to avoid flooding the network with reconnection attempts; a delay of zero prevents reconnection on failure altogether.

All configured declarations (for that source or destination) are re-issued upon re-connect.
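
As a rough illustration of the behaviour described above, the fragment below sketches a source with two candidate brokers and a pair of declarations, using the clause syntax described in the configuration section. The host names, credentials and exchange name are placeholders, and the fragment is only one clause of a shovel definition, not a complete configuration.

```erlang
%% One clause of a shovel definition; hosts and credentials are placeholders.
{sources, [ {brokers, [ "amqp://fred:secret@host1.domain/my_vhost"
                      , "amqp://john:secret@host2.domain/my_vhost"
                      ]}
            %% Issued after every connection and re-connection to a source broker.
          , {declarations, [ 'queue.declare'
                           , {'queue.bind', [ {exchange, <<"my_exchange">>}
                                            , {queue,    <<>>}
                                            ]}
                           ]}
          ]}
```

Because the declarations are re-issued on every re-connect, the anonymous queue is declared and bound again on whichever broker the shovel ends up connected to.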

consumes

The shovel's consumer can acknowledge messages automatically on receipt, after (re-)publication, or after confirmation of its publication.

re-publishes

Both the publish method and the message properties can be modified with explicit parameter values.

Full details are given in the configuration section, below.

Getting Started

The Shovel plugin is included in the RabbitMQ distribution. To enable it, use rabbitmq-plugins:

  rabbitmq-plugins enable rabbitmq_shovel

You may also wish to enable the rabbitmq_shovel_management plugin (see below).

Shovel status

There are two ways of discovering the status of configured shovels.

Use shovel management

Shovel status can be reported on the Management plugin user interface by enabling the rabbitmq_shovel_management plugin wherever you have the management plugin enabled.

Information about configured shovels will automatically appear in the management API and UI.

Direct query

Shovel status can be obtained by direct query of the Shovel plugin app. Issue the following rabbitmqctl command:

  $ rabbitmqctl eval 'rabbit_shovel_status:status().'

This calls the status function in the rabbit_shovel_status module of the rabbitmq_shovel plugin, which returns an Erlang list with one element for each configured shovel.

Each element of the list is a tuple with three fields: {Name, Status, Timestamp}.

  • Name is the shovel name (an Erlang atom),
  • Status is the current shovel state,
  • Timestamp is the time when the shovel entered this state, given as a local calendar time of the form {{YYYY, MM, DD}, {HH, MM, SS}}.

    Status takes one of three forms:

    • The shovel is starting up, connecting and creating resources:
        'starting'
      
    • The shovel is running normally:
        { 'running' | 'blocked', {'source', Source},
                                 {'destination', Destination}}
      

      where Source and Destination terms give the respective connection parameters.

    • The shovel has terminated:
        {'terminated', Reason}
      

      where Reason is an Erlang term that indicates the reason for the termination.

    The first term of a normal status can be 'running' or 'blocked':

    • 'running': the shovel is up and running, shovelling messages;
    • 'blocked': the destination has raised channel.flow, preventing the shovel from sending messages to the destination.

    When 'blocked', the shovel will raise channel.flow to the source, asking the source to stop sending further messages to the shovel. Any messages that are received by the shovel before the source observes the channel.flow are correctly buffered and maintained in order, and are published to the destination as soon as the destination drops the channel.flow block.
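
    Because the result is an ordinary Erlang list, it can be filtered with a list comprehension in the same call. The expression below is a minimal sketch that relies only on the tuple shapes described above and assumes nothing about the contents of Reason.

    ```erlang
    %% Names of terminated shovels, paired with the reason for termination.
    %% Elements whose Status does not match {terminated, Reason} are skipped.
    [{Name, Reason} || {Name, {terminated, Reason}, _Timestamp} <- rabbit_shovel_status:status()].
    ```

    The expression can be passed to rabbitmqctl eval, quoted in the same way as the plain status call.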

    Configuration

    The configuration for the Shovel plugin in the broker configuration file is an Erlang term (as usual) and consists of a single shovels clause:

      {rabbitmq_shovel, [ {shovels, [ {shovel_name, [ ... ]}, ... ]} ]}
    

    A (deliberately verbose) example configuration is given below.

    Each element of the list in the shovels clause is a named shovel. The shovel_names in the list must be distinct.

    Each shovel definition looks like this:

      {shovel_name, [ {sources, [ ... ]}
                    , {destinations, [ ... ]}
                    , {queue, queue_name}
                    , {prefetch_count, count}
                    , {ack_mode, a_mode}
                    , {publish_properties, [ ... ]}
                    , {publish_fields, [ ... ]}
                    , {reconnect_delay, reconn_delay}
                    ]}
    

    where shovel_name is the name of the shovel (an Erlang atom) and the clauses for sources, destinations and queue are mandatory. All the other clauses are optional.

    Each clause is fully described below.
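
    To show how the mandatory clauses fit together, here is a sketch of a complete broker configuration file that contains a single shovel and nothing else. The shovel name minimal_shovel, the source URI and the queue name are placeholders chosen for illustration. The sketch assumes the queue already exists on the source broker, since no declarations are given.

    ```erlang
    %% Sketch of a complete broker configuration file containing one shovel.
    %% minimal_shovel, the URI and the queue name are placeholders.
    [ {rabbitmq_shovel,
        [ {shovels,
            [ {minimal_shovel,
                [ {sources,      [ {broker, "amqp://fred:secret@host1.domain/my_vhost"} ]}
                , {destinations, [ {broker, "amqp://"} ]}   %% direct connection to the local broker
                , {queue, <<"my_work_queue">>}              %% must already exist on the source
                ]}
            ]}
        ]}
    ].
    ```

    All optional clauses are omitted here, so they take the defaults described in the sections that follow.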

    sources
    destinations

    Both of these clauses are mandatory. They take the form:

      {sources, [ {brokers, broker_list}
                , {declarations, declaration_list}
                ]}
    

    (or {destinations, ...}). The brokers clause is mandatory and the declarations clause optional.

    brokers

    This clause (or its variant broker clause, see note below) is mandatory. In

      {brokers, broker_list}
    

    broker_list is a list of URI broker connections (for the basic syntax, see AMQP URI), for example:

      [ "amqp://fred:secret@host1.domain/my_vhost"
      , "amqp://john:secret@host2.domain/my_vhost"
      ]
    
    If the host is omitted (not valid in a general AMQP URI), the shovel uses a direct connection to the broker in which it is running. This avoids using the network stack.

    The syntax is extended to include a query part to permit the configuration of additional connection parameters. heartbeat, channel_max, and frame_max can be specified, in any order. Omitted fields assume default values. For example:

      "amqp://myhost?heartbeat=5&frame_max=8192"
    
    specifies a (non-encrypted) network connection to the host myhost, using default username, password, port, vhost and channel_max. The heartbeat interval is set to 5 seconds, and the maximum frame size to 8192 bytes.

    It is possible to specify an encrypted SSL connection, the general form of which is:

      "amqps://username:password@host:port/vhost?
          cacertfile=/path/to/cacert.pem
         &certfile=/path/to/certfile.pem
         &keyfile=/path/to/keyfile.pem
         &verify=verifyOption
         &fail_if_no_peer_cert=failOption"
    
    (This URI has been split across several lines only for readability. There must be no white-space in the URI.)

    All five parameters (3 paths: cacertfile, certfile and keyfile; and 2 options: verify, fail_if_no_peer_cert) must be specified as well as the amqps scheme. See the SSL guide for details of SSL in RabbitMQ in general and specifically the Erlang client section.
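
    For illustration, a URI of this form written out on a single line might look as follows. The host, credentials and file paths are placeholders, and port 5671 is the port conventionally used for AMQP over SSL. The values verify_peer and true are assumptions, taken from the corresponding Erlang SSL options.

    ```erlang
    %% Placeholder host, credentials and paths; the URI contains no white-space.
    {brokers, [ "amqps://fred:secret@host1.domain:5671/my_vhost?cacertfile=/path/to/cacert.pem&certfile=/path/to/certfile.pem&keyfile=/path/to/keyfile.pem&verify=verify_peer&fail_if_no_peer_cert=true" ]}
    ```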

    Note: If the broker list consists of a single connection URI, the variant form:

      {broker, amqp_uri_string}
    

    is equivalent to a brokers clause with a single-element list.

    declaration_list

    This clause is optional. In

      {declarations, declaration_list}
    

    the declaration_list is a list of AMQP methods (in the style of the Erlang client) which can be sent to the broker after connection and before shovelling.

    This allows any resources that may need to be set up to be configured, including the source queue and the destination exchanges. For example:

      {declarations, [ 'queue.declare'
                     , {'queue.bind', [ {exchange, <<"my_exchange">>}
                                      , {queue,    <<>>}
                                      ]}
                     ]}
    

    will first declare an anonymous queue, and then bind it to the exchange called "my_exchange". (The queue parameter <<>> on queue.bind means 'use the queue last declared on this channel'.)

    Each element of the list is either an atom naming an AMQP method, or a tuple whose first element is the method atom and whose second element is a property list of parameter settings.

    If just the AMQP method atom is supplied all the parameters take their defaults (as illustrated with 'queue.declare' above).

    If a tuple and property-list is supplied, then the properties in the list specify some or all of the parameters explicitly.

    Here is another example:

      {'exchange.declare', [ {exchange, <<"my_exchange">>}
                           , {type, <<"direct">>}
                           , durable
                           ]}
    

    will declare a durable, direct exchange called "my_exchange".

    For full details, consult the Erlang Client documentation.

    queue

    This clause is mandatory. In

      {queue, queue_name}
    

    queue_name is the name of the queue (as a binary string) to shovel messages from. For example:

      {queue, <<"my_work_queue">>}
    

    This queue must exist. Use the resource declarations to create the queue (or ensure it exists) first. If queue_name is <<>> (the empty binary string) the most recently declared queue in declarations is used. This allows anonymous queues to be declared and used.
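
    As a sketch of how the declarations and queue clauses work together, the shovel definition below declares a named, durable queue on the source and then consumes from it. The shovel name my_shovel, the URI and the queue name are placeholders.

    ```erlang
    %% Declare the source queue on connect, then shovel from it.
    {my_shovel,
      [ {sources, [ {broker, "amqp://fred:secret@host1.domain/my_vhost"}
                  , {declarations, [ {'queue.declare', [ {queue, <<"my_work_queue">>}
                                                       , durable
                                                       ]}
                                   ]}
                  ]}
      , {destinations, [ {broker, "amqp://"} ]}
      , {queue, <<"my_work_queue">>}
      ]}
    ```

    If the queue already exists with different properties, the declaration is likely to be rejected by the broker, so the declared properties should match those of the existing queue.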

    prefetch_count

    This clause is optional. In

      {prefetch_count, count}
    

    count is the maximum number of unacknowledged messages the shovel may hold at a time (a non-negative integer). For example:

      {prefetch_count, 1}
    

    If this number is zero (the default), there is no limit.

    ack_mode

    This clause is optional. In

      {ack_mode, a_mode}
    

    a_mode is one of 'no_ack', 'on_publish' or 'on_confirm'.

    'no_ack'

    indicates that no message acknowledgements are to be generated by the shovel (the broker automatically acknowledges all delivered messages);

    'on_publish'

    indicates that a message acknowledgement is to be sent (to the source broker) after each message is re-published to the destination;

    'on_confirm'

    indicates that publish confirmations are sought and that a message acknowledgement is to be sent (to the source broker) after each message publication is confirmed by the destination broker.

    The default is 'on_confirm', which is highly recommended. If other options are chosen performance may improve slightly, but messages are more likely to be lost in the event of failures.

    publish_properties

    This clause is optional. It takes the form:

      {publish_properties, property_list}
    

    where the properties in the list are set on the basic.properties of each message before it is re-published.

    For example:

      {publish_properties, [ {delivery_mode, 2} ]}
    

    would mark all re-published messages persistent.

    By default the properties of the message are preserved, but this clause can be used to change, or set any property, including content_type, content_encoding, headers, delivery_mode, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id and cluster_id.
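
    Several properties can be set in one clause. The following sketch assumes that string-valued properties are written as binaries, in the style of the Erlang client; the values themselves are placeholders.

    ```erlang
    {publish_properties, [ {delivery_mode, 2}                       %% persistent
                         , {content_type, <<"application/json">>}   %% replaces the original value
                         , {app_id, <<"my_shovel">>}
                         ]}
    ```

    Properties not named in the list keep the values carried by the original message.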

    publish_fields

    This clause is optional. It takes the form:

      {publish_fields, property_list}
    

    where the properties in the list are used to set the fields on the basic.publish method used to re-publish messages.

    By default the messages are re-published using the original exchange name and routing key. For example, by specifying:

      {publish_fields, [ {exchange, <<"my_exchange">>}
                       , {routing_key, <<"from_shovel">>}
                       ]}
    

    messages would be re-published to an explicit exchange name with an explicit, fixed routing key.

    reconnect_delay

    This clause is optional. In

      {reconnect_delay, reconn_delay}
    

    reconn_delay is the number of seconds to wait before reconnecting in the event of connection failure (a non-negative number). For example:

      {reconnect_delay, 1.5}
    

    would delay for one and a half seconds before reconnecting after failure.

    If reconn_delay is 0, then no reconnections occur: the shovel will stop after the first failure.

    The default reconn_delay is 5 (seconds).

    Example Configuration

    A verbose shovel configuration might look like this:

      {rabbitmq_shovel,
        [ {shovels, [ {my_first_shovel,
                        [ {sources,
                            [ {brokers, [ "amqp://fred:secret@host1.domain/my_vhost"
                                        , "amqp://john:secret@host2.domain/my_vhost"
                                        ]}
                            , {declarations, [ {'exchange.declare',
                                                  [ {exchange, <<"my_fanout">>}
                                                  , {type, <<"fanout">>}
                                                  , durable
                                                  ]}
                                             , {'queue.declare',
                                                  [{arguments,
                                                     [{<<"x-message-ttl">>, long, 60000}]}]}
                                             , {'queue.bind',
                                                  [ {exchange, <<"my_fanout">>}
                                                  , {queue,    <<>>}
                                                  ]}
                                             ]}
                            ]}
                        , {destinations,
                            [ {broker, "amqp://"}
                            , {declarations, [ {'exchange.declare',
                                                  [ {exchange, <<"my_direct">>}
                                                  , {type, <<"direct">>}
                                                  , durable
                                                  ]}
                                             ]}
                            ]}
                        , {queue, <<>>}
                        , {prefetch_count, 10}
                        , {ack_mode, on_confirm}
                        , {publish_properties, [ {delivery_mode, 2} ]}
                        , {publish_fields, [ {exchange, <<"my_direct">>}
                                           , {routing_key, <<"from_shovel">>}
                                           ]}
                        , {reconnect_delay, 5}
                        ]}
                    ]}
        ]}
    

    The configuration above defines a single shovel called 'my_first_shovel'.

    'my_first_shovel' will connect to a broker on either host1 or host2 (as source), and directly to the local broker (as destination). It will reconnect to the other source broker on failure, after a delay of 5 seconds.

    When connected to the source it will declare a durable fanout exchange called "my_fanout" and an anonymous queue with a per-queue message TTL of 60 seconds, and bind the queue to the exchange.

    When connected to the destination (the local broker) it will declare a durable, direct exchange called "my_direct".

    This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key "from_shovel". The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.

    The shovel consumer will not be allowed to hold more than ten unacknowledged messages at a time.