The chat platform I use sends a security token in the webhook call URL. How can I get this URL in my custom_connector.py?
My code:
import logging
from sanic import Blueprint, response
from sanic.request import Request
from typing import Text, Dict, Any, List, Iterable, Optional, Callable, Awaitable
import sentry_sdk
from sentry_sdk.integrations.sanic import SanicIntegration
from sanic import Sanic
from rasa.core.channels.channel import UserMessage, OutputChannel, InputChannel
from sanic.response import HTTPResponse
from crisp_api import Crisp
# Configure the root logger to write timestamped messages to a file.
# File mode 'w' opens the log file for writing rather than appending.
logging.basicConfig(
    filemode='w',
    format='%(asctime)s %(message)s',
    filename="newfile.log",
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CrispBot(OutputChannel):
    """Output channel that delivers text replies through the Crisp API client."""

    def __init__(self, identifier: Text, key: Text, website_id: Text) -> None:
        """Store the credentials and prepare an authenticated Crisp client.

        Args:
            identifier: Identifier passed to the Crisp client's ``authenticate``.
            key: Key passed to the Crisp client's ``authenticate``.
            website_id: Website id used when sending messages.
        """
        self.identifier = identifier
        self.key = key
        self.website_id = website_id

        # Bind the client to the instance first, then configure it.
        self.client = crisp_client = Crisp()
        crisp_client.set_tier("plugin")
        crisp_client.authenticate(identifier, key)

    @classmethod
    def name(cls) -> Text:
        """Return the name of this channel."""
        return "crisp"

    async def send_text_message(
        self, recipient_id: Text, text: Text, **kwargs: Any
    ) -> None:
        """Send ``text`` to the conversation identified by ``recipient_id``.

        The message is posted as an operator text message on the configured
        website. Extra keyword arguments are accepted but not used.
        """
        payload = {
            "type": "text",
            "content": text,
            "from": "operator",
            "origin": "chat",
        }
        self.client.website.send_message_in_conversation(
            self.website_id, recipient_id, payload
        )
class CrispInput(InputChannel):
    """Crisp input channel: receives webhook calls and forwards them to Rasa.

    The channel registers two routes on its blueprint: ``/`` (health check)
    and ``/webhook`` (incoming Crisp events).
    """

    @classmethod
    def name(cls) -> Text:
        """Return the name of this channel."""
        return "crisp"

    @classmethod
    def from_credentials(cls, credentials: Optional[Dict[Text, Any]]) -> InputChannel:
        """Build the channel from the ``identifier``, ``key`` and ``website_id``
        entries of ``credentials``.

        When ``credentials`` is empty or ``None`` this delegates to
        ``raise_missing_credentials_exception()``.
        """
        if not credentials:
            cls.raise_missing_credentials_exception()
        return cls(
            credentials.get("identifier"),
            credentials.get("key"),
            credentials.get("website_id"),
        )

    def __init__(self, identifier: Text, key: Text, website_id: Text) -> None:
        """Store the Crisp credentials used to create output channels."""
        self.identifier = identifier
        self.key = key
        self.website_id = website_id
        # NOTE: the webhook URL is not known at construction time; it is only
        # available per request inside the route handlers (see ``blueprint``).

    async def send_message(
        self,
        text: Optional[Text],
        website_id: Optional[Text],
        user_id: Optional[Text],
        sender_name: Optional[Text],
        recipient_id: Optional[Text],
        on_new_message: Callable[[UserMessage], Awaitable[Any]]
    ) -> None:
        """Wrap an incoming Crisp message in a ``UserMessage`` and hand it on.

        Args:
            text: Message content.
            website_id: Website id taken from the webhook payload.
            user_id: Id of the user who wrote the message.
            sender_name: Nickname of the user who wrote the message.
            recipient_id: Crisp session id; used as the conversation id of the
                ``UserMessage`` and stored as ``session_id`` in its metadata.
            on_new_message: Coroutine function awaited with the new message.
        """
        output_channel = self.get_output_channel()
        user_msg = UserMessage(
            text,
            output_channel,
            recipient_id,
            input_channel=self.name(),
            metadata={
                "website_id": website_id,
                "session_id": recipient_id,
                "user_id": user_id,
                "nickname": sender_name,
            },
        )
        await on_new_message(user_msg)

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[Any]]
    ) -> Blueprint:
        """Create the Sanic blueprint exposing the health and webhook routes.

        Args:
            on_new_message: Coroutine function awaited for every message
                extracted from a webhook call.

        Returns:
            The blueprint with the ``/`` and ``/webhook`` routes registered.
        """
        crisp_custom_webhook = Blueprint("crisp_custom_webhook", __name__)

        @crisp_custom_webhook.route("/", methods=["GET"])
        async def health(_: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @crisp_custom_webhook.route("/webhook", methods=["GET", "POST"])
        async def receive(request: Request) -> HTTPResponse:
            # The URL the platform actually called lives on the request:
            # ``request.url`` is the full URL and ``request.args`` holds the
            # parsed query parameters. Only method and path are logged here
            # because the query string carries the platform's security token.
            logger.debug("Crisp webhook call: %s %s", request.method, request.path)

            # ``request.json`` is None when the body is empty (e.g. a GET call),
            # and the decoded body is not guaranteed to be a JSON object.
            payload = request.json
            data = payload.get("data") if isinstance(payload, dict) else None

            if data and isinstance(data, dict):
                # The ``user`` object may be absent; fall back to empty values
                # instead of failing with an AttributeError.
                user = data.get("user")
                if not isinstance(user, dict):
                    user = {}
                await self.send_message(
                    text=data.get("content", None),
                    website_id=data.get("website_id"),
                    user_id=user.get("user_id"),
                    sender_name=user.get("nickname"),
                    recipient_id=data.get("session_id", None),
                    on_new_message=on_new_message,
                )
            return response.text("")

        return crisp_custom_webhook

    def get_output_channel(self) -> OutputChannel:
        """Return a new ``CrispBot`` built from the stored credentials."""
        return CrispBot(self.identifier, self.key, self.website_id)