notice

This is documentation for Rasa Open Source Documentation v2.0.x, which is no longer actively maintained.
For up-to-date documentation, see the latest version (2.1.x).

Version: 2.0.x

Custom Connectors

You can also implement your own custom channel. You can use the rasa.core.channels.rest.RestInput class as a template. The methods you need to implement are blueprint and name. The method needs to create a sanic blueprint that can be attached to a sanic server.

This allows you to add REST endpoints to the server that the external messaging service can call to deliver messages.

Your blueprint should have at least the two routes: health on /, and receive on the HTTP route /webhook.

The name method defines the url prefix. For example, if your component is named myio, the webhook you can use to attach the external service is http://<host>:<port>/webhooks/myio/webhook, replacing the host and port with the appropriate values from your running Rasa X or Rasa Open Source server.

You can POST messages to your custom connector at the url http://<host>:<port>/webhooks/myio/webhook using the following format:

{
"sender": "test_user", // sender ID of the user sending the message
"message": "Hi there!",
"metadata": {} // optional, any extra info you want to add for processing in NLU or custom actions
}

If you need to use extra information from your front end in your custom actions, you can add this information in the metadata dict of your user message. This information will accompany the user message through the rasa server into the action server when applicable, where you can find it stored in the tracker. Message metadata will not directly affect NLU classification or action prediction. If you want to change the way metadata is extracted for an existing channel, you can overwrite the function get_metadata. The return value of this method will be passed to the UserMessage.

In your implementation of the receive endpoint, you will need to call on_new_message(UserMessage(text, output, sender_id)). This will tell Rasa Core to handle this user message. The output is an output channel implementing the OutputChannel class. You can either implement the methods for your particular chat channel (e.g. there are methods to send text and images) or you can use the CollectingOutputChannel to collect the bot responses Core creates while the bot is processing your messages and return them as part of your endpoint response. This is the way the RestInput channel is implemented. For examples on how to create and use your own output channel, take a look at the implementations of the other output channels, e.g. the SlackBot in rasa.core.channels.slack.

To use a custom channel, you need to supply a credentials configuration file credentials.yml with the command line argument --credentials. This credentials file has to contain the module path of your custom channel and any required configuration parameters. For example, this could look like:

mypackage.MyIO:
username: "user_name"
another_parameter: "some value"

Here is an example implementation for an input channel that receives the messages, hands them over to Rasa Core, collects the bot utterances, and returns these bot utterances as the json response to the webhook call that posted the message to the channel:

import asyncio
import inspect
import json
import logging
from asyncio import Queue, CancelledError
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from sanic.response import HTTPResponse
from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn
import rasa.utils.endpoints
from rasa.core.channels.channel import (
InputChannel,
CollectingOutputChannel,
UserMessage,
)
logger = logging.getLogger(__name__)
class RestInput(InputChannel):
"""A custom http input channel.
This implementation is the basis for a custom implementation of a chat
frontend. You can customize this to send messages to Rasa and
retrieve responses from the assistant."""
@classmethod
def name(cls) -> Text:
return "rest"
@staticmethod
async def on_message_wrapper(
on_new_message: Callable[[UserMessage], Awaitable[Any]],
text: Text,
queue: Queue,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> None:
collector = QueueOutputChannel(queue)
message = UserMessage(
text, collector, sender_id, input_channel=input_channel, metadata=metadata
)
await on_new_message(message)
await queue.put("DONE")
async def _extract_sender(self, req: Request) -> Optional[Text]:
return req.json.get("sender", None)
# noinspection PyMethodMayBeStatic
def _extract_message(self, req: Request) -> Optional[Text]:
return req.json.get("message", None)
def _extract_input_channel(self, req: Request) -> Text:
return req.json.get("input_channel") or self.name()
def stream_response(
self,
on_new_message: Callable[[UserMessage], Awaitable[None]],
text: Text,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> Callable[[Any], Awaitable[None]]:
async def stream(resp: Any) -> None:
q = Queue()
task = asyncio.ensure_future(
self.on_message_wrapper(
on_new_message, text, q, sender_id, input_channel, metadata
)
)
while True:
result = await q.get()
if result == "DONE":
break
else:
await resp.write(json.dumps(result) + "\n")
await task
return stream
def blueprint(
self, on_new_message: Callable[[UserMessage], Awaitable[None]]
) -> Blueprint:
custom_webhook = Blueprint(
"custom_webhook_{}".format(type(self).__name__),
inspect.getmodule(self).__name__,
)
# noinspection PyUnusedLocal
@custom_webhook.route("/", methods=["GET"])
async def health(request: Request) -> HTTPResponse:
return response.json({"status": "ok"})
@custom_webhook.route("/webhook", methods=["POST"])
async def receive(request: Request) -> HTTPResponse:
sender_id = await self._extract_sender(request)
text = self._extract_message(request)
should_use_stream = rasa.utils.endpoints.bool_arg(
request, "stream", default=False
)
input_channel = self._extract_input_channel(request)
metadata = self.get_metadata(request)
if should_use_stream:
return response.stream(
self.stream_response(
on_new_message, text, sender_id, input_channel, metadata
),
content_type="text/event-stream",
)
else:
collector = CollectingOutputChannel()
# noinspection PyBroadException
try:
await on_new_message(
UserMessage(
text,
collector,
sender_id,
input_channel=input_channel,
metadata=metadata,
)
)
except CancelledError:
logger.error(
f"Message handling timed out for " f"user message '{text}'."
)
except Exception:
logger.exception(
f"An exception occured while handling "
f"user message '{text}'."
)
return response.json(collector.messages)
return custom_webhook
class QueueOutputChannel(CollectingOutputChannel):
"""Output channel that collects send messages in a list
(doesn't send them anywhere, just collects them)."""
@classmethod
def name(cls) -> Text:
return "queue"
# noinspection PyMissingConstructor
def __init__(self, message_queue: Optional[Queue] = None) -> None:
super().__init__()
self.messages = Queue() if not message_queue else message_queue
def latest_output(self) -> NoReturn:
raise NotImplementedError("A queue doesn't allow to peek at messages.")
async def _persist_message(self, message) -> None:
await self.messages.put(message)