[Solved] How to implement our own custom connector for the custom UI

Can you show me what have you done specifically in a credentials.yml annd connector file.

credentials.yml:

rest:

rasa:
  url: "http://localhost:5002/api"

channel.RestInput:
  username: ā€œuser_nameā€
  another_parameter: ā€œsome valueā€

channel.py:

from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from asyncio import Queue
from sanic.request import Request
from rasa.core.channels import InputChannel
from rasa.core.agent import Agent
from rasa.core.interpreter import RasaNLUInterpreter
from rasa.core.channels.channel import UserMessage, CollectingOutputChannel, QueueOutputChannel
from rasa.utils.endpoints import EndpointConfig
from rasa import utils
#from flask import Blueprint, request, jsonify
from sanic import Blueprint, response
from rasa.model import get_model, get_model_subdirectories
import inspect
from rasa.core.run import configure_app

import logging

logger = logging.getLogger(__name__)


class RestInput(InputChannel):
    #A custom http input channel.
    #This implementation is the basis for a custom implementation of a chat
    #frontend. You can customize this to send messages to Rasa Core and
    #retrieve responses from the agent.

    @classmethod
    def name(cls):
        return "rest"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
    ) -> None:
        collector = QueueOutputChannel(queue)
        message = UserMessage(text, collector, sender_id, input_channel=input_channel)
        await on_new_message(message)
        await queue.put("DONE")  # pytype: disable=bad-return-type

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        return req.json.get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
    ) -> Callable[[Any], Awaitable[None]]:
        async def stream(resp: Any) -> None:
            q = Queue()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel
                )
            )
            result = None  # declare variable up front to avoid pytype error
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                else:
                    await resp.write(json.dumps(result) + "\n")
            await task

        return stream  # pytype: disable=bad-return-type

    def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[None]]):
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )
        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request):
            return response.json({"status": "ok"})
        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request):
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel
                    ),
                    content_type="text/event-stream",
                )
            else:
                collector = CollectingOutputChannel()
                # noinspection PyBroadException
                try:
                    await on_new_message(
                        UserMessage(
                            text, collector, sender_id, input_channel=input_channel
                        )
                    )
                except CancelledError:
                    logger.error(
                        "Message handling timed out for "
                        "user message '{}'.".format(text)
                    )
                except Exception:
                    logger.exception(
                        "An exception occured while handling "
                        "user message '{}'.".format(text)
                    )
                return response.json(collector.messages)
        return custom_webhook


model_path = get_model("C:/Users/ehumm/Documents/RasaTest3/models")


def run(serve_forever=True):
    _, nlu_model = get_model_subdirectories(model_path)
    interpreter = RasaNLUInterpreter(nlu_model)
    action_endpoint = EndpointConfig(url="http://localhost:5055/webhook")
    agent = Agent.load(model_path, interpreter=interpreter, action_endpoint=action_endpoint)
    input_channel = RestInput()
    if serve_forever:
        configure_app(input_channel)
    return agent


if __name__ == '__main__':
    utils.io.configure_colored_logging(loglevel="INFO")
    run()
1 Like

Hi @Hal,

Can you just try it again by removing below code from your credentials.yml file i.e

rasa:
   url: "http://localhost:5002/api"

let me know if it resolves your issue :slightly_smiling_face:

Unfortunatly it did not help :frowning_face:

Can you try to replace the ā€œrestā€ with something else? I was able to resolve the error ā€œsanic.router.RouteExists: Route already registered: /webhooks/rest/ [GET]ā€ by replacing ā€œrestā€ with ā€œchatroomā€. Hope this helps!

class RestInput(InputChannel):

@classmethod
def name(cls):
    return "chatroom"

Hi Eva, I have similar problem:

ā€œAssertionError: A blueprint with the name ā€œcustom_webhook_RestInputā€ is already registered. Blueprint names must be unique.ā€

Have you found any solutions?

Hi MarcoMc,

I show you how I did it with the help of harisā€™ comments:

file credentials.yml:

rest:

channel.RestInput:

file channel.py:

from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from asyncio import Queue, CancelledError
from sanic.request import Request
from rasa.core.channels import InputChannel
from rasa.core.agent import Agent
from rasa.core.interpreter import RasaNLUInterpreter
from rasa.core.channels.channel import UserMessage, CollectingOutputChannel, QueueOutputChannel
from rasa.utils.endpoints import EndpointConfig
from rasa import utils
from sanic import Blueprint, response
from rasa.model import get_model, get_model_subdirectories
import inspect
from rasa.core.run import configure_app
import rasa
import json
import asyncio

import logging

logger = logging.getLogger(__name__)


class RestInput(InputChannel):
    #A custom http input channel.
    #This implementation is the basis for a custom implementation of a chat
    #frontend. You can customize this to send messages to Rasa Core and
    #retrieve responses from the agent.

    @classmethod
    def name(cls):
        return "myrest"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
    ) -> None:
        collector = QueueOutputChannel(queue)
        message = UserMessage(text, collector, sender_id, input_channel=input_channel)
        await on_new_message(message)
        await queue.put("DONE")  # pytype: disable=bad-return-type

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        return req.json.get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
    ) -> Callable[[Any], Awaitable[None]]:
        async def stream(resp: Any) -> None:
            q = Queue()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel
                )
            )
            result = None  # declare variable up front to avoid pytype error
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                else:
                    await resp.write(json.dumps(result) + "\n")
            await task
        return stream  # pytype: disable=bad-return-type

    def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[None]]):
        custom_webhook = Blueprint(
            "mycustom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request):
            return response.json({"status": "ok"})
        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request):
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel
                    ),
                    content_type="text/event-stream",
                )
            else:
                collector = CollectingOutputChannel()
                # noinspection PyBroadException
                try:
                    await on_new_message(
                        UserMessage(
                            text, collector, sender_id, input_channel=input_channel
                        )
                    )
                except CancelledError:
                    logger.error(
                        "Message handling timed out for "
                        "user message '{}'.".format(text)
                    )
                except Exception:
                    logger.exception(
                        "An exception occured while handling "
                        "user message '{}'.".format(text)
                    )
                return response.json(collector.messages)
        return custom_webhook


model_path = get_model("Insert/path/to/your/Rasa-Project/models")


def run(serve_forever=True):
    _, nlu_model = get_model_subdirectories(model_path)
    interpreter = RasaNLUInterpreter(nlu_model)
    action_endpoint = EndpointConfig(url="http://localhost:5055/webhook")
    agent = Agent.load(model_path, interpreter=interpreter, action_endpoint=action_endpoint)
    input_channel = RestInput()
    if serve_forever:
        configure_app(input_channel)
    return agent


if __name__ == '__main__':
    utils.io.configure_colored_logging(loglevel="INFO")
    run()

Any tips on how to solve this?

Exception: Failed to find input channel class for ā€˜socketio_connector.SocketIOInputā€™. Unknown input channel. Check your credentials configuration to make sure the mentioned channel is not misspelled. If you are creating your own channel, make sure it is a proper name of a class in a module.

I added the path variable, but it doesnā€™t seem to be able to find the file. I am using rasa 1.0.9

In that MyIo file, change the class name to something like RestNew and then in credentials file also change it to MyIo.RestNew: .

In MyIo file, @classmethod def name(cls): print(ā€œhi from name methodā€) return ā€œrestnewā€

and use the same name in the uri as well like http://localhost:5005/webhooks/restnew/webhooks

I feel like that should work.

Hi,

I implemented same steps but I am receiving error

Exception: Failed to find input channel class ā€œcustom_channel.RasaInputā€. Unkown Input channel

My custom connector is written in file custom_channel.py and class name is RasaInput.

and my credentials.yml looks like below:

custom_channel.RasaInput:
     username : "abc"

Have you done this step PYTHONPATH=ā€œpath to your custom connector fileā€.

Just check it out.

I triedā€¦but same error

Hmm okay waitā€¦

Yes, it was the python path variable. Suddenly it started working. I donā€™t reboot often and maybe it was because of this. Or I just added it good last time I tried.

So how do your deploy your custom connector to a production environment?

credentials.yml support something like:
<DIR>.<DIR>.<FILENAME>.<NAME-OF-INPUT-CLASS>

You can can store the channel files wherever you want

Hi all,

I have created custom channel in local environment its working fine(ubuntu, apache).

but in production mode(docker, nginx) my custom channel is not working, getting module not found error

  • my project installation path is /etc/rasa
  • custom connector file is located in: /etc/rasa/MyIo.py
  • I set pythonenv for custom connecter in Dockerfile

MyIo.py (4.9 KB) credentials.yml (55 Bytes) Dockerfile (766 Bytes)

Please, can anyone help me to fix this issue?

Did you find a solution to this? I am having the same issue.

@prashant_kamble can you please help to know how to perform Also set environment variable path for your custom connector file like below

PYTHONPATH=ā€œpath to your custom connector fileā€

Is there ay command?

Hi Prashant,

Firstly I would like to thank you for your effort. I have already worked with a custom API by following your steps, it worked very well for me. Today all of a sudden without any modification I encountered this error :

RasaException: Failed to find input channel class for 'CustomApiRest.RestInput'. Unknown input channel. Check your credentials configuration to make sure the mentioned channel is not misspelled. If you are creating your own channel, make sure it is a proper name of a class in a module.

My rasa is run in a docker container via docker-compose.

I am counting on your help please, because I canā€™t find the solution elsewhere.

Thank you!