Using Asynchronous Functions and Coroutines in a Component

Hi! I'm trying to use a custom "async def" function inside "process" of a custom component in Rasa Core, and it returns the error "RuntimeError: this event loop is already running." Rasa itself runs inside an asyncio (uvloop) event loop. How can I create async functions in a component?

This message appears when I run:

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This is my test code, which calls a NASA API inside an asyncio context.

from rasa.nlu.components import Component
from rasa.nlu import utils
from rasa.nlu.model import Metadata
import requests
import asyncio


class NasaApiCall(Component):
    """A pre-trained sentiment component"""

    name = "NasaApiCall"
    provides = ["entities"]
    requires = []
    defaults = {}
    language_list = None

    def __init__(self, component_config=None):
        super(NasaApiCall, self).__init__(component_config)

    def train(self, training_data, cfg, **kwargs):
        """Not needed, because the model is pretrained"""
        pass

    def process(self, message, **kwargs):
        """Retrieve the text message, pass it to the classifier
        and append the prediction results to the message class."""

        apis_nasa = ['http://api.open-notify.org/astros.json',
                     'http://api.open-notify.org/iss-now.json',
                     'http://api.open-notify.org/iss-pass.json?lat={0}&lon={1}']
        weather = 'http://api.weatherstack.com/current?access_key=c819582b84ab52739ea8fd0b8cd01b20&query={}'

        async def nasa(apis, pos, lat=None, lon=None):

            nonlocal apis_nasa

            api = apis[pos]
            if lat and lon:
                api = api.format(lat, lon)
            api_nasa = requests.get(api)
            await asyncio.sleep(2)
            print(api_nasa.json())
            print("-------------------")

        async def weather_info(city):

            nonlocal weather

            w_info = requests.get(weather.format(city))
            await asyncio.sleep(2)
            print(w_info.json())
            print("-------------------")

        async def main():

            nonlocal apis_nasa

            await asyncio.gather(*[
                nasa(apis_nasa, 0), weather_info('Brasilia'), nasa(apis_nasa, 1), nasa(apis_nasa, 2, 45.0, -122.3)
            ])

        loop = asyncio.get_running_loop()
        # This call raises the RuntimeError: the loop is already running.
        loop.run_until_complete(main())

        entity = {"value": 1,
                  "entity": 'cuisine',
                  "extractor": "sentiment_extractor"}
        message.set('entities', [entity], add_to_output=True)

    def persist(self, file_name, model_dir):
        """Pass because a pre-trained model is already persisted"""

        pass


Thank you.


Hi @leoitcode, as you mentioned, Rasa is running in an event loop context. The problem with the example code you posted is probably that you're using run_until_complete(), which executes a coroutine by starting the event loop and stopping it once the coroutine finishes. At this point, however, the event loop has already been started by Rasa, so trying to start it again leads to the "RuntimeError: this event loop is already running" error.
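The error can be reproduced outside Rasa with a small sketch. This assumes the default asyncio loop on Python 3.8 or newer (not uvloop), and the names inner and outer are made up for illustration; they stand in for your main() and for whatever Rasa code is already running on the loop.

```python
import asyncio


async def inner():
    # Hypothetical stand-in for the main() coroutine in the question.
    return "done"


async def outer():
    # We are already inside a running loop here, much like process() in Rasa.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # Asking a running loop to run again is what triggers the error.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never ran; close it to avoid a warning
        return str(exc)


message = asyncio.run(outer())
print(message)
```

The printed message should be the same "already running" RuntimeError text reported in the question.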

To run your main coroutine using the already running event loop, you could try using create_task() instead of run_until_complete(), which will schedule the task to run on the loop and then return.
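A minimal sketch of that idea, assuming Python 3.7+ and the default asyncio loop. The names fetch, process_like_hook, and host are hypothetical: fetch stands in for one of your API calls (using asyncio.sleep so the sketch runs offline), process_like_hook for a synchronous method such as process(), and host for the Rasa code that owns the loop.

```python
import asyncio

results = []


async def fetch(name, delay):
    # Hypothetical stand-in for an API call; sleeps instead of doing I/O.
    await asyncio.sleep(delay)
    results.append(name)


def process_like_hook():
    # Synchronous function called while the loop is already running.
    loop = asyncio.get_running_loop()
    # Schedules the coroutine and returns at once; nothing has run yet.
    return loop.create_task(fetch("astros", 0.01))


async def host():
    # Hypothetical stand-in for the framework code that owns the loop.
    task = process_like_hook()
    seen_at_return = list(results)  # snapshot taken right after scheduling
    await task                      # the loop runs the task from here on
    return seen_at_return


before = asyncio.run(host())
print(before, results)
```

Note that create_task() only schedules the work: when process_like_hook returns, the result is not available yet, so a synchronous process() cannot use it directly. Also, a blocking call such as requests.get() inside a coroutine still blocks the whole loop while it runs.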


Yes, thank you for the tip, @fede. The whole problem was Python 3.6; I was able to fix this in Python 3.7 using asyncio.get_running_loop().
