I am trying to set slot values into the tracker from custom channel, which executes perfectly in some instances and gives the below error in rest of the cases
RASA Version: 1.5.3
Python 3.6
ERROR STACKTRACE
2020-04-17 16:03:20,004 [ERROR] Exception occurred while handling uri: 'http://localhost:5005/webhooks/testBot/webhook'
Traceback (most recent call last):
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/sanic/app.py", line 976, in handle_request
response = await response
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/www/lib/myio.py", line 204, in receive
text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/www/lib/myio.py", line 125, in get_selection_index_value
selection_index = await self.get_slot_value(tracker_id,"selection_index")
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/www/lib/myio.py", line 113, in get_slot_value
selection_response = await requests.get(api_url_base)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/api.py", line 11, in get
return await request("get", url, params=params, **kwargs)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/api.py", line 6, in request
return await session.request(method=method, url=url, **kwargs)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/sessions.py", line 79, in request
resp = await self.send(prep, **send_kwargs)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/sessions.py", line 136, in send
r = await adapter.send(request, **kwargs)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/adapters.py", line 55, in send
timeout=timeout,
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/interfaces.py", line 49, in request
return await self.send(request, verify=verify, cert=cert, timeout=timeout)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection_pool.py", line 130, in send
raise exc
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection_pool.py", line 121, in send
request, verify=verify, cert=cert, timeout=timeout
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection.py", line 59, in send
response = await self.h11_connection.send(request, timeout=timeout)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 58, in send
http_version, status_code, headers = await self._receive_response(timeout)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 130, in _receive_response
event = await self._receive_event(timeout)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 165, in _receive_event
self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/concurrency.py", line 103, in read
data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
File "/usr/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
return (yield from fut)
File "/usr/lib/python3.6/asyncio/streams.py", line 634, in read
yield from self._wait_for_data('read')
File "/usr/lib/python3.6/asyncio/streams.py", line 452, in _wait_for_data
'already waiting for incoming data' % func_name)
RuntimeError: read() called while another coroutine is already waiting for incoming data
Code from my Custom Channel:
from __future__ import print_function
from __future__ import unicode_literals
import logging
import numpy
import json
import boto3
from datetime import datetime
from random import randint
import inspect
import requests
import requests_async as requests
from rasa.core.channels.channel import RestInput
from rasa.core import utils
from rasa.core import agent
from rasa.core.interpreter import RasaNLUInterpreter
from rasa.core.channels.channel import UserMessage
from rasa.core.channels.channel import CollectingOutputChannel
from rasa.core import utils
import rasa.utils.endpoints
from rasa_sdk.events import SlotSet
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from asyncio import Queue, CancelledError
from flask import jsonify
import asyncio
from unidecode import unidecode
import re
import time
from lib import constants
from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from sanic.response import HTTPResponse
from typing import NoReturn
logger = logging.getLogger(__name__)
class testBot(RestInput):
"""A custom http input channel.
This implementation is the basis for a custom implementation of a chat
frontend. You can customize this to send messages to Rasa Core and
retrieve responses from the agent."""
@classmethod
def name(cls):
return "testBot"
@staticmethod
async def on_message_wrapper(
on_new_message: Callable[[UserMessage], Awaitable[Any]],
text: Text,
queue: Queue,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> None:
collector = QueueOutputChannel(queue)
message = UserMessage(
text, collector, sender_id, input_channel=input_channel, metadata=metadata
)
await on_new_message(message)
await queue.put("DONE")
async def _extract_sender(self, req):
return req.form.get("session", None)
# noinspection PyMethodMayBeStatic
async def _extract_platform(self, req):
return req.form.get("platform", None)
async def _extract_message_id(self, req):
return req.form.get("message_id", None)
async def _extract_group_id(self, req):
return req.form.get("group_id", None)
async def _extract_lat(self, req):
return req.form.get("lat", None)
async def _extract_lang(self, req):
return req.form.get("long", None)
# noinspection PyMethodMayBeStatic
async def _extract_message(self, req):
return req.form.get("message", None)
async def _extract_form_data(self, req):
return req.form.get("form_data", None)
def posttitle_for_menu(self,count):
title = ""
if count > 1:
title = title+"\n\nType *"
for i in range(count):
if i > 0:
title = title+str(i)+", "
title = title[:-2]
title = title+"* or type *MENU* to view the main menu."
return title
def invalid_option_selection(self, option, selection_index_count, selection):
if int(option) > selection_index_count-1 and selection:
msg = "You have entered an invalid option.\n\nPlease type *"
for count in range(selection_index_count):
if count > 0:
msg = msg + str(count)+", "
msg = msg[:-2]
msg = msg + "* or type *Menu* to view the main menu."
elif selection_index_count == 1 and selection:
msg = "We are not expecting any menu selection.\n\n Please type *Menu* to view the main menu."
else:
msg = None
return msg
# To set slots in the tracker, send the tracker id, actual slot name and value to be set in the slot
async def set_slot(self, tracker_id, slot_name, slot_value):
api_url_base = constants.HTTP_URL + tracker_id + '/tracker/events'
data = json.dumps({
"event": "slot",
"name": slot_name,
"value": slot_value
})
headers = {'Content-Type': 'application/json'}
set_slot = await requests.post(api_url_base, data=data, headers=headers)
if set_slot.status_code != 200:
set_slot = await requests.post(api_url_base, data=data, headers=headers)
return "Done"
async def get_slot_value(self,tracker_id,slot_name):
api_url_base = constants.HTTP_URL+tracker_id+'/tracker'
selection_response = await requests.get(api_url_base)
selection_status_code = selection_response.status_code
if selection_status_code == 200:
selection_text = json.loads(selection_response.text)
selection_index = selection_text['slots'][slot_name]
else:
selection_index = None
return selection_index
async def get_selection_index_value(self,tracker_id,text):
option = text
selection_index = await self.get_slot_value(tracker_id,"selection_index")
final_text = ""
if selection_index != None:
for x in range(1,len(selection_index)):
# logging.info(selection_index)
if text in selection_index[str(x)]["title"]:
final_text = selection_index[str(x)]["payload"]
# await self.set_slot(tracker_id,"selected_index",int(x))
if text in selection_index:
# await self.set_slot(tracker_id,"selected_index",int(text))
final_text = selection_index[text]["payload"]
if(final_text==""):
return text, len(selection_index.keys()), option
else:
return final_text, len(selection_index.keys()), option
def stream_response(
self,
on_new_message: Callable[[UserMessage], Awaitable[None]],
text: Text,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> Callable[[Any], Awaitable[None]]:
async def stream(resp: Any) -> None:
q = Queue()
task = asyncio.ensure_future(
self.on_message_wrapper(
on_new_message, text, q, sender_id, input_channel, metadata
)
)
result = None # declare variable up front to avoid pytype error
while True:
result = await q.get()
if result == "DONE":
break
else:
await resp.write(json.dumps(result) + "\n")
await task
return stream # pytype: disable=bad-return-type
def blueprint(
self, on_new_message: Callable[[UserMessage], Awaitable[None]]
) -> Blueprint:
custom_webhook = Blueprint(
"custom_webhook_{}".format(type(self).__name__),
inspect.getmodule(self).__name__,
)
# noinspection PyUnusedLocal
@custom_webhook.route("/", methods=["GET"])
async def health(request: Request) -> HTTPResponse:
return response.json({"status": "ok"})
@custom_webhook.route("/webhook", methods=["POST"])
async def receive(request: Request) -> HTTPResponse:
print(request.form)
sender_id = await self._extract_sender(request)
input_channel = await self._extract_platform(request)
group_id = await self._extract_group_id(request)
lat = await self._extract_lat(request)
lang = await self._extract_lang(request)
## SET GROUP ID
# try:
# await self.set_slot(sender_id,"group_id",group_id)
# except Exception as e:
# logger.exception(e)
## SET SENDER ID
try:
await self.set_slot(sender_id,"sender_id",sender_id)
except Exception as e:
logger.exception(e)
## SET LATITUDE
try:
if lat is not None:
await self.set_slot(sender_id,"lat",lat)
except Exception as e:
logger.exception(e)
## SET LONGITUDE
try:
if lang is not None:
await self.set_slot(sender_id,"lang",lang)
except Exception as e:
logger.exception(e)
text = await self._extract_message(request) if await self._extract_message(request) is not None else ""
#get the selection index value if selection index is selected.
selection = False
try:
if int(text):
selection = True
except Exception as e:
selection = False
text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
print(selection, text, selection_index_count, option)
if selection_index_count > 1 and selection == True:
msg = self.invalid_option_selection(option,selection_index_count,selection)
print(msg)
if msg is not None:
message = {
"success": 1,
"message": [
{
"message":{
"template":{
"elements":{
"title":msg,
"says":"",
"visemes":""
}
}
}
}
],
"session": sender_id
}
return response.json(message)
should_use_stream = rasa.utils.endpoints.bool_arg(
request, "stream", default=False
)
if should_use_stream:
return response.stream(
self.stream_response(on_new_message, text, sender_id),
content_type="text/event-stream",
)
else:
collector = CollectingOutputChannel()
# noinspection PyBroadException
try:
msg = await on_new_message(
UserMessage(
text, collector, sender_id, input_channel=input_channel
)
)
message_received_is = collector.messages[0]
text = message_received_is.get("text")
text = json.loads(text) # convert the string to json
recipient_id = message_received_is.get("recipient_id")
# SET SELECTION INDEX IF BUTTONS FOUND
try:
button_added = False
for index,message in enumerate(text):
if "message" in message:
message = message["message"]
if "template" in message:
message = message["template"]["elements"]
if "buttons" in message:
buttons = message["buttons"]
selection_index = {str(index+1):{"title":button["title"],"payload":button["payload"]} for index,button in enumerate(buttons)}
selection_index["type"] = "selection_index"
logging.info(selection_index)
await self.set_slot(sender_id,"selection_index",selection_index)
button_added = True
text[index]["message"]["template"]["elements"]["posttitle"] = self.posttitle_for_menu(len(selection_index))
except Exception as e:
logging.info("IN CHANNEL EXCEPTION WHILE SETTING BUTTONS")
logging.exception(e)
message = {
"success": 1,
"message": text,
"session": recipient_id
}
return response.json(message)
except CancelledError:
logger.error(
"Message handling timed out for "
"user message '{}'.".format(text)
)
except Exception:
logger.exception(
"An exception occured while handling "
"user message '{}'.".format(text)
)
return response.json(message)
return custom_webhook
Have anyone else came across this error and found any solution to it? Thanks in advance for your help