concurrent.futures._base.CancelledError with long running custom actions

Hi,

I just encountered a similar problem, although in my case it is not caused by a slow custom action, and I have seen it occur only once so far.

Here is the action code:

import re
from typing import Any, Dict, List, Text

from rasa_sdk import Action, Tracker
from rasa_sdk.events import SlotSet
from rasa_sdk.executor import CollectingDispatcher


class ActionSetStartDefaultActionParameters(Action):
    """Set next_question_action slot for the beginning
    of the conversation"""

    def name(self) -> Text:
        return "action_set_start_default_action_parameters"

    def run(self,
            dispatcher: CollectingDispatcher,
            tracker: Tracker,
            domain: Dict[Text, Any]) -> List[Dict[Text, Any]]:

        # Extract any title, name, introduction and time of day flag
        latest_message = tracker.current_state()["latest_message"]["text"]

        # Raw string so that \s reaches the regex engine unchanged
        entitiesValues = re.findall(r'(title|name|intro|tod)_([^\s]*)', latest_message, re.IGNORECASE)

        # One SlotSet per extracted key/value pair, plus the fixed slot
        return [
            SlotSet("next_question_action", "utter_action_first_question"),
        ] + [SlotSet(entity, value) for (entity, value) in entitiesValues]

As you can see, this action simply performs a regex search to extract key/value pairs from a string and then assigns those values to slots that have the same names as the keys.
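To make the extraction step concrete, here is a minimal standalone sketch of what the regex returns. The helper name extract_slot_pairs and the sample message are made up for illustration; only the pattern itself comes from the action.

```python
import re

# Same pattern as in the action, written as a raw string
PATTERN = re.compile(r"(title|name|intro|tod)_([^\s]*)", re.IGNORECASE)


def extract_slot_pairs(text):
    """Return (key, value) tuples such as ('title', 'Dr') found in text."""
    return PATTERN.findall(text)


# Hypothetical first message carrying three flags
pairs = extract_slot_pairs("/start title_Dr name_Smith tod_morning")
print(pairs)  # [('title', 'Dr'), ('name', 'Smith'), ('tod', 'morning')]
```

Because of re.IGNORECASE, keys come back exactly as they were typed, so "Title_Dr" yields the key "Title" rather than "title". Whether that matters depends on how the slots are named in the domain.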

Also, the error message on the RASA bot side is slightly different from what the OP describes, but the root error is also a concurrent.futures._base.CancelledError from aiohttp:

ERROR    rasa.core.processor  - Encountered an exception while running action 'action_set_start_default_action_parameters'. Bot will continue, but the actions events are lost. Please check the logs of your action server for more information.
2020-10-09 09:48:01 DEBUG    rasa.core.processor  - 
Traceback (most recent call last):
  File "/opt/venv/lib/python3.7/site-packages/rasa/core/processor.py", line 650, in _run_action
    events = await action.run(output_channel, nlg, tracker, self.domain)
  File "/opt/venv/lib/python3.7/site-packages/rasa/core/actions/action.py", line 549, in run
    json=json_body, method="post", timeout=DEFAULT_REQUEST_TIMEOUT
  File "/opt/venv/lib/python3.7/site-packages/rasa/utils/endpoints.py", line 150, in request
    **kwargs,
  File "/opt/venv/lib/python3.7/site-packages/aiohttp/client.py", line 1012, in __aenter__
    self._resp = await self._coro
  File "/opt/venv/lib/python3.7/site-packages/aiohttp/client.py", line 483, in _request
    timeout=real_timeout
  File "/opt/venv/lib/python3.7/site-packages/aiohttp/connector.py", line 523, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/opt/venv/lib/python3.7/site-packages/aiohttp/connector.py", line 859, in _create_connection
    req, traces, timeout)
  File "/opt/venv/lib/python3.7/site-packages/aiohttp/connector.py", line 967, in _create_direct_connection
    traces=traces), loop=self._loop)
concurrent.futures._base.CancelledError

There are no errors in the action server logs.

The setup:

I’m running the RASA bot (docker image rasa/rasa:1.10.8) and action server (image rasa/rasa-sdk:1.10.2) in separate docker containers on the same docker overlay network - although the 2 containers are deployed on the same host.

Remarks:

While debugging a completely unrelated client/server application that exchanges data over TCP on a similar overlay network, I noticed that data packets can randomly be lost during transmission without triggering any error. Although this has only ever occurred between containers on different hosts, I thought I would mention it in case it helps narrow down the problem.

Edit:

From my application logs, I can see that network packets were lost between two other services running on the same overlay network as the RASA bot and action server at the same time as this error occurred, which suggests that in my case it is purely a networking problem.

Hi, I had the same problem. How did you deal with it?

Hi, Jonsku, I had the same problem. How did you deal with it?

Hi, I have the same problem with the Rasa Slack integration. Are there any updates on this issue?

Hi, I am also facing the same problem (using Rasa 2.6.0). I am using the REST, MS Teams (botframework) and Slack channels. The error occurs only with the Slack channel. Is there any specific configuration for the Slack channel? I also tried increasing SANIC_REQUEST_TIMEOUT and SANIC_RESPONSE_TIMEOUT to 360, but without success. Any pointers would really help. Similar issues have been reported in the forum, but with no resolution. Here is the error log:

2021-06-07 12:36:12 ERROR rasa.core.channels.slack - Exception when trying to handle message.
2021-06-07 12:36:12 ERROR rasa.core.channels.slack - Traceback (most recent call last):
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\channels\slack.py", line 373, in process_message
    await on_new_message(user_msg)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\channels\channel.py", line 89, in handler
    await app.agent.handle_message(message)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\agent.py", line 576, in handle_message
    return await processor.handle_message(message)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\processor.py", line 108, in handle_message
    await self._predict_and_execute_next_action(message.output_channel, tracker)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\processor.py", line 660, in _predict_and_execute_next_action
    action, tracker, output_channel, self.nlg, prediction
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\processor.py", line 798, in _run_action
    await self.execute_side_effects(events, tracker, output_channel)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\processor.py", line 699, in execute_side_effects
    await self._send_bot_messages(events, tracker, output_channel)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\processor.py", line 715, in _send_bot_messages
    await output_channel.send_response(tracker.sender_id, e.message())
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\channels\channel.py", line 231, in send_response
    await self.send_text_message(recipient_id, message.pop("text"), **message)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\channels\slack.py", line 57, in send_text_message
    channel=recipient, as_user=True, text=message_part, type="mrkdwn"
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\asyncio\coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "c:\users\user\anaconda3\envs\rasa_2_6\lib\site-packages\rasa\core\channels\slack.py", line 49, in _post_message
    await self.client.chat_postMessage(channel=channel, **kwargs)
concurrent.futures._base.CancelledError

I had the same problem and it seems to be because Slack cancels the request after 3 seconds, which also cancels all the awaitables. For me, this meant that sometimes actions didn’t finish and the bot did not reply at all.
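The cancellation behaviour described here can be reproduced without Slack or Rasa. The following is only a sketch: send_reply and handle_request are made-up stand-ins for the connector's message sending and request handler, and the sleep stands in for the HTTP call.

```python
import asyncio


async def send_reply(log):
    """Stand-in for posting a message; records whether it completed."""
    try:
        await asyncio.sleep(0.05)  # pretend this is the slow HTTP call
        log.append("sent")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def handle_request(log):
    """Stand-in for the request handler that awaits the reply."""
    await send_reply(log)


async def demo():
    log = []
    task = asyncio.ensure_future(handle_request(log))
    await asyncio.sleep(0.01)  # let the handler start
    task.cancel()              # simulate the request being cancelled
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


log = asyncio.run(demo())
print(log)  # ['cancelled']
```

Cancelling the outer task raises CancelledError at the innermost await, so the reply is never sent.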

A workaround for the problem is to use asyncio.shield (see "Coroutines and Tasks" in the Python documentation), which protects the inner awaitable from being cancelled when the outer await is cancelled. My overall solution is a bit hacky, but it works.

First, I created a helper function non_cancellable_shield that still waits for the inner awaitable even if the shield is cancelled.

import asyncio
import logging
from typing import Any, Awaitable

logger = logging.getLogger(__name__)


async def non_cancellable_shield(func: Awaitable[Any]) -> Any:
    # Schedule the awaitable as a task so it keeps running on its own
    future = asyncio.ensure_future(func)
    try:
        # Shield the task so that cancelling this await does not cancel it
        logger.info('Tries awaiting future')
        return await asyncio.shield(future)
    except asyncio.CancelledError:
        # The shield was cancelled; wait for the original task to finish
        logger.info('Shield cancelled')
        return await future
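To check that the helper behaves as intended, here is a small self-contained sketch that repeats the helper (without logging) and cancels the calling task while the inner work is still running. slow_reply is a made-up stand-in for a Slack API call.

```python
import asyncio


async def non_cancellable_shield(awaitable):
    """Same idea as the helper: keep waiting even if the await is cancelled."""
    future = asyncio.ensure_future(awaitable)
    try:
        return await asyncio.shield(future)
    except asyncio.CancelledError:
        return await future


async def slow_reply(log):
    """Stand-in for a slow API call."""
    await asyncio.sleep(0.05)
    log.append("sent")
    return "ok"


async def demo():
    log = []
    task = asyncio.ensure_future(non_cancellable_shield(slow_reply(log)))
    await asyncio.sleep(0.01)  # let the task reach the shielded await
    task.cancel()              # simulate the caller giving up
    result = await task        # the helper swallows the cancellation
    return result, log


result, log = asyncio.run(demo())
print(result, log)  # ok ['sent']
```

Note that the helper swallows the first cancellation, so the caller never sees a CancelledError. A second cancel() arriving during the final await would still cancel the inner task.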

I then used this helper in a custom Slack connector. I took the code from GitHub (https://github.com/RasaHQ/rasa/blob/2.7.x/rasa/core/channels/slack.py) and simply wrapped each awaitable in the helper. For example, for _post_message:

    async def _post_message(self, channel: Text, **kwargs: Any) -> None:
        if self.thread_id:
            await non_cancellable_shield(self.client.chat_postMessage(
                channel=channel, **kwargs, thread_ts=self.thread_id
            ))
        else:
            await non_cancellable_shield(self.client.chat_postMessage(channel=channel, **kwargs))

To use the custom connector, I replaced the Slack entry in credentials.yml as follows (the custom connector is in the file channels/custom_slack_connector.py, and the class that inherits from InputChannel is called CustomSlackConnector):

channels.custom_slack_connector.CustomSlackConnector:
  slack_token: ${SLACK_TOKEN}
  slack_channel: ${SLACK_CHANNEL}
  slack_signing_secret: ${SLACK_SIGNING_SECRET}

Since Slack can theoretically cancel the request at any point in the code, I also wrapped all other awaitables in the non_cancellable_shield helper method.
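If wrapping every call by hand gets tedious, one possible alternative (not what I did in my connector) is a decorator that shields a whole async method. This is only a sketch: the decorator name shielded and the FakeBot class are hypothetical, and the helper is repeated so the snippet runs on its own.

```python
import asyncio
import functools


async def non_cancellable_shield(awaitable):
    future = asyncio.ensure_future(awaitable)
    try:
        return await asyncio.shield(future)
    except asyncio.CancelledError:
        return await future


def shielded(coro_func):
    """Hypothetical decorator: run each call through non_cancellable_shield."""
    @functools.wraps(coro_func)
    async def wrapper(*args, **kwargs):
        return await non_cancellable_shield(coro_func(*args, **kwargs))
    return wrapper


class FakeBot:
    """Made-up stand-in for an output channel."""

    def __init__(self):
        self.sent = []

    @shielded
    async def send_text_message(self, text):
        await asyncio.sleep(0.05)  # pretend this is the API call
        self.sent.append(text)


async def demo():
    bot = FakeBot()
    task = asyncio.ensure_future(bot.send_text_message("hello"))
    await asyncio.sleep(0.01)
    task.cancel()  # simulate the request being cancelled mid-send
    await task
    return bot.sent


sent = asyncio.run(demo())
print(sent)  # ['hello']
```

This only protects the decorated method itself. Any awaits in the calling code remain cancellable unless they are shielded too.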

Note: I use Rasa 2.7.0 and Python 3.7.6

Hi @jholsten can you send your custom slack connector file? That would be a huge help for me. Thanks!

Hi @webdev-rohit! Sure, you can find the custom connector attached:

custom_slack_connector.py (22.5 KB)

Unfortunately, I can’t send you the file I’m actually using right now, so I adapted the original source code again and added the non_cancellable_shield calls. I can’t currently test whether everything works with it, so feel free to let me know if something fails and I’ll take another look.

Hi @jholsten, I was able to wrap my awaitables in non_cancellable_shield calls, connect my custom Slack channel successfully, and get rid of the http_timeout issue. Thanks a ton!

Hi @jholsten, I have the same issue with the ‘rest’ channel.
I tried to wrap the methods with non_cancellable_shield, but it still goes into the exception block of non_cancellable_shield and logs the ‘Shield cancelled’ message. Do you have a sample custom rest channel?

Hi @vahid_ce, that shouldn’t be a problem; the shielded future is normally still awaited in the exception block, so the log message can be ignored. Are you sure that the original task was not executed?

Regarding the rest channel: I have not tried this yet, but I could look into it if you like.