How to create a custom connector in rasa for Viber connectivity

• This is my current custom connector file. The file is named viber.py and I am using Rasa 2.8. I created this custom connector file in the same directory as the credentials file. I am new to Rasa and this is my first time making a custom connector.

from http.client import HTTPResponse
from viberbot import Api
from viberbot.api.bot_configuration import BotConfiguration
from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from asyncio import Queue
from sanic.request import Request
from rasa.core.channels import InputChannel
from rasa.core.agent import Agent
from rasa.core.channels.channel import UserMessage, CollectingOutputChannel, QueueOutputChannel
from rasa.utils.endpoints import EndpointConfig
from rasa import utils
from flask import Blueprint, request, jsonify
from sanic import Blueprint, response
from rasa.model import get_model, get_model_subdirectories
import inspect
from rasa.core.run import configure_app

# Module-level Viber client: built and used as soon as this module is imported.
# NOTE(review): the auth token is hard-coded here; it should presumably come
# from configuration rather than source control — confirm and rotate if leaked.
bot_configuration = BotConfiguration(
    name='Rasa_demo_one',
    avatar='',
    auth_token='4f831f01ef34ad38-9d057b2fd4ba804-8dd0cf1cdf5e39dc'
)
viber = Api(bot_configuration)
# Registers the webhook URL with Viber at import time. The host part below is
# still a placeholder and must be replaced with the real public host.
# NOTE(review): if this call raises, the module cannot be imported at all —
# confirm whether webhook registration belongs at import time.
viber.set_webhook('<host doamin>/webhooks/viber/webhook')

class ViberInput(InputChannel):
    """Rasa input channel that receives Viber webhook callbacks.

    The channel owns its own ``viberbot`` ``Api`` client, built from the
    values passed to ``__init__`` (normally supplied through
    ``from_credentials``). Incoming text messages are handed to Rasa via
    ``on_new_message`` and the bot's text replies are sent back to the
    Viber user.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name ("viber")."""
        return "viber"

    @classmethod
    def from_credentials(cls, credentials: Optional[Dict[Text, Any]]) -> InputChannel:
        """Build the channel from its credentials mapping.

        Reads the ``viber_name``, ``avatar`` and ``auth_token`` keys; calls
        ``raise_missing_credentials_exception`` when no credentials are given.
        """
        if not credentials:
            cls.raise_missing_credentials_exception()

        return cls(
            credentials.get("viber_name"),
            credentials.get("avatar"),
            credentials.get("auth_token"),
        )

    def __init__(self, viber_name: Text, avatar: Text, auth_token: Text) -> None:
        """Store the bot settings and create the Viber API client."""
        self.viber_name = viber_name
        self.avatar = avatar
        self.auth_token = auth_token
        self.viber = Api(
            BotConfiguration(
                name=self.viber_name,
                avatar=self.avatar,
                auth_token=self.auth_token,
            )
        )

    def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[Any]]) -> Blueprint:
        """Create the Sanic blueprint that handles Viber callbacks.

        Two POST routes are registered: ``/`` (kept for backward
        compatibility) and ``/webhook``. Rasa's custom-connector
        documentation describes the blueprint as being served under
        ``/webhooks/<name()>``, so ``/webhook`` corresponds to a callback
        URL ending in ``/webhooks/viber/webhook``.

        Args:
            on_new_message: Coroutine supplied by Rasa; awaiting it with a
                ``UserMessage`` runs the message through the assistant.

        Returns:
            The configured blueprint.
        """
        # Imported locally so the block is self-contained; both names come
        # from the viberbot package this module already depends on.
        from viberbot.api.messages import TextMessage
        from viberbot.api.viber_requests import ViberMessageRequest

        viber_webhook = Blueprint("viber_webhook", __name__)

        async def process_callback(request: Request) -> response.HTTPResponse:
            """Verify, parse and dispatch a single Viber callback."""
            # Sanic exposes the raw request bytes as ``request.body``
            # (``get_data()`` is the Flask API and does not exist here).
            payload = request.body

            # Reject callbacks whose signature does not match the auth token.
            signature = request.headers.get("X-Viber-Content-Signature")
            if not self.viber.verify_signature(payload, signature):
                return response.text("invalid signature", status=403)

            viber_request = self.viber.parse_request(payload)

            # Only message callbacks carry user input; other event types
            # are simply acknowledged.
            if isinstance(viber_request, ViberMessageRequest):
                sender_id = viber_request.sender.id
                # Non-text messages (pictures, stickers, ...) may have no text.
                text = getattr(viber_request.message, "text", None)
                if text:
                    collector = CollectingOutputChannel()
                    await on_new_message(
                        UserMessage(
                            text,
                            collector,
                            sender_id,
                            input_channel=self.name(),
                        )
                    )
                    replies = [
                        TextMessage(text=bot_message["text"])
                        for bot_message in collector.messages
                        if bot_message.get("text")
                    ]
                    if replies:
                        self.viber.send_messages(sender_id, replies)

            return response.text("success")

        @viber_webhook.route("/", methods=["POST"])
        async def incoming(request: Request) -> response.HTTPResponse:
            return await process_callback(request)

        @viber_webhook.route("/webhook", methods=["POST"])
        async def incoming_webhook(request: Request) -> response.HTTPResponse:
            return await process_callback(request)

        return viber_webhook

• These are the credentials I put in the credentials.yml file:

# Custom channel entry, written as <module>.<ClassName>.
# NOTE(review): the module part presumably has to match the connector file
# name (viber.py) exactly, including letter case — confirm.
# The three keys below are the ones ViberInput.from_credentials reads.
viber.ViberInput:
 viber_name: "Rasa_demo_one"
 avatar: ""
 auth_token: "4f831f01ef34ad38-9d057b2fd4ba804-8dd0cf1cdf5e39dc"

• When I try to run Rasa with this configuration, I get an error that says:

RasaException: Failed to find input channel class for 'Viber.ViberInput'. Unknown input channel. Check your credentials configuration to make sure the mentioned channel is not misspelled. If you are creating your own channel, make sure it is a proper name of a class in a module.
1 Like

Hi @lahiruDilshan were you able to solve this? I’m getting the same issue