Rasa custom connector keeps giving the custom response

Hello Everyone,

I have been trying to use the Rasa custom connectors mentioned here.

I am using the name method to connect.

My domain.yml file looks like this:

  utter_greet:
  - text: default greetings
    channel: "testchannel"
  - text: custom greetings

My credentials.yml contains the connectors as :

connectors.custom_channel.TestChannel:

Here custom_channel.py is inside the connectors directory, and the connectors directory is inside the project root directory. Directory structure:

- data
  -- nlu.yml
  -- stories.yml
  -- rules.yml
- actions
  -- __init__.py
  -- actions.py
- models
- connectors
  -- custom_channel.py
- domain.yml
- config.yml
- credentials.yml
- endpoints.yml

My custom_channel.py contains following code :

from rasa.core.channels.channel import InputChannel
from typing import Any, Text, Dict, List
class TestChannel(InputChannel):
    # NOTE(review): name() is declared without a `self`/`cls` parameter and
    # without @classmethod. The later traceback in this thread shows Rasa
    # calling `self.name()`, which fails with
    # "TypeError: name() takes 0 positional arguments but 1 was given".
    def name() -> Text:
        """Name of your custom channel."""
        return "testchannel"

But when I run the following command:

rasa run -m models --enable-api --cors "*" --credentials credentials.yml --debug

I am getting this error:

Traceback (most recent call last):
  File "c:\users\athena\anaconda3\envs\installingrasa\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\users\athena\anaconda3\envs\installingrasa\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\Athena\anaconda3\envs\installingrasa\Scripts\rasa.exe\__main__.py", line 7, in <module>
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\__main__.py", line 116, in main
    cmdline_arguments.func(cmdline_arguments)
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\cli\run.py", line 90, in run
    rasa.run(**vars(args))
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\run.py", line 57, in run
    **kwargs,
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 174, in serve_application
    input_channels = create_http_input_channels(channel, credentials)
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 53, in create_http_input_channels
    return [_create_single_channel(c, k) for c, k in all_credentials.items()]
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 53, in <listcomp>
    return [_create_single_channel(c, k) for c, k in all_credentials.items()]
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 65, in _create_single_channel
    channel
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\shared\utils\common.py", line 37, in class_from_module_path
    m = importlib.import_module(module_name)
  File "c:\users\athena\anaconda3\envs\installingrasa\lib\importlib\__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "C:\Users\Athena\Datascience\rasa\Venkat\moodbot\connector\MyIO.py", line 3, in <module>
    class TestChannel(InputChannel):
  File "C:\Users\Athena\Datascience\rasa\Venkat\moodbot\connector\MyIO.py", line 4, in TestChannel
    def name() -> Text:
NameError: name 'Text' is not defined

I also tried using the blueprint method

I changed the python code as follows in custom_channel.py :

import logging
import uuid
import inspect
import rasa
from sanic import Blueprint, response
from sanic.request import Request
from socketio import AsyncServer
from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from asyncio import Queue, CancelledError
from rasa.core.channels import InputChannel
from rasa.core.channels.channel import UserMessage, OutputChannel, CollectingOutputChannel

logger = logging.getLogger(__name__)

class TestChannel(InputChannel):
    """A custom http input channel.

    This implementation is the basis for a custom implementation of a chat
    frontend. You can customize this to send messages to Rasa Core and
    retrieve responses from the agent.

    The blueprint exposes a GET ``/`` health route and a POST ``/webhook``
    route; the webhook reads ``sender`` and ``message`` from the JSON body.
    """

    @classmethod
    def name(cls):
        """Return the channel name, ``"testchannel"``."""
        # Debug output left in by the author.
        print("hi")
        return "testchannel"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        queue: Queue,
        sender_id: Text,
    ) -> None:
        """Pass one message to ``on_new_message``, then put ``"DONE"`` on ``queue``."""

        print("Inside on_message_wrapper function")
        # NOTE(review): QueueOutputChannel is not among the imports shown for
        # this snippet, so this line would raise NameError when reached.
        collector = QueueOutputChannel(queue)

        # NOTE(review): RestInput is not imported in this snippet either; the
        # non-streaming path in blueprint() uses self.name() instead.
        message = UserMessage(
            text, collector, sender_id, input_channel=RestInput.name()
        )
        await on_new_message(message)

        # Sentinel that tells the consumer in stream_response() to stop reading.
        await queue.put("DONE")  # pytype: disable=bad-return-type

    async def _extract_sender(self, req) -> Optional[Text]:
        """Return the ``sender`` field of the JSON body, or None if absent."""
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req):
        """Return the ``message`` field of the JSON body, or None if absent."""
        return req.json.get("message", None)

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
    ) -> Callable[[Any], Awaitable[None]]:
        """Build the coroutine function that writes queued results to a response.

        The returned ``stream`` coroutine starts on_message_wrapper() as a
        task and writes every queued item as one JSON line until it reads the
        ``"DONE"`` sentinel.
        """
        async def stream(resp: Any) -> None:
            q = Queue()
            # NOTE(review): only `Queue` and `CancelledError` are imported from
            # asyncio in this snippet; the bare names `asyncio` (here) and
            # `json` (below) are not imported and would raise NameError.
            task = asyncio.ensure_future(
                self.on_message_wrapper(on_new_message, text, q, sender_id)
            )
            while True:
                result = await q.get()  # pytype: disable=bad-return-type
                if result == "DONE":
                    break
                else:
                    await resp.write(json.dumps(result) + "\n")
            await task

        return stream  # pytype: disable=bad-return-type

    def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[None]]):
        """Create the Sanic blueprint with the health and webhook routes."""
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request):
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request):
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            # The "stream" request argument selects the streaming path.
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )

            if should_use_stream:
                return response.stream(
                    self.stream_response(on_new_message, text, sender_id),
                     content_type="text/event-stream",
                    
                )
            else:
                # NOTE(review): a plain CollectingOutputChannel is used here.
                # Presumably its name() is not "testchannel", which would
                # explain why the `channel: "testchannel"` response variation
                # is never selected; the solution later in this thread uses an
                # output channel subclass whose name() matches. Confirm.
                collector = CollectingOutputChannel()
                print("collector MSG::",collector.messages)
                # noinspection PyBroadException
                try:
                    await on_new_message(
                        UserMessage(
                            text, collector, sender_id, input_channel=self.name()
                        )
                    )
                except CancelledError:
                    logger.error(
                        "Message handling timed out for "
                        "user message '{}'.".format(text)
                    )
                except Exception:
                    logger.exception(
                        "An exception occured while handling "
                        "user message '{}'.".format(text)
                    )
                # Whatever was collected is returned even after an error above.
                return response.json(collector.messages)
                

        return custom_webhook

and didn’t change anything in the credentials and domain files.

I then run the command

rasa run -m models --enable-api --cors "*" --credentials credentials.yml --debug

The server was up and running, and I did get the custom response in Postman. The endpoint I used:

HTTP://localhost:5005/webhooks/testchannel/webhook

It starts giving me custom response :

custom greetings

But when I tried the default REST endpoint (not the custom connector), I again got the custom response, not the default response:

custom greetings

I should have been given this response instead:

default greetings

i even used the socketio to connect using @JiteshGaikwad widget

I still got custom response.

Please help Thanks.

Hello @erohmensing can anyone help me out here. please.

Looking at this error makes me think that your issue is related to an import missing. Are you 100% sure it is imported in MyIO.py? You’re looking for this line:

from typing import Text

Hello @koaning, I did try this as well, and I got this error:

2021-03-25 02:56:36 DEBUG    urllib3.connectionpool  - Starting new HTTPS connection (1): o251570.ingest.sentry.io:443
Traceback (most recent call last):
  File "c:\users\athena\anaconda3\envs\installingrasa\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\users\athena\anaconda3\envs\installingrasa\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\Athena\anaconda3\envs\installingrasa\Scripts\rasa.exe\__main__.py", line 7, in <module>
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\__main__.py", line 116, in main
    cmdline_arguments.func(cmdline_arguments)
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\cli\run.py", line 90, in run
    rasa.run(**vars(args))
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\run.py", line 57, in run
    **kwargs,
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 187, in serve_application
    conversation_id=conversation_id,
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\run.py", line 116, in configure_app
    channels.channel.register(input_channels, app, route=route)
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\channels\channel.py", line 89, in register
    p = urljoin(route, channel.url_prefix())
  File "C:\Users\Athena\AppData\Roaming\Python\Python37\site-packages\rasa\core\channels\channel.py", line 108, in url_prefix
    return self.name()
TypeError: name() takes 0 positional arguments but 1 was given
2021-03-25 02:56:37 DEBUG    urllib3.connectionpool  - https://o251570.ingest.sentry.io:443 "POST /api/2801673/store/ HTTP/1.1" 200 41

Hello Everyone,

I figured out a solution myself by looking at the code mentioned at rasa rest.

Here is the code for a fully working custom connector; it works with the default REST input as well as with custom connections.

Custom Connector code :

import asyncio
import inspect
import json
import logging
from asyncio import Queue, CancelledError
from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn

from sanic import Sanic, Blueprint, response
from sanic.request import Request
from sanic.response import HTTPResponse

import rasa.utils.endpoints
from rasa.core.channels.channel import (
    InputChannel,
    CollectingOutputChannel,
    UserMessage,
)
from rasa.core.channels.rest import QueueOutputChannel

logger = logging.getLogger(__name__)


class MyioInput(InputChannel):
    """Custom HTTP input channel registered under the name ``myio``.

    The blueprint exposes a ``GET /`` health route and a ``POST /webhook``
    route. The webhook reads ``sender``, ``message`` and an optional
    ``input_channel`` from the JSON body and answers either with the JSON list
    of collected bot messages or, when the ``stream`` request argument is
    set, with a streamed response.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name.

        Declared as a classmethod so that it can be called on the class and on
        an instance; a plain ``def name()`` fails with ``TypeError: name()
        takes 0 positional arguments but 1 was given`` when Rasa calls
        ``self.name()``.
        """
        return "myio"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[Any]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> None:
        """Pass one user message to ``on_new_message`` and signal completion.

        Bot responses are sent through a ``QueueOutputChannel`` built on
        ``queue``; once handling has finished the sentinel ``"DONE"`` is
        queued so the consumer in ``stream_response`` knows when to stop.
        """
        # NOTE(review): QueueOutputChannel presumably reports its own channel
        # name rather than "myio", so channel-specific responses may not be
        # selected on the streaming path -- confirm.
        collector = QueueOutputChannel(queue)

        message = UserMessage(
            text, collector, sender_id, input_channel=input_channel, metadata=metadata
        )
        await on_new_message(message)

        await queue.put("DONE")

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        """Return the ``sender`` field of the JSON body, or None if absent."""
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        """Return the ``message`` field of the JSON body, or None if absent."""
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        """Return ``input_channel`` from the JSON body, else this channel's name."""
        return req.json.get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> Callable[[Any], Awaitable[None]]:
        """Build the coroutine function used as the body of a streamed response.

        The returned ``stream`` coroutine starts ``on_message_wrapper`` as a
        task and writes every queued item as one JSON line until it reads the
        ``"DONE"`` sentinel, then awaits the task so its errors surface.
        """

        async def stream(resp: Any) -> None:
            q = Queue()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel, metadata
                )
            )
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                await resp.write(json.dumps(result) + "\n")
            await task

        return stream

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:
        """Create the Sanic blueprint holding the health and webhook routes."""
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request) -> HTTPResponse:
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)
            # get_metadata is not defined in this class; presumably it is
            # inherited from InputChannel.
            metadata = self.get_metadata(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel, metadata
                    ),
                    content_type="text/event-stream",
                )

            # MyioOutput.name() returns "myio", matching `channel: "myio"` in
            # the domain so that channel-specific responses are selected.
            collector = MyioOutput()
            # noinspection PyBroadException
            try:
                await on_new_message(
                    UserMessage(
                        text,
                        collector,
                        sender_id,
                        input_channel=input_channel,
                        metadata=metadata,
                    )
                )
            except CancelledError:
                logger.error(
                    f"Message handling timed out for user message '{text}'."
                )
            except Exception:
                # Best effort: log the failure and still return whatever
                # responses were collected before it.
                logger.exception(
                    f"An exception occured while handling user message '{text}'."
                )
            return response.json(collector.messages)

        return custom_webhook


class MyioOutput(CollectingOutputChannel):
    """Collecting output channel that identifies itself as ``myio``."""

    @classmethod
    def name(cls) -> Text:
        """Return the identifier of this output channel."""
        channel_name = "myio"
        return channel_name

My code is stored in this directory structure :

- data
  -- nlu.yml
  -- stories.yml
  -- rules.yml
- actions
  -- __init__.py
  -- actions.py
- models
- connectors
  -- custom_channel.py
- domain.yml
- config.yml
- credentials.yml
- endpoints.yml

Hence my credentials.yml contains the connector as:

connectors.custom_channel.MyioInput:

My domain.yml file looks like this:

  utter_greet:
  - text: default greetings
  - text: custom greetings
    channel: "myio"

Lastly, train the model using rasa train and then run the following command to start the rasa server

rasa run -m models --enable-api --cors "*" --credentials credentials.yml --debug

Then hit this endpoint to get the default response:

http://localhost:5005/webhooks/rest/webhook

And to test your custom connector myio custom response, hit this endpoint :

http://localhost:5005/webhooks/myio/webhook

Thanks, Everyone for help.

5 Likes

The solution offered by @athenasaurav seems to work great! Is it possible to update the docs, since the code in the docs throws the same error shown above:

I was sending a message from my custom channel, which is WhatsApp, and I got this exception. How can I handle this exception? 2021-04-06 08:52:35 ERROR addons.custom_channel - An exception occured while handling user message ‘{‘text’: ‘Hi’, ‘media’: {‘mediaUri’: ‘’, ‘contentType’: ‘’, ‘title’: ‘’}, ‘custom’: {}}’. Traceback (most recent call last): File “C:\Users\tendaji-adm\Desktop\Chatbot\addons\custom_channel.py”, line 126, in receive metadata=metadata, File “c:\users\tendaji-adm\appdata\local\programs\python\python37\lib\site-packages\rasa\core\channels\channel.py”, line 58, in init self.text = text.strip() if text else text AttributeError: ‘dict’ object has no attribute ‘strip’

Hello @shubhampawar18 can you please repost the error message by formatting it correctly.

Also have you written any custom connector for whatsapp, or you are using default twillio integration?

No I am not using Twilio . Yes write custom connector for WhatsApp.

I got exception for message coming from WhatsApp.

2021-04-06 11:02:51 ERROR addons.custom_channel - An exception occured while handling user message '{'text': 'Hi', 'media': {'mediaUri': '', 'contentType': '', 'title': ''}, 'custom': {}}'.

Traceback (most recent call last):
  File "C:\Users\tendaji-adm\Desktop\Chatbot\addons\custom_channel.py", line 126, in receive
    metadata=metadata,
  File "c:\users\tendaji-adm\appdata\local\programs\python\python37\lib\site-packages\rasa\core\channels\channel.py", line 58, in __init__
    self.text = text.strip() if text else text
AttributeError: 'dict' object has no attribute 'strip'

Please share your code for the custom WhatsApp connector. I don't think it is the code mentioned here.

This is my code for custom channel :-

import asyncio
import inspect
import json
import logging
from asyncio import Queue, CancelledError
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from sanic.response import HTTPResponse
from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn

import rasa.utils.endpoints
from rasa.core.channels.channel import (
    InputChannel,
    CollectingOutputChannel,
    UserMessage,
)
from rasa.core.channels.rest import QueueOutputChannel

logger = logging.getLogger(__name__)


class WhatsappInput(InputChannel):
    """A custom http input channel.

    This implementation is the basis for a custom implementation of a chat
    frontend. You can customize this to send messages to Rasa and
    retrieve responses from the assistant.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"whatsapp"``."""
        return "whatsapp"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[Any]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> None:
        """Pass one user message to ``on_new_message``, then queue ``"DONE"``."""
        collector = QueueOutputChannel(queue)

        message = UserMessage(
            text, collector, sender_id, input_channel=input_channel, metadata=metadata
        )
        await on_new_message(message)

        # Sentinel that tells the consumer in stream_response() to stop reading.
        await queue.put("DONE")

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        """Return the ``sender`` field of the JSON body, or None if absent."""
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        """Return the ``message`` field of the JSON body, or None if absent."""
        # NOTE(review): the error log posted with this code shows "message"
        # arriving as a dict ({'text': 'Hi', 'media': {...}, 'custom': {}}).
        # UserMessage then calls text.strip() on it, which is the reported
        # "AttributeError: 'dict' object has no attribute 'strip'". Returning
        # the dict's "text" entry here would presumably resolve it -- confirm
        # against the actual WhatsApp payload.
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        """Return ``input_channel`` from the JSON body, else this channel's name."""
        return req.json.get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> Callable[[Any], Awaitable[None]]:
        """Build the coroutine function used as the body of a streamed response.

        The returned ``stream`` coroutine writes every queued item as one
        JSON line until it reads the ``"DONE"`` sentinel.
        """

        async def stream(resp: Any) -> None:
            q = Queue()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel, metadata
                )
            )
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                else:
                    await resp.write(json.dumps(result) + "\n")
            await task

        return stream

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:
        """Create the Sanic blueprint holding the health and webhook routes."""
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request) -> HTTPResponse:
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)
            # get_metadata is not defined in this class; presumably it is
            # inherited from InputChannel.
            metadata = self.get_metadata(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel, metadata
                    ),
                    content_type="text/event-stream",
                )
            else:
                # Use the output channel whose name() is "whatsapp" (as the
                # accepted solution in this thread does with its own output
                # class) so that channel-specific responses can be selected.
                collector = WhatsappOutput()
                # noinspection PyBroadException
                try:
                    await on_new_message(
                        UserMessage(
                            text,
                            collector,
                            sender_id,
                            input_channel=input_channel,
                            metadata=metadata,
                        )
                    )
                except CancelledError:
                    logger.error(
                        f"Message handling timed out for " f"user message '{text}'."
                    )
                except Exception:
                    logger.exception(
                        f"An exception occured while handling "
                        f"user message '{text}'."
                    )
                return response.json(collector.messages)

        return custom_webhook


class WhatsappOutput(CollectingOutputChannel):
    """Collecting output channel that identifies itself as ``whatsapp``."""

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"whatsapp"``."""
        return "whatsapp"

This is error while message passing.

You save me. Thank you your solution worked for me.

This is because you did not pass the self keyword to the def name() method; it is supposed to be def name(self). After changing it to this, it worked for me.

1 Like