Custom NLU pipeline component

Hi community,

I am trying to build a chatbot for my final-year degree project, and I am supposed to design the intent classification myself, so I need to create a custom component for the Rasa NLU pipeline instead of DIET or any other built-in component. So far I am using a sequential Keras model with an embedding layer; a summary of my model is below. Even though its accuracy is not great, I have to make this work. When I train Rasa, it trains and saves my model; however, when I run the shell, my model is not responsive, and I believe that is because I am not saving or pickling the model correctly in persist(). I would appreciate it if anyone could help me with this.

I was inspired by the sklearn classifier component in Rasa when writing my own component, and it works to some extent. My code is below:

from __future__ import annotations
import logging
import typing
import warnings
from typing import Any, Dict, List, Optional, Text, Tuple, Type

import numpy as np

import rasa.shared.utils.io
import rasa.utils.io as io_utils
from rasa.nlu.featurizers.dense_featurizer.dense_featurizer import DenseFeaturizer
from rasa.engine.graph import GraphComponent, ExecutionContext
from rasa.engine.recipes.default_recipe import DefaultV1Recipe
from rasa.engine.storage.resource import Resource
from rasa.engine.storage.storage import ModelStorage
from rasa.shared.constants import DOCS_URL_TRAINING_DATA_NLU
from rasa.nlu.classifiers import LABEL_RANKING_LENGTH
from rasa.shared.nlu.constants import TEXT, METADATA_EXAMPLE
from rasa.nlu.classifiers.classifier import IntentClassifier
from rasa.shared.nlu.training_data.training_data import TrainingData
from rasa.shared.nlu.training_data.message import Message
from rasa.nlu.featurizers.sparse_featurizer.count_vectors_featurizer import CountVectorsFeaturizer
from rasa.nlu.featurizers.dense_featurizer.mitie_featurizer import MitieFeaturizer

logger = logging.getLogger(__name__)

if typing.TYPE_CHECKING:
    import keras
    from keras.preprocessing.sequence import pad_sequences
    from keras.models import Sequential
    from keras.layers import Dense
    from keras.layers import GlobalAveragePooling1D
    from keras.layers.embeddings import Embedding



@DefaultV1Recipe.register(
    DefaultV1Recipe.ComponentType.INTENT_CLASSIFIER, is_trainable=True
)
class KerasEmbeddingIntentClassifier(GraphComponent, IntentClassifier):
    @classmethod
    def required_components(cls) -> List[Type]:
        """Components that should be included in the pipeline before this component."""
        return []

    @staticmethod
    def get_default_config() -> Dict[Text, Any]:
        """The component's default config (see parent class for full docstring)."""
        return {
            "vocab_size": 3000,
            "max_len": 2000,
            "embedding_dim": 20,
            "trunc_type": "post",
            "padding_type": "post",
            "oov_tok": "<OOV>",
            "optimizer": "adam",
            "loss": "sparse_categorical_crossentropy",
            "metrics": ["accuracy"],
            "activation_1": "relu",
            "activation_2": "softmax",
            "epochs": 1000,
            "verbose": 2,
            "num_threads": 1,
        }

    def __init__(
        self,
        config: Dict[Text, Any],
        model_storage: ModelStorage,
        resource: Resource,
        clf: Optional["keras.models.Sequential"] = None,
        le: Optional["keras.preprocessing.text.Tokenizer"] = None,
    ) -> None:
        from keras.preprocessing.text import Tokenizer

        self.component_config = config
        self._model_storage = model_storage
        self._resource = resource

        if le is not None:
            self.le = le
        else:
            self.le = Tokenizer()
        self.clf = clf

    @classmethod
    def create(
        cls,
        config: Dict[Text, Any],
        model_storage: ModelStorage,
        resource: Resource,
        execution_context: ExecutionContext,
    ) -> KerasEmbeddingIntentClassifier:
        """Creates a new untrained component (see parent class for full docstring)."""
        return cls(config, model_storage, resource)

    @classmethod
    def required_packages(cls) -> List[Text]:
        """Any extra python dependencies required for this component to run."""
        return ["keras"]

    def transform_labels_str2num_seq(self, labels: List[Text]) -> np.ndarray:
        """Transform a list of intent label strings into numeric sequences.

        :param labels: list of labels to convert to a numeric representation
        """
        self.le.fit_on_texts(labels)
        return np.array(self.le.texts_to_sequences(labels))

    def transform_labels_numSeq2str(self, labels: List[Text]) -> np.ndarray:
        """Transform numeric label sequences back into their string labels."""
        labels_num2str = self.le.sequences_to_texts(labels)
        return np.array(labels_num2str[0].split(" "))

    def transform_nlu_examples_str2num(self, X) -> list:
        """Tokenize the example texts into integer sequences.

        Note that this fits a new Tokenizer on X every time it is called.
        """
        from keras.preprocessing.text import Tokenizer

        nlu_tokenizer = Tokenizer(
            num_words=self.component_config["vocab_size"],
            oov_token=self.component_config["oov_tok"],
        )
        nlu_tokenizer.fit_on_texts(X)
        return nlu_tokenizer.texts_to_sequences(X)

    def pad_nlu_examples(self, X) -> np.ndarray:
        """Pad the tokenized sequences to a fixed length."""
        from keras.preprocessing.sequence import pad_sequences

        return pad_sequences(
            X,
            padding=self.component_config["padding_type"],
            maxlen=self.component_config["max_len"],
        )

    def train(self, training_data: TrainingData) -> Resource:
        num_threads = self.component_config["num_threads"]
        labels = [e.get("intent") for e in training_data.intent_examples]

        x_train = [e.get(TEXT) for e in training_data.intent_examples]

        if len(set(labels)) < 2:
            rasa.shared.utils.io.raise_warning(
                "Can not train an intent classifier as there are not "
                "enough intents. Need at least 2 different intents. "
                "Skipping training of intent classifier.",
                docs=DOCS_URL_TRAINING_DATA_NLU,
            )
            return self._resource
        y = self.transform_labels_str2num_seq(labels)
        X = self.transform_nlu_examples_str2num(x_train)
        X_padded = self.pad_nlu_examples(X)

        self.clf = self._create_classifier(num_threads, y)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.clf.fit(
                X_padded,
                y,
                epochs=self.component_config["epochs"],
                verbose=self.component_config["verbose"],
            )

        self.persist()
        return self._resource

    def _create_classifier(
        self, num_threads: int, y: np.ndarray
    ) -> "keras.models.Sequential":
        from keras.models import Sequential
        from keras.layers import Dense, GlobalAveragePooling1D
        from keras.layers.embeddings import Embedding

        model = Sequential()
        model.add(
            Embedding(
                self.component_config["vocab_size"],
                self.component_config["embedding_dim"],
                input_length=self.component_config["max_len"],
            )
        )
        model.add(GlobalAveragePooling1D())
        model.add(Dense(self.component_config["embedding_dim"], activation=self.component_config["activation_1"]))
        model.add(Dense(y.size + 1, activation=self.component_config["activation_2"]))
        # compile the model
        model.compile(
            optimizer=self.component_config["optimizer"],
            loss=self.component_config["loss"],
            metrics=self.component_config["metrics"],
        )
        return model

    def predict_prob(self, X: np.ndarray) -> np.ndarray:
        """Given a bow vector of an input text, predict the intent label.

        Return probabilities for all labels.

        :param X: bow of input text
        :return: vector of probabilities containing one entry for each label.
        """
        return self.clf.predict_proba(X)

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Given a bow vector of an input text, predict most probable label.

        Return only the most likely label.

        :param X: bow of input text
        :return: tuple of first, the most probable label and second,
                 its probability.
        """
        pred_result = self.predict_prob(X)
        # sort the probabilities retrieving the indices of
        # the elements in sorted order
        sorted_indices = np.fliplr(np.argsort(pred_result, axis=1))
        return sorted_indices, pred_result[:, sorted_indices]


    def process(self, messages: List[Message]) -> List[Message]:
        """Return the most likely intent and its probability for a message."""
        for message in messages:
            if not self.clf:
                # component is either not trained or didn't
                # receive enough training data
                intent = None
                intent_ranking = []
            else:
                X = message.get(TEXT)
                X = self.transform_nlu_examples_str2num(X)
                padded_X = self.pad_nlu_examples(X)
                intent_ids, probabilities = self.predict(padded_X)
                intents = self.transform_labels_numSeq2str(intent_ids)
                # `predict` returns a matrix as it is supposed
                # to work for multiple examples as well, hence we need to flatten
                probabilities = probabilities.flatten()

                if intents.size > 0 and probabilities.size > 0:
                    ranking = list(zip(list(intents), list(probabilities)))[
                        :LABEL_RANKING_LENGTH
                    ]

                    intent = {"name": intents[0], "confidence": probabilities[0]}

                    intent_ranking = [
                        {"name": intent_name, "confidence": score}
                        for intent_name, score in ranking
                    ]
                else:
                    intent = {"name": None, "confidence": 0.0}
                    intent_ranking = []

            message.set("intent", intent, add_to_output=True)
            message.set("intent_ranking", intent_ranking, add_to_output=True)

        return messages

    def persist(self) -> None:
        """Persist this model into the passed directory."""
        with self._model_storage.write_to(self._resource) as model_dir:
            file_name = self.__class__.__name__
            classifier_file_name = model_dir / f"{file_name}_classifier.pkl"
            encoder_file_name = model_dir / f"{file_name}_encoder.pkl"

            if self.clf and self.le:
                io_utils.json_pickle(encoder_file_name, self.le.index_word)
                io_utils.json_pickle(classifier_file_name, self.clf.weights)

    @classmethod
    def load(
            cls,
            config: Dict[Text, Any],
            model_storage: ModelStorage,
            resource: Resource,
            execution_context: ExecutionContext,
            **kwargs: Any,
    ) -> KerasEmbeddingIntentClassifier:
        """Loads trained component (see parent class for full docstring)."""
        from keras.preprocessing.text import Tokenizer

        try:
            with model_storage.read_from(resource) as model_dir:
                file_name = cls.__name__
                classifier_file = model_dir / f"{file_name}_classifier.pkl"

                if classifier_file.exists():
                    classifier = io_utils.json_unpickle(classifier_file)

                    encoder_file = model_dir / f"{file_name}_encoder.pkl"
                    classes = io_utils.json_unpickle(encoder_file)
                    encoder = Tokenizer()
                    encoder.word_index = classes

                    return cls(config, model_storage, resource, classifier, encoder)
        except ValueError:
            logger.debug(
                f"Failed to load '{cls.__name__}' from model storage. Resource "
                f"'{resource.name}' doesn't exist."
            )
        return cls(config, model_storage, resource)

Hello, there doesn't seem to be an obvious error in the component implementation, but I notice you are persisting only the weights of the classifier. When you load them back, does the resulting object still have the predict_proba method? I remember that the Keras model object has a predict_proba() method, but I could be wrong. Does it give you any error?
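
If predict_proba() turns out to be missing on your version (it was removed from tf.keras's Sequential in TensorFlow 2.6), a minimal fallback, assuming your last layer is a softmax, would be:

    def predict_prob(self, X: np.ndarray) -> np.ndarray:
        # with a softmax output layer, predict() already returns per-class probabilities
        return self.clf.predict(X)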

Actually, just now I saved self.clf.state_updates from the Keras model. When I run rasa shell it does load the model, but it is not making any predictions: the intent name is None and the confidence is 0. The prediction is done on the message in process().
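
That is, in persist() I now have something like:

    io_utils.json_pickle(classifier_file_name, self.clf.state_updates)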

Hey, did you add your project directory to the Python path?
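
If the component lives in, say, custom_components.py at the project root (the file name here is just an example), the pipeline entry in config.yml should reference it by module path:

pipeline:
  - name: custom_components.KerasEmbeddingIntentClassifier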

Print and debug predict_proba(). And since you already have persist(), why not use Keras's model.save() and keras.models.load_model() instead of pickling the way the sklearn component does?
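
Roughly, a sketch of what that could look like (untested; it keeps your ModelStorage handling and only swaps the pickling for Keras's own save/load, with the Tokenizer serialized via its to_json()):

    def persist(self) -> None:
        """Persist the Keras model and tokenizer into the model storage."""
        with self._model_storage.write_to(self._resource) as model_dir:
            if self.clf and self.le:
                # saves architecture, weights and optimizer state in one file
                self.clf.save(str(model_dir / "keras_model.h5"))
                # the Tokenizer serializes itself to a JSON string
                (model_dir / "tokenizer.json").write_text(self.le.to_json())

and the matching part of load():

    from keras.models import load_model
    from keras.preprocessing.text import tokenizer_from_json

    with model_storage.read_from(resource) as model_dir:
        classifier = load_model(str(model_dir / "keras_model.h5"))
        encoder = tokenizer_from_json((model_dir / "tokenizer.json").read_text())
        return cls(config, model_storage, resource, classifier, encoder)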

I think you should follow how the Keras Sequential API suggests saving and loading models.

Thank you so much… you were correct. I now just save and load the model with Keras, and it works fine with Rasa when I run the shell. However, the model is not good at all… I created and tested this model in Jupyter with another dataset to predict intents and the predictions were quite good, but after integrating it with Rasa it is not good at all :frowning:

Can you try it with the same dataset in Rasa? It is possible the quality of the data is denting your results. Try running cross-validation on it.
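
Rasa has a CLI for that; assuming your training data is in data/nlu.yml, something like:

rasa test nlu --nlu data/nlu.yml --cross-validation --folds 5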

Also, there seem to be a lot of parameters provided via the config.