Custom Channel asyncio RuntimeError

I am trying to set slot values into the tracker from custom channel, which executes perfectly in some instances and gives the below error in rest of the cases

RASA Version: 1.5.3

Python 3.6

ERROR STACKTRACE

2020-04-17 16:03:20,004 [ERROR]  Exception occurred while handling uri: 'http://localhost:5005/webhooks/testBot/webhook'
Traceback (most recent call last):
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/sanic/app.py", line 976, in handle_request
    response = await response
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/myio.py", line 204, in receive
    text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/myio.py", line 125, in get_selection_index_value
    selection_index = await self.get_slot_value(tracker_id,"selection_index")
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/www/lib/myio.py", line 113, in get_slot_value
    selection_response = await requests.get(api_url_base)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/api.py", line 11, in get
    return await request("get", url, params=params, **kwargs)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/api.py", line 6, in request
    return await session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/sessions.py", line 79, in request
    resp = await self.send(prep, **send_kwargs)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/sessions.py", line 136, in send
    r = await adapter.send(request, **kwargs)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/requests_async/adapters.py", line 55, in send
    timeout=timeout,
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/interfaces.py", line 49, in request
    return await self.send(request, verify=verify, cert=cert, timeout=timeout)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection_pool.py", line 130, in send
    raise exc
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection_pool.py", line 121, in send
    request, verify=verify, cert=cert, timeout=timeout
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/connection.py", line 59, in send
    response = await self.h11_connection.send(request, timeout=timeout)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 58, in send
    http_version, status_code, headers = await self._receive_response(timeout)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 130, in _receive_response
    event = await self._receive_event(timeout)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/dispatch/http11.py", line 165, in _receive_event
    self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/test.rasa/test.rasa.env/lib/python3.6/site-packages/http3/concurrency.py", line 103, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/lib/python3.6/asyncio/streams.py", line 634, in read
    yield from self._wait_for_data('read')
  File "/usr/lib/python3.6/asyncio/streams.py", line 452, in _wait_for_data
    'already waiting for incoming data' % func_name)
RuntimeError: read() called while another coroutine is already waiting for incoming data

Code from my Custom Channel:

from __future__ import print_function
from __future__ import unicode_literals

import logging
import numpy
import json
import boto3
from datetime import datetime
from random import randint
import inspect
import requests
import requests_async as requests

from rasa.core.channels.channel import RestInput
from rasa.core import utils
from rasa.core import agent
from rasa.core.interpreter import RasaNLUInterpreter
from rasa.core.channels.channel import UserMessage
from rasa.core.channels.channel import CollectingOutputChannel
from rasa.core import utils
import rasa.utils.endpoints
from rasa_sdk.events import SlotSet
from sanic import Sanic, Blueprint, response
from sanic.request import Request
from asyncio import Queue, CancelledError
from flask import jsonify
import asyncio
from unidecode import unidecode
import re
import time
from lib import constants

from typing import Text, List, Dict, Any, Optional, Callable, Iterable, Awaitable
from sanic.response import HTTPResponse
from typing import NoReturn

logger = logging.getLogger(__name__)


class testBot(RestInput):
	"""A custom http input channel.

	This implementation is the basis for a custom implementation of a chat
	frontend. You can customize this to send messages to Rasa Core and
	retrieve responses from the agent."""

	@classmethod
	def name(cls):
		return "testBot"

	@staticmethod
	async def on_message_wrapper(
		on_new_message: Callable[[UserMessage], Awaitable[Any]],
		text: Text,
		queue: Queue,
		sender_id: Text,
		input_channel: Text,
		metadata: Optional[Dict[Text, Any]],
	) -> None:
		collector = QueueOutputChannel(queue)

		message = UserMessage(
			text, collector, sender_id, input_channel=input_channel, metadata=metadata
		)
		await on_new_message(message)

		await queue.put("DONE")

	async def _extract_sender(self, req):
		return req.form.get("session", None)

	# noinspection PyMethodMayBeStatic
	async def _extract_platform(self, req):
		return req.form.get("platform", None)

	async def _extract_message_id(self, req):
		return req.form.get("message_id", None)

	async def _extract_group_id(self, req):
		return req.form.get("group_id", None)

	async def _extract_lat(self, req):
		return req.form.get("lat", None)

	async def _extract_lang(self, req):
		return req.form.get("long", None)

	# noinspection PyMethodMayBeStatic
	async def _extract_message(self, req):
		return req.form.get("message", None)

	async def _extract_form_data(self, req):
		return req.form.get("form_data", None)

	def posttitle_for_menu(self,count):
		title = ""
		if count > 1:
			title = title+"\n\nType *"
			for i in range(count):
				if i > 0:
					title = title+str(i)+", "
			title = title[:-2]
			title = title+"* or type *MENU* to view the main menu."
		return title

	def invalid_option_selection(self, option, selection_index_count, selection):
		if int(option) > selection_index_count-1 and selection:
			msg = "You have entered an invalid option.\n\nPlease type *"
			for count in range(selection_index_count):
				if count > 0:
					msg = msg + str(count)+", "
			msg = msg[:-2]
			msg = msg + "* or type *Menu* to view the main menu."
		elif selection_index_count == 1 and selection:
			msg = "We are not expecting any menu selection.\n\n Please type *Menu* to view the main menu."
		else:
			msg = None
		return msg

	# To set slots in the tracker, send the tracker id, actual slot name and value to be set in the slot
	async def set_slot(self, tracker_id, slot_name, slot_value):
		api_url_base = constants.HTTP_URL + tracker_id + '/tracker/events'
		data = json.dumps({
			"event": "slot",
			"name": slot_name,
			"value": slot_value
		})
		headers = {'Content-Type': 'application/json'}
		set_slot = await requests.post(api_url_base, data=data, headers=headers)
		if set_slot.status_code != 200:
			set_slot = await requests.post(api_url_base, data=data, headers=headers)
		return "Done"

	async def get_slot_value(self,tracker_id,slot_name):
		api_url_base = constants.HTTP_URL+tracker_id+'/tracker'
		selection_response = await requests.get(api_url_base)
		selection_status_code = selection_response.status_code
		if selection_status_code == 200:
			selection_text = json.loads(selection_response.text)
			selection_index = selection_text['slots'][slot_name]
		else:
			selection_index = None

		return selection_index

	async def get_selection_index_value(self,tracker_id,text):
		option = text
		selection_index = await self.get_slot_value(tracker_id,"selection_index")
		final_text = ""
		if selection_index != None:
			for x in range(1,len(selection_index)):
				# logging.info(selection_index)
				if text in selection_index[str(x)]["title"]:
					final_text = selection_index[str(x)]["payload"]
					# await self.set_slot(tracker_id,"selected_index",int(x))
				if text in selection_index:
					# await self.set_slot(tracker_id,"selected_index",int(text))
					final_text = selection_index[text]["payload"]
		if(final_text==""):
			return text, len(selection_index.keys()), option
		else:
			return final_text, len(selection_index.keys()), option

	def stream_response(
		self,
		on_new_message: Callable[[UserMessage], Awaitable[None]],
		text: Text,
		sender_id: Text,
		input_channel: Text,
		metadata: Optional[Dict[Text, Any]],
	) -> Callable[[Any], Awaitable[None]]:
		async def stream(resp: Any) -> None:
			q = Queue()
			task = asyncio.ensure_future(
				self.on_message_wrapper(
					on_new_message, text, q, sender_id, input_channel, metadata
				)
			)
			result = None  # declare variable up front to avoid pytype error
			while True:
				result = await q.get()
				if result == "DONE":
					break
				else:
					await resp.write(json.dumps(result) + "\n")
			await task

		return stream  # pytype: disable=bad-return-type

	def blueprint(
		self, on_new_message: Callable[[UserMessage], Awaitable[None]]
	) -> Blueprint:
		custom_webhook = Blueprint(
			"custom_webhook_{}".format(type(self).__name__),
			inspect.getmodule(self).__name__,
		)

		# noinspection PyUnusedLocal
		@custom_webhook.route("/", methods=["GET"])
		async def health(request: Request) -> HTTPResponse:
			return response.json({"status": "ok"})

		@custom_webhook.route("/webhook", methods=["POST"])
		async def receive(request: Request) -> HTTPResponse:
			print(request.form)
			sender_id = await self._extract_sender(request)
			input_channel = await self._extract_platform(request)
			group_id = await self._extract_group_id(request)
			lat = await self._extract_lat(request)
			lang = await self._extract_lang(request)

			## SET GROUP ID
			# try:
			# 	await self.set_slot(sender_id,"group_id",group_id)
			# except Exception as e:
			# 	logger.exception(e)
			## SET SENDER ID
			try:
				await self.set_slot(sender_id,"sender_id",sender_id)
			except Exception as e:
				logger.exception(e)
			## SET LATITUDE
			try:
				if lat is not None:
					await self.set_slot(sender_id,"lat",lat)
			except Exception as e:
				logger.exception(e)
			## SET LONGITUDE
			try:
				if lang is not None:
					await self.set_slot(sender_id,"lang",lang)
			except Exception as e:
				logger.exception(e)

			text = await self._extract_message(request) if await self._extract_message(request) is not None else ""
			#get the selection index value if selection index is selected.
			selection = False
			try:
				if int(text):
					selection = True
			except Exception as e:
				selection = False

			text,selection_index_count,option = await self.get_selection_index_value(sender_id,text)
			print(selection, text, selection_index_count, option)
			if selection_index_count > 1 and selection == True:
				msg = self.invalid_option_selection(option,selection_index_count,selection)
				print(msg)
				if msg is not None:
					message = {
						"success": 1,
						"message": [
							{
								"message":{
									"template":{
										"elements":{
											"title":msg,
											"says":"",
											"visemes":""
										}
									}
								}
							}
						],
						"session": sender_id
					}					
					return response.json(message)
			should_use_stream = rasa.utils.endpoints.bool_arg(
				request, "stream", default=False
			)

			if should_use_stream:
				return response.stream(
					self.stream_response(on_new_message, text, sender_id),
					content_type="text/event-stream",
				)
			else:
				collector = CollectingOutputChannel()
				# noinspection PyBroadException
				try:
					msg = await on_new_message(
						UserMessage(
							text, collector, sender_id, input_channel=input_channel
						)
					)
					message_received_is = collector.messages[0]
					text = message_received_is.get("text")
					text = json.loads(text)  # convert the string to json
					recipient_id = message_received_is.get("recipient_id")

					# SET SELECTION INDEX IF BUTTONS FOUND
					try:
						button_added = False
						for index,message in enumerate(text):
							if "message" in message:
								message = message["message"]
								if "template" in message:
									message = message["template"]["elements"]
									if "buttons" in message:
										buttons = message["buttons"]
										selection_index = {str(index+1):{"title":button["title"],"payload":button["payload"]} for index,button in enumerate(buttons)}
										selection_index["type"] = "selection_index"
										logging.info(selection_index)
										await self.set_slot(sender_id,"selection_index",selection_index)
										button_added = True
										text[index]["message"]["template"]["elements"]["posttitle"] = self.posttitle_for_menu(len(selection_index))
					except Exception as e:
						logging.info("IN CHANNEL EXCEPTION WHILE SETTING BUTTONS")
						logging.exception(e)

					message = {
						"success": 1,
						"message": text,
						"session": recipient_id
					}
					return response.json(message)

				except CancelledError:
					logger.error(
						"Message handling timed out for "
						"user message '{}'.".format(text)
					)
				except Exception:
					logger.exception(
						"An exception occured while handling "
						"user message '{}'.".format(text)
					)
				return response.json(message)

		return custom_webhook

Have anyone else came across this error and found any solution to it? Thanks in advance for your help