Hi community,
I am trying to build a chatbot for my degree final year project. and I am suppose to design an intent classification myself. so I need to create a custom component in rasa nlu pipeline instead of Diet or any other component in rasa. So far I am using an embedding sequential keras model which summary of my model is as below. eventho its accuracy is not quiet good but I have to make this work. so when I train the rasa it trains and saves my model however when run the shell my model is not responsive and I believe it is because that I am not saving the model or pickling the model in correct way in def persist() I would like to know if any one can help me on this.
I was inspired by sklearn classifier component in rasa to make my own component and it is working to some extend. my code is as below:
from __future__ import annotations
import logging
import typing
import warnings
from typing import Any, Dict, List, Optional, Text, Tuple, Type
import numpy as np
import rasa.shared.utils.io
import rasa.utils.io as io_utils
from rasa.nlu.featurizers.dense_featurizer.dense_featurizer import DenseFeaturizer
from rasa.engine.graph import GraphComponent, ExecutionContext
from rasa.engine.recipes.default_recipe import DefaultV1Recipe
from rasa.engine.storage.resource import Resource
from rasa.engine.storage.storage import ModelStorage
from rasa.shared.constants import DOCS_URL_TRAINING_DATA_NLU
from rasa.nlu.classifiers import LABEL_RANKING_LENGTH
from rasa.shared.nlu.constants import TEXT,METADATA_EXAMPLE
from rasa.nlu.classifiers.classifier import IntentClassifier
from rasa.shared.nlu.training_data.training_data import TrainingData
from rasa.shared.nlu.training_data.message import Message
from rasa.nlu.featurizers.sparse_featurizer.count_vectors_featurizer import CountVectorsFeaturizer
from rasa.nlu.featurizers.dense_featurizer.mitie_featurizer import MitieFeaturizer
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
    import keras
    from keras.preprocessing.sequence import pad_sequences
    from keras.models import Sequential
    from keras.layers import Dense
    from keras.layers import GlobalAveragePooling1D
    from keras.layers.embeddings import Embedding
@DefaultV1Recipe.register(
    DefaultV1Recipe.ComponentType.INTENT_CLASSIFIER, is_trainable=True
)
class KerasEmbeddingIntentClassifier(GraphComponent, IntentClassifier):
    @classmethod
    def required_components(cls) -> List[Type]:
        """Components that should be included in the pipeline before this component."""
        return []
    @staticmethod
    def get_default_config() -> Dict[Text, Any]:
        """The component's default config (see parent class for full docstring)."""
        return {
            "vocab_size":3000,
            "max_len" : 2000,
            "embedding_dim" : 20,
            "trunc_type" : 'post',
            "padding_type" : 'post',
            "oov_tok" : '<OOV>',
            "optimizer":"adam",
            "loss":"sparse_categorical_crossentropy",
            "metrics":['accuracy'],
            "activation_1":'relu',
            "activation_2":'softmax',
            "epochs":1000,
            "verbose":2,
            "num_threads": 1
        }
    def __init__(
            self,
            config: Dict[Text, Any],
            model_storage: ModelStorage,
            resource: Resource,
            clf: "keras.models.Sequential" = None,
            le: "keras.preprocessing.text.Tokenizer" = None,
    ) -> None:
        from keras.preprocessing.text import Tokenizer
        self.component_config = config
        self._model_storage = model_storage
        self._resource = resource
        if le is not None:
            self.le = le
        else:
            self.le = Tokenizer()
        self.clf = clf
    @classmethod
    def create(
        cls,
        config: Dict[Text, Any],
        model_storage: ModelStorage,
        resource: Resource,
        execution_context: ExecutionContext,
    ) -> KerasEmbeddingIntentClassifier:
        """Creates a new untrained component (see parent class for full docstring)."""
        return cls(config, model_storage, resource)
    @classmethod
    def required_packages(cls) -> List[Text]:
        """Any extra python dependencies required for this component to run."""
        return ["keras"]
    def transform_labels_str2num_seq(self, labels: List[Text]) -> np.ndarray:
        """Transforms a list of strings into numeric label representation.
        :param labels: List of labels to convert to numeric representation
        """
        self.le.fit_on_texts(labels)
        return  np.array(self.le.texts_to_sequences(labels))
    def transform_labels_numSeq2str(self, labels: List[Text]) -> np.ndarray:
        labels_num2str=self.le.sequences_to_texts(labels)
        return np.array(labels_num2str[0].split(" "))
    def transform_nlu_examples_str2num(self,X)-> list:
        from keras.preprocessing.text import Tokenizer
        nlu_tokenizer=Tokenizer(self.component_config['vocab_size'],oov_token=self.component_config['oov_tok'])
        nlu_tokenizer.fit_on_texts(X)
        return nlu_tokenizer.texts_to_sequences(X)
    def pad_nlu_examples(self,X)-> np.ndarray:
        from keras.preprocessing.sequence import pad_sequences
        return pad_sequences(X,padding=self.component_config["padding_type"],maxlen=self.component_config['max_len'])
    def train(self, training_data: TrainingData) -> Resource:
        num_threads = self.component_config["num_threads"]
        labels = [e.get("intent") for e in training_data.intent_examples]
        x_train=[e.get(TEXT) for e in training_data.intent_examples]
        # x_train=training_data.sanitize_examples(x_train)
        # ss=list()
        # for e in training_data.training_examples:
        #     sequence_feature, sentence_features = e.get_dense_features(TEXT)
        #     rasa.shared.utils.io.raise_warning(f"seq:{sentence_features.features}")
        #     ss.append(sequence_feature.features[0])
        # examples=np.stack([e.get_sparse_features(TEXT) for e in training_data.intent_examples])
        if len(set(labels)) < 2:
            rasa.shared.utils.io.raise_warning(
                "Can not train an intent classifier as there are not "
                "enough intents. Need at least 2 different intents. "
                "Skipping training of intent classifier.",
                docs=DOCS_URL_TRAINING_DATA_NLU,
            )
            return self._resource
        # rasa.shared.utils.io.raise_warning(f"y_train:{labels}")
        # rasa.shared.utils.io.raise_warning(f"x_train:{x_train}")
        # for example in training_data.intent_examples:
        #     examples.append((self._get_sentence_features(example)).tolist())
        y = self.transform_labels_str2num_seq(labels)
        X=self.transform_nlu_examples_str2num(x_train)
        rasa.shared.utils.io.raise_warning(f"y_train:{y}")
        # rasa.shared.utils.io.raise_warning(f"x_train:{X}")
        X_padded=self.pad_nlu_examples(X)
        self.clf = self._create_classifier(num_threads, y)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.clf.fit(X_padded, y,epochs=self.component_config['epochs'],verbose=self.component_config['verbose'])
        self.persist()
        return self._resource
    def _create_classifier(
            self, num_threads: int, y: np.ndarray
    ) -> "keras.models.Sequential":
        from keras.models import Sequential
        from keras.layers import Dense
        from keras.layers import GlobalAveragePooling1D
        from keras.layers.embeddings import Embedding
        model = Sequential()
        model.add(Embedding(self.component_config["vocab_size"],
                            self.component_config["embedding_dim"], input_length=self.component_config["max_len"]))
        model.add(GlobalAveragePooling1D())
        model.add(Dense(self.component_config["embedding_dim"], activation=self.component_config["activation_1"]))
        model.add(Dense(y.size + 1, activation=self.component_config["activation_2"]))
        # compile the model
        model.compile(optimizer=self.component_config['optimizer'],
                      loss=self.component_config['loss'], metrics=self.component_config['metrics'])
        return model
    def predict_prob(self, X: np.ndarray) -> np.ndarray:
        """Given a bow vector of an input text, predict the intent label.
        Return probabilities for all labels.
        :param X: bow of input text
        :return: vector of probabilities containing one entry for each label.
        """
        return self.clf.predict_proba(X)
    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Given a bow vector of an input text, predict most probable label.
        Return only the most likely label.
        :param X: bow of input text
        :return: tuple of first, the most probable label and second,
                 its probability.
        """
        pred_result = self.predict_prob(X)
        # sort the probabilities retrieving the indices of
        # the elements in sorted order
        sorted_indices = np.fliplr(np.argsort(pred_result, axis=1))
        return sorted_indices, pred_result[:, sorted_indices]
    def process(self, messages: List[Message]) -> List[Message]:
        """Return the most likely intent and its probability for a message."""
        for message in messages:
            if not self.clf:
                # component is either not trained or didn't
                # receive enough training data
                intent = None
                intent_ranking = []
            else:
                X = message.get(TEXT)
                X=self.transform_nlu_examples_str2num(X)
                padded_X=self.pad_nlu_examples(X)
                intent_ids, probabilities = self.predict(padded_X)
                intents = self.transform_labels_num2str(intent_ids)
                # `predict` returns a matrix as it is supposed
                # to work for multiple examples as well, hence we need to flatten
                probabilities = probabilities.flatten()
                if intents.size > 0 and probabilities.size > 0:
                    ranking = list(zip(list(intents), list(probabilities)))[
                        :LABEL_RANKING_LENGTH
                    ]
                    intent = {"name": intents[0], "confidence": probabilities[0]}
                    intent_ranking = [
                        {"name": intent_name, "confidence": score}
                        for intent_name, score in ranking
                    ]
                else:
                    intent = {"name": None, "confidence": 0.0}
                    intent_ranking = []
            message.set("intent", intent, add_to_output=True)
            message.set("intent_ranking", intent_ranking, add_to_output=True)
        return messages
    def persist(self) -> None:
        """Persist this model into the passed directory."""
        with self._model_storage.write_to(self._resource) as model_dir:
            file_name = self.__class__.__name__
            classifier_file_name = model_dir / f"{file_name}_classifier.pkl"
            encoder_file_name = model_dir / f"{file_name}_encoder.pkl"
            if self.clf and self.le:
                io_utils.json_pickle(encoder_file_name, self.le.index_word)
                io_utils.json_pickle(classifier_file_name, self.clf.weights)
    @classmethod
    def load(
            cls,
            config: Dict[Text, Any],
            model_storage: ModelStorage,
            resource: Resource,
            execution_context: ExecutionContext,
            **kwargs: Any,
    ) -> KerasEmbeddingIntentClassifier:
        """Loads trained component (see parent class for full docstring)."""
        from keras.preprocessing.text import Tokenizer
        try:
            with model_storage.read_from(resource) as model_dir:
                file_name = cls.__name__
                classifier_file = model_dir / f"{file_name}_classifier.pkl"
                if classifier_file.exists():
                    classifier = io_utils.json_unpickle(classifier_file)
                    encoder_file = model_dir / f"{file_name}_encoder.pkl"
                    classes = io_utils.json_unpickle(encoder_file)
                    encoder = Tokenizer()
                    encoder.word_index= classes
                    return cls(config, model_storage, resource, classifier, encoder, )
        except ValueError:
            logger.debug(
                f"Failed to load '{cls.__name__}' from model storage. Resource "
                f"'{resource.name}' doesn't exist."
            )
        return cls(config, model_storage, resource)
