Dump rasa response directly in json format via output channel

I have written my custom Input channel while writing my output channel for my custom UI, I was thinking is there any way to dump the bot response directly in JSON format so that I can render it easily at my UI end. Currently the rasa response is mixed JSON and arrray of buttons inside it. What I’m trying to achieve is every rasa response in to custom JSON format for my custom channel.

Is there any way to achieve this?

import os import logging import uuid import inspect import rasa import rasa.utils.endpoints from rasa.cli import utils as cli_utils from rasa.core import utils from sanic import Blueprint, response from sanic.request import Request from socketio import AsyncServer from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable from asyncio import Queue, CancelledError from rasa.core.channels.channel import UserMessage, OutputChannel, CollectingOutputChannel, InputChannel

logger = logging.getLogger(name)

class MyIOOutputChannel(OutputChannel): “”"Output channel that collects send messages in a list

(doesn't send them anywhere, just collects them)."""

def __init__(self) -> None:
    self.messages = []

@classmethod
def name(cls) -> Text:
    print("hi from name method of MyIOOUTPUT channel")
    return "collector"

@staticmethod
def _message(
    recipient_id: Text,
    text: Text = None,
    image: Text = None,
    buttons: Dict[Text, Any] = None,
    attachment: Dict[Text,Any] = None,
    custom: Dict[Text, Any] = None,
) -> Dict:
    """Create a message object that will be stored."""

    obj = {
        "recipient_id": recipient_id,
        "text": text,
        "image": image,
        "buttons": buttons,
        "attachment": attachment,
        "custom": custom,
    }

    # filter out any values that are `None`
    return utils.remove_none_values(obj)

def latest_output(self) -> Optional[Dict[Text, Any]]:
    if self.messages:
        return self.messages[-1]
    else:
        return None

async def _persist_message(self, message: Dict[Text, Any]) -> None:
    self.messages.append(message)  # pytype: disable=bad-return-type

async def send_text_message(
    self, recipient_id: Text, text: Text, **kwargs: Any
) -> None:
    for message_part in text.split("\n\n"):
        await self._persist_message(self._message(recipient_id, text=message_part))

async def send_image_url(
    self, recipient_id: Text, image: Text, **kwargs: Any
) -> None:
    """Sends an image. Default will just post the url as a string."""

    await self._persist_message(self._message(recipient_id, image=image))

async def send_attachment(
    self, recipient_id: Text, attachment: Text, **kwargs: Any
) -> None:
    """Sends an attachment. Default will just post as a string."""

    await self._persist_message(self._message(recipient_id, attachment=attachment))

async def send_text_with_buttons(
    self,
    recipient_id: Text,
    text: Text,
    buttons: Dict[Text, Any],
    **kwargs: Any,
) -> None:
    await self._persist_message(
        self._message(recipient_id, text=text, buttons=buttons)
    )

async def send_custom_json(
    self, recipient_id: Text, json_message: Dict[Text, Any], **kwargs: Any
) -> None:
    await self._persist_message(self._message(recipient_id, custom=json_message))

class MyIOInput(InputChannel): “”"A custom http input channel.

This implementation is the basis for a custom implementation of a chat
frontend. You can customize this to send messages to Rasa Core and
retrieve responses from the agent."""

@classmethod
def name(cls):
    print("hi from name method of MyIO channel")
    return "MyIO"

@staticmethod
async def on_message_wrapper(
    on_new_message: Callable[[UserMessage], Awaitable[None]],
    text: Text,
    queue: Queue,
    sender_id: Text,
) -> None:

    print("Inside on_message_wrapper function")
    collector = QueueOutputChannel(queue)

    message = UserMessage(
        text, collector, sender_id, input_channel=MyIO.name()
    )

    print("above on_new_message method")
    await on_new_message(message)

    await queue.put("DONE")  # pytype: disable=bad-return-type

async def _extract_sender(self, req) -> Optional[Text]:
    return req.json.get("sender", None)

# noinspection PyMethodMayBeStatic
def _extract_message(self, req):
    print("User message ::- ",req.json.get("message", None))
    return req.json.get("message", None)

def stream_response(
    self,
    on_new_message: Callable[[UserMessage], Awaitable[None]],
    text: Text,
    sender_id: Text,
) -> Callable[[Any], Awaitable[None]]:
    async def stream(resp: Any) -> None:
        q = Queue()
        print("the result :: --",q.get())
        task = asyncio.ensure_future(
            self.on_message_wrapper(on_new_message, text, q, sender_id)
        )
        while True:
            result = await q.get()  # pytype: disable=bad-return-type
            
            if result == "DONE":
                break
            else:
                await resp.write(json.dumps(result) + "\n")
        await task

    return stream  # pytype: disable=bad-return-type

def blueprint(self, on_new_message: Callable[[UserMessage], Awaitable[None]]):
    custom_webhook = Blueprint(
        "custom_webhook_{}".format(type(self).__name__),
        inspect.getmodule(self).__name__,
    )

    # noinspection PyUnusedLocal
    @custom_webhook.route("/", methods=["GET"])
    async def health(request: Request):
        print("Inside health")
        return response.json({"status": "ok"})

    @custom_webhook.route("/webhook", methods=["POST"])
    async def receive(request: Request):
        print("Inside receive")
        sender_id = await self._extract_sender(request)
        text = self._extract_message(request)
        print("sender_id is ::-",sender_id)
        print("text is ::-",text)
        should_use_stream = rasa.utils.endpoints.bool_arg(
            request, "stream", default=False
        )

        if should_use_stream:
            return response.stream(
                self.stream_response(on_new_message, text, sender_id),
                 content_type="text/event-stream",
                
            )
        else:
            collector = MyIOOutputChannel()
            on_new_message(UserMessage(text, collector, sender_id))
            print("collector MSG::",collector)
            # noinspection PyBroadException
            try:
                await on_new_message(
                    UserMessage(
                        text, collector, sender_id, input_channel=self.name()
                    )
                )
            except CancelledError:
                logger.error(
                    "Message handling timed out for "
                    "user message '{}'.".format(text)
                )
            except Exception:
                logger.exception(
                    "An exception occured while handling "
                    "user message '{}'.".format(text)
                )
            return response.json(collector.messages)
            

    return custom_webhook

@anuragchaudhary Do you need any additional information in the returned JSON? If not, I would say the best way would be to parse the JSON at your end itself. Let me know if I misunderstood something. Maybe it is clearer with an example?

Yes, I’m trying to add the custom payload as well, so that I can render at my UI. If I try giving a custom payload like this only custom is returned with recipient id, the FB stops recognizing this and my Ui only identifies custom with recipient id.

The solution I’m looking create a single assistant that will run every channel and can have the custom payload too, so that I can render the elements that I’m sending under custom. I also tried the channel-specific utterance but it didn’t work

the current structure of the utterance.

utter_greet:

  • text: “hello from normal utterance” buttons:
    • title: “About Us” payload: ‘/about_us’
    • title: “Contact us” payload: ‘/contact_us’
  • custom: text: “Please provide your details” blocks:
    • type: name text: “Name”
    • type: number text: “Phone”
    • type: email text: “Email”

Hi Anurag, Were you able to resolve it? @dakshvar22 any update on this on your end?