Hello Everyone,
I figured out a solution myself by reading the source code of Rasa's built-in REST channel.
Here is the code for a fully working custom connector; it runs alongside the default REST input channel, so both the default and the custom connection can be used.
Custom Connector code :
import asyncio
import inspect
import json
import logging
from asyncio import Queue, CancelledError
from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn

import rasa.utils.endpoints
from rasa.core.channels.channel import (
    InputChannel,
    CollectingOutputChannel,
    UserMessage,
)
from rasa.core.channels.rest import QueueOutputChannel
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from sanic.response import HTTPResponse
class MyioInput(InputChannel):
    """Custom HTTP input channel registered under the name ``myio``.

    The channel exposes two routes through a Sanic blueprint: a ``GET /``
    health check and a ``POST /webhook`` endpoint that reads ``sender``,
    ``message`` and (optionally) ``input_channel`` from the JSON body and
    answers either with a single JSON payload or with a streamed response.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"myio"``."""
        return "myio"

    @staticmethod
    async def on_message_wrapper(
        on_new_message: Callable[[UserMessage], Awaitable[Any]],
        text: Text,
        queue: Queue,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> None:
        """Handle one user message and signal completion on ``queue``.

        Builds a ``UserMessage`` whose output channel is a
        ``QueueOutputChannel`` wrapping ``queue``, awaits ``on_new_message``
        with it, and finally puts the sentinel string ``"DONE"`` on the queue.

        Any exception raised by ``on_new_message`` propagates to the caller.
        """
        collector = QueueOutputChannel(queue)
        message = UserMessage(
            text,
            collector,
            sender_id,
            input_channel=input_channel,
            metadata=metadata,
        )
        try:
            await on_new_message(message)
        finally:
            # Always emit the sentinel: the consumer in ``stream_response``
            # loops on ``queue.get()`` until it sees "DONE" and would block
            # forever if message handling failed before reaching this point.
            await queue.put("DONE")

    async def _extract_sender(self, req: Request) -> Optional[Text]:
        """Return the ``sender`` field of the JSON body, or ``None``."""
        return req.json.get("sender", None)

    # noinspection PyMethodMayBeStatic
    def _extract_message(self, req: Request) -> Optional[Text]:
        """Return the ``message`` field of the JSON body, or ``None``."""
        return req.json.get("message", None)

    def _extract_input_channel(self, req: Request) -> Text:
        """Return the body's ``input_channel``, falling back to ``name()``."""
        return req.json.get("input_channel") or self.name()

    def stream_response(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        text: Text,
        sender_id: Text,
        input_channel: Text,
        metadata: Optional[Dict[Text, Any]],
    ) -> Callable[[Any], Awaitable[None]]:
        """Build the coroutine function used to stream bot responses.

        The returned ``stream(resp)`` coroutine schedules
        ``on_message_wrapper`` as a task and writes every item taken from the
        queue to ``resp`` as one JSON document per line, stopping at the
        ``"DONE"`` sentinel. It then awaits the task, so an exception raised
        during message handling surfaces here.
        """

        async def stream(resp: Any) -> None:
            q = Queue()
            task = asyncio.ensure_future(
                self.on_message_wrapper(
                    on_new_message, text, q, sender_id, input_channel, metadata
                )
            )
            while True:
                result = await q.get()
                if result == "DONE":
                    break
                await resp.write(json.dumps(result) + "\n")
            await task

        return stream

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:
        """Create the Sanic blueprint holding this channel's routes.

        Args:
            on_new_message: Coroutine function awaited with each incoming
                ``UserMessage``.

        Returns:
            A blueprint with a ``GET /`` health route and a
            ``POST /webhook`` message route.
        """
        logger = logging.getLogger(__name__)

        custom_webhook = Blueprint(
            "custom_webhook_{}".format(type(self).__name__),
            inspect.getmodule(self).__name__,
        )

        # noinspection PyUnusedLocal
        @custom_webhook.route("/", methods=["GET"])
        async def health(request: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @custom_webhook.route("/webhook", methods=["POST"])
        async def receive(request: Request) -> HTTPResponse:
            sender_id = await self._extract_sender(request)
            text = self._extract_message(request)
            should_use_stream = rasa.utils.endpoints.bool_arg(
                request, "stream", default=False
            )
            input_channel = self._extract_input_channel(request)
            # get_metadata is not defined in this class; presumably inherited
            # from InputChannel - verify against the installed Rasa version.
            metadata = self.get_metadata(request)

            if should_use_stream:
                return response.stream(
                    self.stream_response(
                        on_new_message, text, sender_id, input_channel, metadata
                    ),
                    content_type="text/event-stream",
                )

            collector = MyioOutput()
            # Errors are logged rather than raised so that whatever the
            # collector gathered before the failure is still returned.
            # noinspection PyBroadException
            try:
                await on_new_message(
                    UserMessage(
                        text,
                        collector,
                        sender_id,
                        input_channel=input_channel,
                        metadata=metadata,
                    )
                )
            except CancelledError:
                logger.error(
                    f"Message handling timed out for " f"user message '{text}'."
                )
            except Exception:
                logger.exception(
                    f"An exception occured while handling "
                    f"user message '{text}'."
                )
            return response.json(collector.messages)

        return custom_webhook
class MyioOutput(CollectingOutputChannel):
    """Collecting output channel paired with the ``myio`` input channel."""

    @classmethod
    def name(cls) -> Text:
        """Return the channel name, ``"myio"``."""
        return "myio"
My code is stored in this directory structure :
- data
-- nlu.yml
-- stories.yml
-- rules.yml
- actions
-- __init__.py
-- actions.py
- models
- connectors
-- custom_channel.py
- domain.yml
- config.yml
- credentials.yml
- endpoints.yml
Hence, my credentials.yml registers the connector by its module path (directory, file name, class name):
connectors.custom_channel.MyioInput:
My domain.yml file looks like this:
utter_greet:
- text: default greetings
- text: custom greetings
  channel: "myio"
Lastly, train the model using rasa train
and then run the following command to start the rasa server
rasa run -m models --enable-api --cors "*" --credentials credentials.yml --debug
Then call this endpoint to get the default response:
http://localhost:5005/webhooks/rest/webhook
And to test your custom connector myio custom response, hit this endpoint :
http://localhost:5005/webhooks/myio/webhook
Thanks, Everyone for help.