RASA Core integration issue with Kafka event broker - "Error - NoBrokersAvailable"

Hello,

I’m receiving a “kafka.errors.NoBrokersAvailable: NoBrokersAvailable” error while trying to connect to the Kafka event broker from the Rasa Core server. The Rasa X server seems to connect fine.

So basically, the consumer service (Rasa X) can connect to the Kafka broker successfully, but the producer server (Rasa Core) fails to connect to it.

The endpoint and environment variables configuration is the same for both Rasa Core and RasaX servers.

Attached are the Rasa Core server (primary server) log, the Rasa X server log and the Rasa X server debug log: RasaCoreLog.txt (17.1 KB), RasaXServerDebugLog.txt (82.0 KB), RasaXServerLog.txt (2.0 KB)

Endpoint configuration:-

event_broker:
     url: ${KAFKA_HOST}
     sasl_username: ${SASL_USERNAME}
     sasl_password: ${SASL_PASSWORD}
     topic: ${KAFKA_TOPIC}
     security_protocol: ${KAFKA_SECURITY_PROTOCOL}
     type: kafka

RasaCore primary server & RasaX environment variables:-

- KAFKA_HOST=${KAFKA_HOST:-kafka:9092}
- KAFKA_TOPIC=${KAFKA_TOPIC:-rasa_core_events}
- KAFKA_SECURITY_PROTOCOL=${KAFKA_SECURITY_PROTOCOL:-SASL_PLAINTEXT}
- SASL_USERNAME=${SASL_USERNAME:-test}
- SASL_PASSWORD=${SASL_PASSWORD:-test}

Kafka docker container:-

kafka:
     image: confluentinc/cp-kafka:5.3.1
     container_name: gds_kafka
     ports:
         - "9092:9092"
     environment:
         KAFKA_BROKER_ID: 1
         KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_ADVERTISED_HOST_NAME:-kafka}
         KAFKA_CREATE_TOPICS: "rasa_core_events:1:1"
         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181           
         KAFKA_SASL_USERNAME: test
         KAFKA_SASL_PASSWORD: test
         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${DOCKER_HOST_NAME:-kafka}:9092
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Any suggestions would be greatly appreciated.

Thanks Hari

Are you sure the value for KAFKA_HOST is set for the rasa-production container?
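
One way to check this from inside each container is a small script like the one below. It is only a sketch: the function names are made up for illustration, and it tests plain TCP reachability of the address in KAFKA_HOST, nothing Kafka-specific.

```python
import socket
from typing import Tuple


def parse_host_port(address: str, default_port: int = 9092) -> Tuple[str, int]:
    """Split a 'host:port' string such as 'kafka:9092' into its parts."""
    host, sep, port = address.rpartition(":")
    if not sep:
        # No colon present: treat the whole string as the host name.
        return address, default_port
    return host, int(port)


def broker_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a plain TCP connection to the broker address succeeds."""
    host, port = parse_host_port(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts.
        return False
```

Calling broker_reachable(os.environ.get("KAFKA_HOST", "kafka:9092")) in the rasa-production container and in the Rasa X container shows whether both see the same address and can open a socket to it. A reachable port does not rule out other causes of the error, such as a security protocol mismatch.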

@stephens Thanks for your reply. The KAFKA_HOST is set for rasa-production, rasa-worker and RasaX.

The problem is that Rasa X and Rasa Core are not coded to use the same security protocol. There is a mismatch between the code of the two components (Rasa X uses security_protocol=“PLAINTEXT”, while Rasa Core uses security_protocol=“SASL_PLAINTEXT”).

The Rasa Core server does not work if the Kafka server is configured with PLAINTEXT, and Rasa X does not work if the Kafka server is configured with SASL_PLAINTEXT.

This requires a fix in the code. It would be great if the Rasa team could address this issue. The preferable fix is to add an extra condition to the Rasa X consumer code that sends security_protocol=SASL_PLAINTEXT, which is more secure than the PLAINTEXT protocol for a production environment.

Please note that I’ve checked the latest image, 0.32.0a1, and the mismatch is still there.

rasax\community\services\event_consumers\kafka_consumer.py

def _create_consumer(self) -> None:
    # noinspection PyPackageRequirements
    import kafka

    if "plaintext" in self.security_protocol.lower():
        self.consumer = kafka.KafkaConsumer(
            self.topic,
            bootstrap_servers=self.host,
            security_protocol="PLAINTEXT",
            sasl_mechanism="PLAIN",
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            ssl_check_hostname=False,
        )
    elif "ssl" in self.security_protocol.lower():
        self.consumer = kafka.KafkaConsumer(
            self.topic,
            bootstrap_servers=self.host,
            security_protocol="SSL",
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            ssl_check_hostname=self.ssl_check_hostname,
        )
    else:
        raise ValueError(
            f"Cannot initialise `kafka.KafkaConsumer` "
            f"with security protocol '{self.security_protocol}'."
        )

rasa\core\brokers\kafka.py

def _create_producer(self) -> None:
    import kafka

    if self.security_protocol == "SASL_PLAINTEXT":
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            sasl_mechanism="PLAIN",
            security_protocol=self.security_protocol,
        )
    elif self.security_protocol == "SSL":
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            ssl_check_hostname=False,
            security_protocol=self.security_protocol,
        )
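
To make the mismatch easier to see, here is a minimal sketch that mirrors only the branch conditions of the two snippets above. The function names are hypothetical and no Kafka connection is made; each function just returns the protocol string that the corresponding branch would pass on.

```python
from typing import Optional


def consumer_protocol(configured: str) -> str:
    """Mirror the branch conditions of the Rasa X `_create_consumer` snippet."""
    if "plaintext" in configured.lower():
        # Substring match: "sasl_plaintext" also lands in this branch.
        return "PLAINTEXT"
    elif "ssl" in configured.lower():
        return "SSL"
    raise ValueError(f"Cannot initialise consumer with '{configured}'.")


def producer_protocol(configured: str) -> Optional[str]:
    """Mirror the branch conditions of the Rasa `_create_producer` snippet.

    Returns None when no branch matches, i.e. no producer is created.
    """
    if configured == "SASL_PLAINTEXT":
        return configured
    elif configured == "SSL":
        return configured
    return None


for value in ("PLAINTEXT", "SASL_PLAINTEXT"):
    print(
        value,
        "-> consumer:", consumer_protocol(value),
        "| producer:", producer_protocol(value),
    )
```

With the conditions as posted, the consumer side resolves to PLAINTEXT for both configured values, while the producer side only matches SASL_PLAINTEXT (or SSL), so no single setting satisfies both.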

@stephens I’ve manually updated the consumer code in the Rasa X service container to support the SASL_PLAINTEXT protocol, and with that change everything works fine. It is a simple fix to the Rasa X code: all we need is the elif condition below to support the SASL_PLAINTEXT protocol. It would be great if the Rasa team considered this fix for the next release. I’m not sure whether I can submit a PR, as Rasa X is not OSS. Please let me know if there is any way I can submit one.

rasax\community\services\event_consumers\kafka_consumer.py

def _create_consumer(self) -> None:
    # noinspection PyPackageRequirements
    import kafka

    # ... other branches not shown in this post ...
    elif "sasl_plaintext" in self.security_protocol.lower():
        self.consumer = kafka.KafkaConsumer(
            self.topic,
            bootstrap_servers=self.host,
            security_protocol="SASL_PLAINTEXT",
            sasl_mechanism="PLAIN",
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            ssl_check_hostname=False,
        )
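
One thing to watch with this kind of branch: "plaintext" is a substring of "sasl_plaintext", so a substring test for "plaintext" that runs first would also catch SASL_PLAINTEXT. The sketch below shows one possible ordering using exact comparisons. It is not the Rasa X code; the function name is hypothetical, it returns the keyword arguments instead of building a `kafka.KafkaConsumer`, and the SSL branch is left out for brevity.

```python
from typing import Any, Dict, Optional


def consumer_kwargs(
    security_protocol: str,
    sasl_username: Optional[str] = None,
    sasl_password: Optional[str] = None,
) -> Dict[str, Any]:
    """Pick consumer keyword arguments for a configured security protocol."""
    protocol = security_protocol.strip().upper()

    # Check the more specific protocol first and compare exactly,
    # so "SASL_PLAINTEXT" is never mistaken for "PLAINTEXT".
    if protocol == "SASL_PLAINTEXT":
        return {
            "security_protocol": "SASL_PLAINTEXT",
            "sasl_mechanism": "PLAIN",
            "sasl_plain_username": sasl_username,
            "sasl_plain_password": sasl_password,
        }
    if protocol == "PLAINTEXT":
        return {"security_protocol": "PLAINTEXT"}

    # SSL and other protocols are intentionally not handled in this sketch.
    raise ValueError(
        f"Security protocol '{security_protocol}' is not handled in this sketch."
    )
```

The returned dictionary could then be unpacked into the consumer constructor together with the topic and bootstrap servers.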

Hi @hari. Would you be able to create an issue for this on GitHub, so we can get it fixed?

@tyd I submitted an issue on GitHub. Here is the link: Update RasaX Kafka event broker consumer code to support SSAL_PLAINTEXT protocol · Issue #6539 · RasaHQ/rasa · GitHub

I created the issue by following your link, and it is showing under the rasa issues. Please let me know if I have to submit it on a different page for Rasa X. Please note that the code needs to be fixed in Rasa X.

Thanks Hari

@hari Thanks! That works. All Rasa Open Source and Rasa X issues from the community belong in that repo.

@tyd Thank you. Hope it will be fixed soon.

Just an update: this was fixed in version 0.32.2.