Version: Latest

Event Brokers

An event broker serves as a link between your active assistant and various external services that handle data from conversations. It sends messages to a message streaming service, often referred to as a message broker or message queue. This enables the forwarding of Rasa Events from the Rasa server to other systems.

Format

All events are streamed to the broker as serialized dictionaries every time the tracker updates its state. An example event emitted from the default tracker looks like this:

{
"sender_id": "default",
"timestamp": 1528402837.617099,
"event": "bot",
"text": "what your bot said",
"data": "some data about e.g. attachments"
"metadata" {
"a key": "a value",
}
}

The event field takes the event's type_name (for more on event types, check out the events docs).

Kafka Event Broker

Kafka is recommended for all assistants at scale. Kafka is a requirement when streaming events to Rasa Pro Services or Rasa Studio.

Rasa uses the confluent-kafka library, a Kafka client written in Python.

Configuration

To set up Rasa with Kafka the following steps are required:

  1. Add required configuration to your endpoints.yml

    endpoints.yml
    event_broker:
    type: kafka
    partition_by_sender: True
    security_protocol: PLAINTEXT
    topic: topic
    url: localhost
    client_id: kafka-python-rasa

    When using the SASL_PLAINTEXT protocol the endpoints file must have the following entries:

    event_broker:
    type: kafka
    security_protocol: SASL_PLAINTEXT
    topic: topic
    url: localhost
    partition_by_sender: True
    sasl_username: username
    sasl_password: password
    sasl_mechanism: PLAIN

    When using the PLAINTEXT protocol the endpoints file must have the following entries:

    event_broker:
    type: kafka
    security_protocol: PLAINTEXT
    topic: topic
    url: localhost
    client_id: kafka-python-rasa

    When using the SSL protocol, the endpoints file should look like:

    event_broker:
    type: kafka
    security_protocol: SSL
    topic: topic
    url: localhost
    ssl_cafile: CARoot.pem
    ssl_certfile: certificate.pem
    ssl_keyfile: key.pem
    ssl_check_hostname: True

    When using the SASL_SSL protocol, the endpoints file should look like:

    event_broker:
    type: kafka
    security_protocol: SASL_SSL
    topic: topic
    url: localhost
    sasl_username: username
    sasl_password: password
    sasl_mechanism: PLAIN
    ssl_cafile: CARoot.pem
    ssl_certfile: certificate.pem
    ssl_keyfile: key.pem
    ssl_check_hostname: True
  2. To start the Rasa server using your Kafka backend, add the --endpoints flag, e.g.:

    rasa run -m models --endpoints endpoints.yml

Configuration Parameters

Partition Key

Rasa's Kafka producer can optionally be configured to partition messages by conversation ID. This can be configured by setting partition_by_sender in the endpoints.yml file to True. By default, this parameter is set to False and the producer will randomly assign a partition to each message.

Authentication and Authorization

Rasa's Kafka producer accepts the following types of security protocols: SASL_PLAINTEXT, SSL, PLAINTEXT and SASL_SSL.

For development environments, or if the brokers servers and clients are located into the same machine, you can use simple authentication with SASL_PLAINTEXT or PLAINTEXT. By using this protocol, the credentials and messages exchanged between the clients and servers will be sent in plaintext. Thus, this is not the most secure approach, but since it's simple to configure, it is useful for simple cluster configurations. SASL_PLAINTEXT protocol requires the setup of the username and password previously configured in the broker server.

If the clients or the brokers in the kafka cluster are located in different machines, it's important to use the SSL or SASL_SSL protocol to ensure encryption of data and client authentication. After generating valid certificates for the brokers and the clients, the path to the certificate and key generated for the producer must be provided as arguments, as well as the CA's root certificate.

When using the SASL_PLAINTEXT and SASL_SSL protocols, the sasl_mechanism can be optionally configured and is set to PLAIN by default. Valid values for sasl_mechanism are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, and SCRAM-SHA-512.

If GSSAPI is used for the sasl_mechanism, you will need to additionally install python-gssapi and the necessary C library Kerberos dependencies.

If the ssl_check_hostname parameter is enabled, the clients will verify if the broker's hostname matches the certificate. It's used on client's connections and inter-broker connections to prevent man-in-the-middle attacks.

Sending Events to Multiple Queues

Kafka does not allow you to configure multiple topics.

However, multiple consumers can read from the same queue as long as they are in different consumer groups. Each consumer group will process all events independent of each other (in a sense, each group has their own reference to the last event they have processed). Kafka: The Definitive Guide

Pika Event Broker for RabbitMQ

Rasa uses Pika , the Python client library for RabbitMQ.

Adding a Pika Event Broker Using the Endpoint Configuration

To set up Rasa with Pika for RabbitMQ the following steps are required:

  1. Add required configuration to your endpoints.yml

    event_broker:
    type: pika
    url: localhost
    username: username
    password: password
    queues:
    - queue-1
    # you may supply more than one queue to publish to
    # - queue-2
    # - queue-3
    exchange_name: exchange
  2. To start the Rasa server using your Redis backend, add the --endpoints flag, e.g.:

    rasa run -m models --endpoints endpoints.yml

Configuration Parameters

A comprehensive list of all arguments that can be customized in the endpoints.yml file can be found in the reference documentation. Rasa will automatically start streaming events when you restart the Rasa server.

Adding SSL options to the Pika Event Broker

You can create RabbitMQ SSL options by setting the following required environment variables:

  • RABBITMQ_SSL_CLIENT_CERTIFICATE: path to the SSL client certificate
  • RABBITMQ_SSL_CLIENT_KEY: path to the SSL client key

Please note that specifying 'RABBITMQ_SSL_CA_FILE' via environment variables is no longer supported, as well as specifying RABBITMQ_SSL_KEY_PASSWORD environment variable - please use a key file that is not encrypted instead.

Adding a Pika Event Broker in Python

Here is how you add it using Python code:

import asyncio
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.tracker_store import InMemoryTrackerStore
pika_broker = PikaEventBroker('localhost',
'username',
'password',
queues=['rasa_events'],
event_loop=event_loop
)
asyncio.run(pika_broker.connect())
tracker_store = InMemoryTrackerStore(domain=domain, event_broker=pika_broker)

Implementing a Pika Event Consumer

You need to have a RabbitMQ server running, as well as another application that consumes the events. This consumer to needs to implement Pika's start_consuming() method with a callback action. Here's a simple example:

import json
import pika
def _callback(ch, method, properties, body):
# Do something useful with your incoming message body here, e.g.
# saving it to a database
print("Received event {}".format(json.loads(body)))
if __name__ == "__main__":
# RabbitMQ credentials with username and password
credentials = pika.PlainCredentials("username", "password")
# Pika connection to the RabbitMQ host - typically 'rabbit' in a
# docker environment, or 'localhost' in a local environment
connection = pika.BlockingConnection(
pika.ConnectionParameters("rabbit", credentials=credentials)
)
# start consumption of channel
channel = connection.channel()
channel.basic_consume(queue="rasa_events", on_message_callback=_callback, auto_ack=True)
channel.start_consuming()

Sending Events to Multiple Queues

You can specify multiple event queues to publish events to. This should work for all event brokers supported by Pika (e.g. RabbitMQ)

SQL Event Broker

It is possible to use an SQL database as an event broker. Connections to databases are established using SQLAlchemy, a Python library which can interact with many different types of SQL databases, such as SQLite, PostgreSQL and more. The default Rasa installation allows connections to SQLite and PostgreSQL databases. To see other options, please see the SQLAlchemy documentation on SQL dialects.

To set up Rasa with SQL event broker the following steps are required:

  1. Add required configuration to your endpoints.yml

    When using SQLite:

    endpoints.yml
    event_broker:
    type: SQL
    dialect: sqlite
    db: events.db

    When using PostgreSQL:

    endpoints.yml
    event_broker:
    type: SQL
    url: 127.0.0.1
    port: 5432
    dialect: postgresql
    username: myuser
    password: mypassword
    db: mydatabase
  2. To start the Rasa server using your SQL backend, add the --endpoints flag, e.g.:

    rasa run -m models --endpoints endpoints.yml

FileEventBroker

It is possible to use the FileEventBroker as an event broker. This implementation will log events to a file in json format.

You can provide a path key in the endpoints.yml file if you wish to override the default file name: rasa_event.log.

Custom Event Broker

If you need an event broker which is not available out of the box, you can implement your own. This is done by extending the base class EventBroker.

Your custom event broker class must also implement the following base class methods:

To set up Rasa with your custom event broker the following steps are required:

  1. Add required configuration to your endpoints.yml

    endpoints.yml
    event_broker:
    type: path.to.your.module.Class
    url: localhost
    a_parameter: a value
    another_parameter: another value
  2. To start the Rasa server using your custom backend, add the --endpoints flag, e.g.:

    rasa run -m models --endpoints endpoints.yml