How to put the received metadata in the outputChannel

Hello rasa fans!

I have a customized channel that works like the callback channel . I needed to customize it so I could pass metadata to rasa in the Input. I would like that metadata to be in my responses as well. Is it possible?

This is my current outputchannel to post response in another endpoint:

   class MychannelOutput(CollectingOutputChannel):
    @classmethod
    def name(cls) -> Text:
        return "mychannel"

    def __init__(self, endpoint: EndpointConfig) -> None:
        self.mychannel_endpoint = endpoint
        super().__init__()

    async def _persist_message(self, message: Dict[Text, Any]) -> None:
        await super()._persist_message(message)    

        try:
            await self.mychannel_endpoint.request(
                "post", content_type="application/json", json=message, verify_ssl=False
            )
        except ClientResponseError as e:
            logger.error(
                "Failed to send output message to mychannel. "
                "Status: {} Response: {}"
                "".format(e.status, e.text)
            )

RESUME In summary I would like post for example (it’s ok actually):

{
   "sender": "test_user",
   "message": "Hi!",
   "metadata": {
     "idMessage": "026357c9-7279-4ddb-887f-56e6560fd56c"
   }
}

and the answer to my endpoint would be (missing metadata actually) …

{
    "recipient_id": "test_user", 
     "text": "Hi 👋 \ n I am virtual assistant 🤖."
     "metadata": {
         "idMessage": "026357c9-7279-4ddb-887f-56e6560fd56c"
       }
}
1 Like

Hi, @wladneto !

I’ve been debugging the CollectingOutputChannel class, and managed to reproduce what you need by extending the _message function as follows:

class MyChannelOutput(CollectingOutputChannel):
    @classmethod
    def name(cls) -> Text:
        return "mychannel"

    def __init__(self, custom_metadata: Dict) -> None:
        self.custom_metadata = custom_metadata
        super().__init__()

    def _message(self,
                 recipient_id: Text,
                 text: Text = None,
                 image: Text = None,
                 buttons: List[Dict[Text, Any]] = None,
                 attachment: Text = None,
                 custom: Dict[Text, Any] = None,
                 ) -> Dict:
        """Create a message object that will be stored."""

        obj = {
            "recipient_id": recipient_id,
            "text": text,
            "image": image,
            "buttons": buttons,
            "attachment": attachment,
            "custom": custom,
            "metadata": self.custom_metadata
        }

        # filter out any values that are `None`
        return {k: v for k, v in obj.items() if v is not None}

    async def _persist_message(self, message: Dict[Text, Any]) -> None:
        await super()._persist_message(message)

        try:
            await self.mychannel_endpoint.request(
                "post", content_type="application/json", json=message, verify_ssl=False
            )
        except ClientResponseError as e:
            logger.error(
                "Failed to send output message to mychannel. "
                "Status: {} Response: {}"
                "".format(e.status, e.text)
            )

Note that I am getting custom_metadata on init, which is passed in the get_output_channel function inside the RestInput class, as follows:

class MyChannelInput(InputChannel):
    def name(cls) -> Text:
        """Name of your custom channel."""
        return "mychannel"

    def __init__(self) -> None:
        self.custom_metadata = {}

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:

        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request) -> HTTPResponse:
            sender_id = request.json.get("sender") # method to get sender_id 
            text = request.json.get("text") # method to fetch text
            input_channel = self.name() # method to fetch input channel
            metadata = self._extract_metadata(request)  # method to get metadata

            collector = self.get_output_channel()
            
            # include exception handling

            await on_new_message(
                UserMessage(
                    text,
                    collector,
                    sender_id,
                    input_channel=input_channel,
                    metadata=metadata,
                )
            )

            return response.json(collector.messages)

        return custom_webhook

    def get_output_channel(self) -> CollectingOutputChannel:
        return MyChannelOutput(self.custom_metadata)

    def _extract_metadata(self, req: Request) -> Optional[Text]:
        self.custom_metadata = req.json.get("metadata", {})
        return req.json.get("metadata", None)

By doing this, you will achieve what you expected.

2 Likes

@igao you solved it! :rocket: Thanks a lot my friend…

Hi, I am digging this topic up because I want to use the same mechanism above for a custom SocketIO channel. I am struggling how I transfer the code above without simply overwriting the CollectingOutputchannel.