Rasa event brokers with Kafka

Hi there,

I am trying to create a kafka event broker from a while following the event broker documentation here.

The docs says to add event broker in config.yml which I did but I get an error when I run rasa shell command saying kafka module not found. I am sure that I have kafka-python installed and then tracker_store = InMemoryTrackerStore(domain=domain, event_broker=kafka_broker) this line says domain not found.

I am not really sure what this is.

Did anyone successfully write data to a kafka topic with kafka event brokers?

Hi @tejasuw, could you post the traceback you’re getting from rasa shell?

Thanks for the reply @mloubser

I am using the documentation as reference.

So far I have endpoint.yml file which looks like this.

  url: localhost
  topic: test
  #client_id: kafka-python-rasa
  security_protocol: PLAINTEXT
  type: kafka

and I have two python files one for kafka broker as mentioned here and the other for Kafka consumer specified here.

The goal is to receive messages from event broker, write them to a topic and read it with a kafka consumer.

When ever I run kafka consumer script, this is the error I see.

(venv) C:\Users\tejasuw\mybot\>python kafka_broker.py
Traceback (most recent call last):
  File "kafka_broker.py", line 5, in <module>
    tracker_store = InMemoryTrackerStore(domain=domain, event_broker=kafka_broker)
NameError: name 'domain' is not defined

Ok! I understand better now. Yes, you’ll need to load your domain to use it - I’ll update the documentation to reflect that. Instantiating a kafka broker in Python should look like this:

from rasa.core.brokers.kafka import KafkaEventBroker
from rasa.core.tracker_store import InMemoryTrackerStore
from rasa.shared.core.domain import Domain

kafka_broker = KafkaEventBroker(url='localhost:9092',
                                topic='rasa_events')


domain = Domain.load("domain.yml")
tracker_store = InMemoryTrackerStore(domain=domain, event_broker=kafka_broker)

PR opened here Kafka broker docs by melindaloubser1 · Pull Request #7170 · RasaHQ/rasa · GitHub

Thanks @mloubser

So you can either add kafka broker using endpoints.yml or as a script. I am trying to use enpoints.yml and it says TypeError: __init__() got an unexpected keyword argument 'client_id' sys:1: RuntimeWarning: coroutine 'BaseEventLoop.create_server' was never awaited

Can you help me here?

Here are the complete logs.

Traceback (most recent call last):
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\app.py", line 1167, in run
    serve(**server_settings)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\server.py", line 892, in serve
    trigger_events(before_start, loop)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\server.py", line 668, in trigger_events
    loop.run_until_complete(result)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\asyncio\base_events.py", line 583, in run_until_complete
    return future.result()
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\run.py", line 244, in load_agent_on_start
    _broker = EventBroker.create(endpoints.event_broker)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\broker.py", line 21, in create
    return _create_from_endpoint_config(obj)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\broker.py", line 70, in _create_from_endpoint_config
    broker = KafkaEventBroker.from_endpoint_config(endpoint_config)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\kafka.py", line 46, in from_endpoint_config
    return cls(broker_config.url, **broker_config.kwargs)
TypeError: __init__() got an unexpected keyword argument 'client_id'
Traceback (most recent call last):
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "c:\users\tejasuw\anaconda3\envs\venv\Scripts\rasa.exe\__main__.py", line 7, in <module>
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\__main__.py", line 91, in main
    cmdline_arguments.func(cmdline_arguments)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\cli\run.py", line 114, in run
    rasa.run(**vars(args))
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\run.py", line 56, in run
    **kwargs,
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\run.py", line 217, in serve_application
    endpoints.lock_store if endpoints else None
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\app.py", line 1167, in run
    serve(**server_settings)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\server.py", line 892, in serve
    trigger_events(before_start, loop)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\sanic\server.py", line 668, in trigger_events
    loop.run_until_complete(result)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\asyncio\base_events.py", line 583, in run_until_complete
    return future.result()
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\run.py", line 244, in load_agent_on_start
    _broker = EventBroker.create(endpoints.event_broker)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\broker.py", line 21, in create
    return _create_from_endpoint_config(obj)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\broker.py", line 70, in _create_from_endpoint_config
    broker = KafkaEventBroker.from_endpoint_config(endpoint_config)
  File "c:\users\tejasuw\anaconda3\envs\venv\lib\site-packages\rasa\core\brokers\kafka.py", line 46, in from_endpoint_config
    return cls(broker_config.url, **broker_config.kwargs)
TypeError: __init__() got an unexpected keyword argument 'client_id'
sys:1: RuntimeWarning: coroutine 'BaseEventLoop.create_server' was never awaited```

You need a running kafka server, and then you need to provide it in the endpoints. I don’t think you actually need the python script in this case.