Hi,
I am trying to get a custom connector up and running, but so far it is not working. This is my custom connector file (custom.py):
import asyncio
import inspect
import json
import logging
from asyncio import Queue, CancelledError
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from sanic.response import HTTPResponse
from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn
import rasa.utils.endpoints
from rasa.core.channels.channel import (
InputChannel,
CollectingOutputChannel,
UserMessage,
)
logger = logging.getLogger(__name__)
class RestInput(InputChannel):
    """A custom HTTP input channel.

    This implementation is the basis for a custom implementation of a chat
    frontend. You can customize this to send messages to Rasa and
    retrieve responses from the assistant.

    The blueprint built by :meth:`blueprint` exposes two routes:
    ``GET /`` (health check) and ``POST /webhook`` (message intake).

    NOTE(review): Rasa presumably mounts this blueprint under
    ``/webhooks/<name()>/`` and only when the channel class is listed in the
    credentials file by its module path -- confirm against the Rasa
    custom-connector documentation.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"custom"``."""
        return "custom"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[Any]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> None:
        """Handle one user message and signal completion on ``queue``.

        Bot responses are pushed onto ``queue`` through a
        ``QueueOutputChannel``; the string ``"DONE"`` is pushed last as an
        end-of-stream sentinel.

        Raises:
            Whatever ``on_new_message`` raises; the sentinel is still queued.
        """
        collector = QueueOutputChannel(queue)
        message = UserMessage(
            text, collector, sender_id, input_channel=input_channel, metadata=metadata
        )
        try:
            await on_new_message(message)
        finally:
            # Always emit the sentinel: without it a failing handler would
            # leave the streaming consumer blocked on ``queue.get()`` forever.
            await queue.put("DONE")

    @staticmethod
    def _json_payload(req: Request) -> Dict[Text, Any]:
        """Return the request's JSON object, or ``{}`` if it has none.

        Guards the ``_extract_*`` helpers against a missing or non-object
        JSON body, which would otherwise fail on ``.get``.
        """
        payload = req.json
        return payload if isinstance(payload, dict) else {}

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        """Return the ``"sender"`` field of the JSON body, if present."""
        return self._json_payload(req).get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        """Return the ``"message"`` field of the JSON body, if present."""
        return self._json_payload(req).get("message", None)

    def _extract_metadata(self, req: Request) -> Optional[Dict[Text, Any]]:
        """Return the ``"metadata"`` field of the JSON body, if present."""
        return self._json_payload(req).get("metadata", None)

    def _extract_input_channel(self, req: Request) -> Text:
        """Return the ``"input_channel"`` field, falling back to ``name()``."""
        return self._json_payload(req).get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> Callable[[Any], Awaitable[None]]:
        """Build the coroutine function that streams bot responses.

        The returned ``stream(resp)`` writes each queued response to ``resp``
        as one JSON document per line until the ``"DONE"`` sentinel arrives.
        """

        async def stream(resp: Any) -> None:
            q = Queue()
            # Run the message handler concurrently so responses can be
            # written out as soon as they are queued.
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel, metadata
                )
            )
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                await resp.write(json.dumps(result) + "\n")
            # Surface any exception raised inside the handler task.
            await task

        return stream

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:
        """Create the Sanic blueprint with the health and webhook routes.

        Args:
            on_new_message: Coroutine function invoked with each incoming
                ``UserMessage``.

        Returns:
            The blueprint holding ``GET /`` and ``POST /webhook``.
        """
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request) -> HTTPResponse:
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)
            metadata = self._extract_metadata(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel, metadata
                    ),
                    content_type="text/event-stream",
                )

            collector = CollectingOutputChannel()
            # Best effort: failures are logged and whatever was collected
            # so far is still returned to the caller.
            # noinspection PyBroadException
            try:
                await on_new_message(
                    UserMessage(
                        text,
                        collector,
                        sender_id,
                        input_channel=input_channel,
                        metadata=metadata,
                    )
                )
            except CancelledError:
                logger.error(
                    f"Message handling timed out for user message '{text}'."
                )
            except Exception:
                logger.exception(
                    f"An exception occured while handling user message '{text}'."
                )
            return response.json(collector.messages)

        return custom_webhook
class QueueOutputChannel(CollectingOutputChannel):
    """Output channel that collects sent messages in an asyncio queue.

    Messages are not sent anywhere; each one is put on ``self.messages``.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"queue"``."""
        return "queue"

    def __init__(self, message_queue: Optional[Queue] = None) -> None:
        """Initialise the channel.

        Args:
            message_queue: Queue to collect messages in. A new ``Queue`` is
                created when this is ``None``.
        """
        super().__init__()
        # Compare against None explicitly so a caller-supplied queue is never
        # replaced just because it happens to evaluate as falsy.
        self.messages = Queue() if message_queue is None else message_queue

    def latest_output(self) -> NoReturn:
        """Always raise: this channel does not support peeking.

        Raises:
            NotImplementedError: On every call.
        """
        raise NotImplementedError("A queue doesn't allow to peek at messages.")

    async def _persist_message(self, message) -> None:
        """Put ``message`` on the queue."""
        await self.messages.put(message)
This is my Dockerfile that extends Rasa:
# Extend the official Rasa image
FROM rasa/rasa:2.0.2-full
# Change back to root user to install dependencies
USER root
# Install NLTK (with the VADER lexicon) and spaCy (with the Dutch large model).
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir nltk && \
    python -m nltk.downloader vader_lexicon -d '/nltk_data' && \
    pip install --no-cache-dir --upgrade spacy && \
    python -m spacy download nl_core_news_lg
# Copy components folder to working directory
COPY ./components /app/components
# Also place the connector module inside the installed rasa package.
# NOTE(review): this path hard-codes the python3.7 venv layout of the base
# image. Since /app is appended to PYTHONPATH below, the copy under
# /app/components may already be importable on its own -- TODO confirm.
COPY ./components/custom.py /opt/venv/lib/python3.7/site-packages/rasa/core/channels/
# By best practices, don't run the code with root user
USER 1001
# Use the key=value form of ENV; the whitespace-separated form is legacy syntax.
ENV PYTHONPATH="${PYTHONPATH}:/app"
These are my values for additionalChannelCredentials:
I get a 404 error when I send a POST request to the following URL:
http://<IP>:<PORT>/webhooks/custom/webhook with json={"sender": sender, "message": message, "metadata": metadata}
Can somebody explain to me why it is not working?