Custom Channel asyncio RuntimeError

I am trying to set slot values in the tracker from a custom channel. This works perfectly in some instances, but in the rest of the cases it fails with the error below.

RASA Version: 1.5.3

Python 3.6


2020-04-17 16:03:20,004 [ERROR]  Exception occurred while handling uri: 'http://localhost:5005/webhooks/testBot/webhook'
Traceback (most recent call last):
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/sanic/", line 976, in handle_request
    response = await response
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/", line 204, in receive
    text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/", line 125, in get_selection_index_value
    selection_index = await self.get_slot_value(tracker_id,"selection_index")
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/", line 113, in get_slot_value
    selection_response = await requests.get(api_url_base)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/", line 11, in get
    return await request("get", url, params=params, **kwargs)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/", line 6, in request
    return await session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/", line 79, in request
    resp = await self.send(prep, **send_kwargs)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/", line 136, in send
    r = await adapter.send(request, **kwargs)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/", line 55, in send
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/", line 49, in request
    return await self.send(request, verify=verify, cert=cert, timeout=timeout)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 130, in send
    raise exc
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 121, in send
    request, verify=verify, cert=cert, timeout=timeout
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 59, in send
    response = await self.h11_connection.send(request, timeout=timeout)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 58, in send
    http_version, status_code, headers = await self._receive_response(timeout)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 130, in _receive_response
    event = await self._receive_event(timeout)
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/", line 165, in _receive_event
    self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
  File "/usr/lib/python3.6/asyncio/", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/", line 103, in read
    data = await asyncio.wait_for(, read_timeout)
  File "/usr/lib/python3.6/asyncio/", line 339, in wait_for
    return (yield from fut)
  File "/usr/lib/python3.6/asyncio/", line 634, in read
    yield from self._wait_for_data('read')
  File "/usr/lib/python3.6/asyncio/", line 452, in _wait_for_data
    'already waiting for incoming data' % func_name)
RuntimeError: read() called while another coroutine is already waiting for incoming data

Code from my Custom Channel:

from __future__ import print_function
from __future__ import unicode_literals

import logging
import numpy
import json
import boto3
from datetime import datetime
from random import randint
import inspect
import requests
import requests_async as requests

from import RestInput
from rasa.core import utils
from rasa.core import agent
from rasa.core.interpreter import RasaNLUInterpreter
from import UserMessage
from import CollectingOutputChannel
from rasa.core import utils
import rasa.utils.endpoints
from import SlotSet
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from asyncio import Queue, CancelledError
from flask import jsonify
import asyncio
from unidecode import unidecode
import re
import time
from lib import constants

from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from sanic.response import HTTPResponse
from typing import NoReturn

logger = logging.getLogger(__name__)

class testBot(RestInput):
	"""A custom http input channel.

	This implementation is the basis for a custom implementation of a chat
	frontend. You can customize this to send messages to Rasa Core and
	retrieve responses from the agent."""

	def name(cls):
		return "testBot"

	@staticmethod
	async def on_message_wrapper(
		on_new_message: Callable[[UserMessage], Awaitable[Any]],
		text: Text,
		queue: Queue,
		sender_id: Text,
		input_channel: Text,
		metadata: Optional[Dict[Text, Any]],
	) -> None:
		"""Run ``on_new_message`` for one user text and mark the queue as finished.

		The signature has no ``self``, so the function is declared as a
		staticmethod; otherwise a call through an instance would shift every
		argument by one position.

		Args:
			on_new_message: Coroutine function that processes a ``UserMessage``.
			text: The user's message text.
			queue: Queue handed to the output channel; receives ``"DONE"`` last.
			sender_id: Conversation / sender identifier.
			input_channel: Name of the channel the message arrived on.
			metadata: Optional extra data attached to the message.
		"""
		# The module's visible imports do not provide QueueOutputChannel, so it
		# is imported here. NOTE(review): path taken from Rasa 1.x — confirm
		# against the installed version.
		from rasa.core.channels.channel import QueueOutputChannel

		collector = QueueOutputChannel(queue)

		message = UserMessage(
			text, collector, sender_id, input_channel=input_channel, metadata=metadata
		)
		await on_new_message(message)

		# Sentinel telling the consumer of the queue that no more items follow.
		await queue.put("DONE")

	async def _extract_sender(self, req):
		return req.form.get("session", None)

	# noinspection PyMethodMayBeStatic
	async def _extract_platform(self, req):
		return req.form.get("platform", None)

	async def _extract_message_id(self, req):
		return req.form.get("message_id", None)

	async def _extract_group_id(self, req):
		return req.form.get("group_id", None)

	async def _extract_lat(self, req):
		return req.form.get("lat", None)

	async def _extract_lang(self, req):
		return req.form.get("long", None)

	# noinspection PyMethodMayBeStatic
	async def _extract_message(self, req):
		return req.form.get("message", None)

	async def _extract_form_data(self, req):
		return req.form.get("form_data", None)

	def posttitle_for_menu(self,count):
		"""Build the hint text that lists the numbers a user may type.

		For ``count`` greater than 1 the hint names the numbers ``1`` up to
		``count - 1`` separated by ``", "``; for any smaller ``count`` the
		result is the empty string.
		"""
		if count <= 1:
			return ""
		numbers = ", ".join(str(number) for number in range(1, count))
		return "\n\nType *" + numbers + "* or type *MENU* to view the main menu."

	def invalid_option_selection(self, option, selection_index_count, selection):
		"""Return an error message for an unusable menu selection, else ``None``.

		Args:
			option: Text typed by the user; converted with ``int()``.
			selection_index_count: Size of the stored selection index. The
				numbers offered to the user are ``1`` to
				``selection_index_count - 1``.
			selection: Truthy when the text was recognised as a numeric choice.

		Returns:
			``None`` when there is nothing to complain about (no selection was
			made, or the option is one of the offered numbers); otherwise the
			message to show to the user.

		Raises:
			ValueError: If ``selection`` is truthy and ``option`` is not an
				integer string.
		"""
		# Checked first so that non-numeric text is never passed to int()
		# when no selection was made.
		if not selection:
			return None
		# No numbered entries are on offer. This must be tested before the
		# range check: with nothing to list, the "invalid option" text would
		# otherwise be built from an empty list of numbers.
		if selection_index_count <= 1:
			return "We are not expecting any menu selection.\n\n Please type *Menu* to view the main menu."
		if 1 <= int(option) <= selection_index_count - 1:
			return None
		choices = ", ".join(str(number) for number in range(1, selection_index_count))
		return (
			"You have entered an invalid option.\n\nPlease type *"
			+ choices
			+ "* or type *Menu* to view the main menu."
		)

	# To set slots in the tracker, send the tracker id, actual slot name and value to be set in the slot
	async def set_slot(self, tracker_id, slot_name, slot_value):
		"""Post a ``slot`` event for one conversation and retry once on failure.

		Args:
			tracker_id: Conversation id; appended to ``constants.HTTP_URL``.
			slot_name: Name of the slot to set.
			slot_value: Value to store; must be JSON-serialisable.

		Returns:
			The string ``"Done"``, whether or not the request succeeded.
		"""
		api_url_base = constants.HTTP_URL + tracker_id + '/tracker/events'
		data = json.dumps({
			"event": "slot",
			"name": slot_name,
			"value": slot_value
		})
		headers = {'Content-Type': 'application/json'}
		# `requests` is the awaitable requests_async module imported under
		# that name at the top of the file.
		reply = await requests.post(api_url_base, data=data, headers=headers)
		if reply.status_code != 200:
			# One retry, as before; a second failure is now logged instead of
			# passing silently.
			reply = await requests.post(api_url_base, data=data, headers=headers)
			if reply.status_code != 200:
				logger.warning(
					"Setting slot '%s' for '%s' failed with status %s.",
					slot_name, tracker_id, reply.status_code,
				)
		return "Done"

	async def get_slot_value(self,tracker_id,slot_name):
		"""Fetch one slot value from the tracker of a conversation.

		Args:
			tracker_id: Conversation id; appended to ``constants.HTTP_URL``.
			slot_name: Name of the slot to read.

		Returns:
			The slot's value, or ``None`` when the request does not answer
			with status 200 or the tracker has no such slot.
		"""
		api_url_base = constants.HTTP_URL+tracker_id+'/tracker'
		# The traceback accompanying this code shows the reported
		# RuntimeError propagating out of this awaited call.
		selection_response = await requests.get(api_url_base)
		if selection_response.status_code != 200:
			return None
		tracker_state = json.loads(selection_response.text)
		# A missing or null "slots" entry, or an unknown slot name, yields
		# None rather than a KeyError.
		slots = tracker_state.get('slots') or {}
		return slots.get(slot_name)

	async def get_selection_index_value(self,tracker_id,text):
		"""Translate a typed menu choice into the payload of the chosen button.

		The ``selection_index`` slot is read through ``get_slot_value``. Its
		entries are looked up by key (``text`` equals ``"1"``, ``"2"``, ...)
		and, failing that, by title (``text`` is contained in an entry's
		``"title"``; the last matching entry wins).

		Args:
			tracker_id: Conversation id whose slot is read.
			text: The text the user typed.

		Returns:
			A 3-tuple ``(resolved_text, entry_count, option)``:
			``resolved_text`` is the matched payload, or ``text`` unchanged
			when nothing matched; ``entry_count`` is the size of the slot's
			mapping (``0`` when the slot is empty or unset); ``option`` is the
			original ``text``.
		"""
		option = text
		selection_index = await self.get_slot_value(tracker_id,"selection_index")
		# Always hand back a 3-tuple: the caller unpacks three values.
		if not selection_index:
			return text, 0, option

		entry_count = len(selection_index)
		final_text = ""
		# Empty text is skipped: "" is a substring of every title.
		if text:
			entry = selection_index.get(text)
			# Only dict entries carry a payload; the "type" marker stored in
			# the same mapping is a plain string.
			if isinstance(entry, dict):
				final_text = entry["payload"]
			else:
				for x in range(1, entry_count):
					candidate = selection_index.get(str(x))
					if isinstance(candidate, dict) and text in candidate["title"]:
						final_text = candidate["payload"]
		return (final_text or text), entry_count, option

	def stream_response(
		self,
		on_new_message: Callable[[UserMessage], Awaitable[None]],
		text: Text,
		sender_id: Text,
		input_channel: Optional[Text] = None,
		metadata: Optional[Dict[Text, Any]] = None,
	) -> Callable[[Any], Awaitable[None]]:
		"""Return a coroutine function that streams the bot's replies.

		Args:
			on_new_message: Coroutine function that processes a ``UserMessage``.
			text: The user's message text.
			sender_id: Conversation / sender identifier.
			input_channel: Channel name passed on to ``on_message_wrapper``.
				Defaults to ``None`` so that a three-argument call
				``stream_response(on_new_message, text, sender_id)`` is valid.
			metadata: Optional extra data passed on to ``on_message_wrapper``.

		Returns:
			``stream(resp)``: starts ``on_message_wrapper`` as a task, writes
			each queued item to ``resp`` as one JSON line, and stops when the
			``"DONE"`` sentinel arrives.
		"""
		async def stream(resp: Any) -> None:
			q = Queue()
			# Looked up on the class so the call passes exactly these six
			# arguments whether or not on_message_wrapper is a staticmethod.
			task = asyncio.ensure_future(
				type(self).on_message_wrapper(
					on_new_message, text, q, sender_id, input_channel, metadata
				)
			)
			result = None  # declare variable up front to avoid pytype error
			while True:
				result = await q.get()
				if result == "DONE":
					# Sentinel: nothing more will be queued.
					break
				await resp.write(json.dumps(result) + "\n")
			await task

		return stream  # pytype: disable=bad-return-type

	def blueprint(
		self, on_new_message: Callable[[UserMessage], Awaitable[None]]
	) -> Blueprint:
		"""Build the blueprint that exposes this channel's HTTP routes.

		Two handlers are registered on ``custom_webhook``: ``health`` for
		``GET /`` and ``receive`` for ``POST /webhook``. The blueprint object
		is returned at the end.

		NOTE(review): several lines of this method appear to be missing from
		this listing (for example the arguments and closing parenthesis of
		``Blueprint(``, the ``try:`` lines that the ``except`` clauses belong
		to, and the closing brackets of the ``message`` dicts) — restore them
		from the original file before running.
		"""
		custom_webhook = Blueprint(

		# noinspection PyUnusedLocal
		@custom_webhook.route("/", methods=["GET"])
		async def health(request: Request) -> HTTPResponse:
			# Fixed JSON body; the request itself is not inspected.
			return response.json({"status": "ok"})

		@custom_webhook.route("/webhook", methods=["POST"])
		async def receive(request: Request) -> HTTPResponse:
			# Read the individual form fields of the incoming request.
			sender_id = await self._extract_sender(request)
			input_channel = await self._extract_platform(request)
			group_id = await self._extract_group_id(request)
			lat = await self._extract_lat(request)
			lang = await self._extract_lang(request)

			# try:
			# 	await self.set_slot(sender_id,"group_id",group_id)
			# except Exception as e:
			# 	logger.exception(e)
			# NOTE(review): the `except` clauses below have no visible `try:`
			# and no visible handler body — presumably lost from the listing.
				await self.set_slot(sender_id,"sender_id",sender_id)
			except Exception as e:
				if lat is not None:
					await self.set_slot(sender_id,"lat",lat)
			except Exception as e:
				if lang is not None:
					await self.set_slot(sender_id,"lang",lang)
			except Exception as e:

			# _extract_message is awaited twice: once for the None check and
			# once for the value; a missing message becomes "".
			text = await self._extract_message(request) if await self._extract_message(request) is not None else ""
			#get the selection index value if selection index is selected.
			selection = False
			# NOTE(review): a `try:` presumably preceded the next line.
				if int(text):
					selection = True
			except Exception as e:
				selection = False

			# Three values are unpacked here, so get_selection_index_value
			# has to return a 3-tuple on every path.
			text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
			print(selection, text, selection_index_count, option)
			if selection_index_count > 1 and selection == True:
				msg = self.invalid_option_selection(option,selection_index_count,selection)
				if msg is not None:
					# NOTE(review): the contents of the "message" list and the
					# closing brackets are not visible in this listing.
					message = {
						"success": 1,
						"message": [
						"session": sender_id
					return response.json(message)
			should_use_stream = rasa.utils.endpoints.bool_arg(
				request, "stream", default=False

			if should_use_stream:
					self.stream_response(on_new_message, text, sender_id),
				collector = CollectingOutputChannel()
				# noinspection PyBroadException
					msg = await on_new_message(
							text, collector, sender_id, input_channel=input_channel
					# Only the first collected message is used; its "text" is
					# parsed as JSON.
					message_received_is = collector.messages[0]
					text = message_received_is.get("text")
					text = json.loads(text)  # convert the string to json
					recipient_id = message_received_is.get("recipient_id")

						button_added = False
						for index,message in enumerate(text):
							if "message" in message:
								message = message["message"]
								if "template" in message:
									message = message["template"]["elements"]
									if "buttons" in message:
										buttons = message["buttons"]
										# Maps "1", "2", ... to each button's title and payload.
										selection_index = {str(index+1):{"title":button["title"],"payload":button["payload"]} for index,button in enumerate(buttons)}
										selection_index["type"] = "selection_index"

										await self.set_slot(sender_id,"selection_index",selection_index)
										button_added = True
										# len(selection_index) includes the "type" entry added above.
										text[index]["message"]["template"]["elements"]["posttitle"] = self.posttitle_for_menu(len(selection_index))

					message = {
						"success": 1,
						"message": text,
						"session": recipient_id
					return response.json(message)

				# NOTE(review): the calls that consumed the two message
				# strings below are not visible in this listing.
				except CancelledError:
						"Message handling timed out for "
						"user message '{}'.".format(text)
				except Exception:
						"An exception occured while handling "
						"user message '{}'.".format(text)
				return response.json(message)

		return custom_webhook

Has anyone else come across this error and found a solution to it? Thanks in advance for your help.