Link up Custom input Channel with a custom Output Channel

I have created a custom input channel called “desktop”. Its webhook URL is ‘localhost:50055/webhooks/desktop/webhook’, and the URL works fine.

My issue is that, with the following domain file specification, I do not get the output ‘Hi desktop user’ every time. The output is picked at random from the two text messages.

utter_greet:
  - text: 'Hey there'
  - text: 'Hi desktop user'
    channel: "desktop"

Does Rasa look for the input channel or the output channel in the domain file? If it looks for the output channel, how do I create a custom output channel? For example, can I just create a class “desktop” that inherits from the OutputChannel class and run my code?

Please provide suggestions.

Hi @suryavamsi62, you cannot specify output channels in the domain. You can set the output channel in your UserMessage though: Custom Connectors.init

You would have to adapt your custom input channel and set the output_channel accordingly when you create your UserMessage() (see for instance this example).
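To make the role of the `channel` key more concrete: it does not pick an output channel, it only filters response variations by the name of the output channel that is already attached to the message. The sketch below is a simplified stand-in for that lookup, not Rasa's actual code, and the exact fallback rules may differ between Rasa versions. `RESPONSES`, `pick_variations` and `respond` are hypothetical names used only for illustration.

```python
import random
from typing import Dict, List, Optional

# Hypothetical, simplified copy of the responses section of a domain file.
RESPONSES: Dict[str, List[dict]] = {
    "utter_greet": [
        {"text": "Hey there"},
        {"text": "Hi desktop user", "channel": "desktop"},
    ]
}


def pick_variations(action: str, output_channel_name: str) -> List[dict]:
    """Return channel-specific variations if any match, else the generic ones.

    Assumption: variations tagged for another channel are never used.
    """
    variations = RESPONSES.get(action, [])
    specific = [v for v in variations if v.get("channel") == output_channel_name]
    generic = [v for v in variations if not v.get("channel")]
    return specific or generic


def respond(action: str, output_channel_name: str) -> Optional[str]:
    """Pick one variation at random from the candidates for this channel."""
    candidates = pick_variations(action, output_channel_name)
    return random.choice(candidates)["text"] if candidates else None
```

Under these assumptions, the name reported by the output channel on the UserMessage decides which variation is eligible, which is why the input channel has to attach an output channel whose name() returns "desktop".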

Hi @ricwo, as per your info, I have changed my input channel as follows.

class DesktopInput(InputChannel):
    
    @classmethod
    def name(cls):
       # print('Inside the desktop Channel')
        return "desktop"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
        metadata: Dict,
    ) -> None:
        collector = QueueOutputChannel(queue)
        output_channel = DesktopOutputChannel()
       # print('Input meta data is: ', metadata)
        message = UserMessage(
            text,
            sender_id=sender_id,
            input_channel=input_channel,
            output_channel=collector,
            metadata=metadata,
        )
        await on_new_message(message)

        await queue.put("DONE")  # pytype: disable=bad-return-type

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        return req.json.get("input_channel") or self.name()

    def _extract_metadata(self, req: Request) -> Optional[Dict]:
        # metadata is an optional JSON object, so this may return None
        return req.json.get("metadata", None)


    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
        metadata: Dict,
    ) -> Callable[[Any], Awaitable[None]]:
        async def stream(resp: Any) -> None:
            q = Queue()
            output = DesktopOutputChannel()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel, metadata 
                )
            )
            result = None  # declare variable up front to avoid pytype error
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                else:
                    await resp.write(json.dumps(result) + "\n")
            await task

        return stream  # pytype: disable=bad-return-type

    def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[None]]):
        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request):
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request):
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            metadata = self._extract_metadata(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)
            if should_use_stream:
               # print("Inside the should stream of Input Channel")
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel, metadata
                    ),
                    content_type="text/event-stream",
                )
            else:
               # print("Inside the else of stream of Input Channel")
                collector = DesktopOutputChannel()
                # noinspection PyBroadException
                try:
                    await on_new_message(
                        UserMessage(
                            text,
                            sender_id=sender_id,
                            input_channel=input_channel,
                            output_channel=collector,
                            metadata=metadata,
                        )
                    )
                except CancelledError:
                    logger.error(
                        "Message handling timed out for "
                        "user message '{}'.".format(text)
                    )
                except Exception:
                    logger.exception(
                        "An exception occurred while handling "
                        "user message '{}'.".format(text)
                    )
               # print('Final Receive Endpoint result is: ', collector.messages)
                return response.json(collector.messages)

        return custom_webhook

    def get_output_channel(self) -> OutputChannel:
       # print('Getting the Output channel ')
        return DesktopOutputChannel() 

In the above code, in the on_message_wrapper function, should I pass collector or output_channel as the output_channel argument of UserMessage? I also could not work out the same thing for stream_response, where I pass the queue q instead of the output variable.

My desktop output Channel is:

class DesktopOutputChannel(CollectingOutputChannel):
    """Output channel for the desktop connector.

    Collects bot messages like CollectingOutputChannel, but reports
    "desktop" as its channel name."""

    @classmethod
    def name(cls):
        """Every output channel needs a name to identify it."""
       # print('Inside the desktop Output Channel')
        return "desktop"

After making the above changes, when I specify the channel as desktop in the domain, I get the desktop-specific response.

utter_greet:
  - text: 'Hey there, welcome'
  - text: 'Hey there desktop user, welcome'
    channel: "desktop"

Am I making any mistakes in the channel code above? I just want to check that the code is correct and efficient.

Hi @suryavamsi62, would you mind clarifying what you’re trying to achieve:

  1. Determine the output channel on a per-message basis
  2. Given an output channel that you’ve set for all incoming messages, make different utterances depending on what that output channel is (already supported)

I don’t follow the code snippet you posted, as you still assign the QueueOutputChannel().
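For the streaming path, one way to keep the "desktop" name while still pushing messages onto a queue is a queue-backed channel that overrides name(). The sketch below uses small stand-in classes rather than Rasa's own, so `CollectingChannel`, `DesktopQueueChannel` and `demo` are hypothetical names, and the `_persist_message` hook is an assumption about how a collecting channel stores its messages.

```python
import asyncio
from typing import Any, Dict, List


class CollectingChannel:
    """Stand-in for a collecting output channel: stores messages in a list."""

    def __init__(self) -> None:
        self.messages: List[Dict[str, Any]] = []

    @classmethod
    def name(cls) -> str:
        return "collector"

    async def _persist_message(self, message: Dict[str, Any]) -> None:
        self.messages.append(message)

    async def send_text_message(self, recipient_id: str, text: str) -> None:
        await self._persist_message({"recipient_id": recipient_id, "text": text})


class DesktopQueueChannel(CollectingChannel):
    """Hypothetical streaming variant: writes to a queue, but is named 'desktop'."""

    def __init__(self, queue: asyncio.Queue) -> None:
        super().__init__()
        self.queue = queue

    @classmethod
    def name(cls) -> str:
        return "desktop"

    async def _persist_message(self, message: Dict[str, Any]) -> None:
        await self.queue.put(message)


async def demo() -> List[Dict[str, Any]]:
    """Send one message through the queue channel and drain the queue."""
    queue: asyncio.Queue = asyncio.Queue()
    channel = DesktopQueueChannel(queue)
    await channel.send_text_message("user-1", "Hi desktop user")
    await queue.put("DONE")  # same sentinel as in the snippet above

    received = []
    while True:
        item = await queue.get()
        if item == "DONE":
            break
        received.append(item)
    return received


streamed = asyncio.run(demo())
```

The point of the sketch is only that the transport (list or queue) and the channel name are independent: a channel can stream through a queue and still report "desktop".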

Hi @ricwo, I need to implement both. I currently have two input channels.

  1. Rest Channel: for this channel the output channel is CollectingOutputChannel.
  2. Desktop Channel: for this channel I created a new output channel, DesktopOutputChannel, which inherits from CollectingOutputChannel.

Now I need the assigned output channel to apply on a per-message basis for that input channel.

I also need to send channel-specific utterances for the Desktop Channel, and I want to specify them in my domain file.

Can you help me with the InputChannel code for the Desktop Channel so that I can achieve both?

Thanks for your response.

I see. You’ll have to extract the desired output channel from your incoming message. The easiest way is probably to attach a metadata field to your user message which you can access in your webhook. Something like:

# needs to be implemented - easiest way is probably to extract `metadata` key from `request.json`
# fall back to an empty dict so that .get() below works when no metadata was sent
metadata = self.get_metadata(request) or {}

requested_output_channel = metadata.get("output_channel")


# now assign the output channel based on that metadata field
if requested_output_channel == "desktop":
    output_channel = DesktopOutputChannel()
else:
    output_channel = CollectingOutputChannel()

user_message = UserMessage(text,
                           output_channel,
                           sender_id,
                           input_channel=input_channel,
                           metadata=metadata)

# use this user_message in the rest of the webhook, i.e. in on_new_message()

I hope that helps!
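The selection step above can be tried out without a running Rasa server. The following is a minimal sketch under stated assumptions: the two channel classes are bare stand-ins that only mimic name() (they are not the real Rasa classes), and `extract_metadata`, `select_output_channel` and the example payload are hypothetical.

```python
from typing import Any, Dict


class CollectingOutputChannel:
    """Stand-in for the default output channel; only name() is modelled."""

    @classmethod
    def name(cls) -> str:
        return "collector"


class DesktopOutputChannel(CollectingOutputChannel):
    """Stand-in for the custom channel; differs only in its name."""

    @classmethod
    def name(cls) -> str:
        return "desktop"


def extract_metadata(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return the `metadata` object of a request body, or {} if absent or invalid."""
    metadata = payload.get("metadata")
    return metadata if isinstance(metadata, dict) else {}


def select_output_channel(metadata: Dict[str, Any]) -> CollectingOutputChannel:
    """Choose the output channel per message from the metadata field."""
    if metadata.get("output_channel") == "desktop":
        return DesktopOutputChannel()
    return CollectingOutputChannel()


# Hypothetical request body as the webhook might receive it.
payload = {
    "sender": "user-1",
    "message": "hello",
    "metadata": {"output_channel": "desktop"},
}
channel = select_output_channel(extract_metadata(payload))
```

Returning an empty dict when metadata is missing keeps the selection code free of None checks, and any request without the field falls back to the default channel.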