Here is my component. It lives in its own Python package:
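For context, I plug it into the pipeline section of config.yml via its dotted module path, e.g. `addons.keras_embedding_intent_classifier.KerasEmbeddingIntentClassifier` (the `addons` package name here is only a placeholder for wherever the module actually lives in your project).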
from __future__ import annotations
import logging
import typing
import warnings
from typing import Any, Dict, List, Optional, Text, Tuple, Type
import numpy as np
import string
import rasa.shared.utils.io
import rasa.utils.io as io_utils
import sklearn
from rasa.engine.graph import GraphComponent, ExecutionContext
from rasa.engine.recipes.default_recipe import DefaultV1Recipe
from rasa.engine.storage.resource import Resource
from rasa.engine.storage.storage import ModelStorage
from rasa.shared.constants import DOCS_URL_TRAINING_DATA_NLU
from rasa.nlu.classifiers import LABEL_RANKING_LENGTH
from rasa.shared.nlu.constants import TEXT
from rasa.nlu.classifiers.classifier import IntentClassifier
from rasa.shared.nlu.training_data.training_data import TrainingData
from rasa.shared.nlu.training_data.message import Message
from sklearn.preprocessing import LabelEncoder
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
    # only needed for type annotations; the heavy dependencies are imported
    # lazily inside the methods that use them
    import keras
@DefaultV1Recipe.register(
DefaultV1Recipe.ComponentType.INTENT_CLASSIFIER, is_trainable=True
)
class KerasEmbeddingIntentClassifier(GraphComponent, IntentClassifier):
@classmethod
def required_components(cls) -> List[Type]:
"""Components that should be included in the pipeline before this component."""
return []
@staticmethod
def get_default_config() -> Dict[Text, Any]:
"""The component's default config (see parent class for full docstring)."""
        return {
            "max_len": 1000,
            "embedding_dim": 200,
            "trunc_type": "post",
            "padding_type": "post",
            "optimizer": "adam",
            "loss": "sparse_categorical_crossentropy",
            "metrics": ["accuracy"],
            "activation_1": "relu",
            "activation_2": "softmax",
            "epochs": 400,
            "verbose": 2,
            "num_threads": 1,
        }
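    # Any of the defaults above can be overridden per key on this component's
    # entry in config.yml (standard Rasa component config behaviour), e.g.
    # setting `epochs: 100` under the pipeline entry.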
    def __init__(
        self,
        config: Dict[Text, Any],
        model_storage: ModelStorage,
        resource: Resource,
        clf: Optional["keras.models.Sequential"] = None,
        le: Optional["sklearn.preprocessing.LabelEncoder"] = None,
        fe: Optional["keras.preprocessing.text.Tokenizer"] = None,
    ) -> None:
        from keras.preprocessing.text import Tokenizer

        self.component_config = config
        self._model_storage = model_storage
        self._resource = resource
        if fe is not None:
            self.fe = fe
            logger.debug(f"feature tokenizer is not None: {self.fe.word_index}")
        else:
            self.fe = Tokenizer()
        if le is not None:
            self.le = le
        else:
            self.le = LabelEncoder()
        self.clf = clf
@classmethod
def create(
cls,
config: Dict[Text, Any],
model_storage: ModelStorage,
resource: Resource,
execution_context: ExecutionContext,
) -> KerasEmbeddingIntentClassifier:
"""Creates a new untrained component (see parent class for full docstring)."""
return cls(config, model_storage, resource)
@classmethod
def required_packages(cls) -> List[Text]:
"""Any extra python dependencies required for this component to run."""
return ["keras","sklearn","nltk"]
def transform_labels_str2num_seq(self, labels: List[Text]) -> np.ndarray:
"""Transforms a list of strings into numeric label representation.
:param labels: List of labels to convert to numeric representation
"""
return self.le.fit_transform(labels)
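    # Example (hypothetical labels): fit_transform(["greet", "bye", "greet"])
    # returns array([1, 0, 1]), because LabelEncoder sorts the unique labels
    # alphabetically into classes_ == ["bye", "greet"].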
    def transform_nlu_examples_str2num(self, X: List[List[Text]]) -> List[List[int]]:
        """Fits the tokenizer on the training texts and encodes them as index sequences."""
        self.fe.fit_on_texts(X)
        return self.fe.texts_to_sequences(X)
    def pad_nlu_examples(self, X: List[List[int]]) -> np.ndarray:
        """Pads/truncates every sequence to `max_len` so Keras gets fixed-size input."""
        from keras.preprocessing.sequence import pad_sequences

        # also pass `truncating`; the original code defined the "trunc_type"
        # config key but never used it
        return pad_sequences(
            X,
            padding=self.component_config["padding_type"],
            truncating=self.component_config["trunc_type"],
            maxlen=self.component_config["max_len"],
        )
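    # Round-trip sketch (hypothetical vocabulary): after fit_on_texts([["hi", "there"]])
    # the tokenizer maps hi -> 1 and there -> 2, so texts_to_sequences([["hi"]])
    # gives [[1]], and pad_nlu_examples right-pads it with zeros to shape (1, max_len).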
    def clean_text(self, text: Text) -> Text:
        """Lowercases, tokenizes and strips punctuation from a text."""
        from nltk.tokenize import word_tokenize

        tokens = word_tokenize(text.lower())
        # remove punctuation and drop the empty strings it leaves behind
        table = str.maketrans("", "", string.punctuation)
        stripped = [w.translate(table) for w in tokens]
        return " ".join(w for w in stripped if w)
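    # Example: clean_text("Hello, World!") -> "hello world" (the punctuation
    # tokens "," and "!" are translated to empty strings and filtered out).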
    def train(self, training_data: TrainingData) -> Resource:
        from sklearn.model_selection import train_test_split
        from keras.callbacks import ModelCheckpoint
        from nltk.tokenize import word_tokenize

        num_threads = self.component_config["num_threads"]
        labels = [e.get("intent") for e in training_data.intent_examples]
        examples = [e.get(TEXT) for e in training_data.intent_examples]
        logger.debug(f"examples: {examples}")
        cleaned_x = [self.clean_text(t) for t in examples]
        logger.debug(f"cleaned: {cleaned_x}")
        tokenized_cleaned_x = [word_tokenize(t) for t in cleaned_x]
        logger.debug(f"tokenized cleaned: {tokenized_cleaned_x}")
if len(set(labels)) < 2:
rasa.shared.utils.io.raise_warning(
"Can not train an intent classifier as there are not "
"enough intents. Need at least 2 different intents. "
"Skipping training of intent classifier.",
docs=DOCS_URL_TRAINING_DATA_NLU,
)
return self._resource
        y = self.transform_labels_str2num_seq(labels)
        X = self.transform_nlu_examples_str2num(tokenized_cleaned_x)
        logger.debug(f"y: {y}")
        logger.debug(f"X: {X}")
        X_padded = self.pad_nlu_examples(X)
        trainX, testX, trainY, testY = train_test_split(
            X_padded, y, test_size=0.2, random_state=42
        )
        self.clf = self._create_classifier(num_threads, y)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # `monitor` has to match the logged metric name (`val_accuracy`,
            # not the old `val_acc` alias), otherwise the checkpoint never fires
            checkpoint = ModelCheckpoint(
                "model-{epoch:03d}-{val_accuracy:03f}.h5",
                verbose=1, monitor="val_accuracy",
                save_best_only=True, mode="auto",
            )
            self.clf.fit(
                trainX, trainY,
                epochs=self.component_config["epochs"],
                validation_data=(testX, testY),
                callbacks=[checkpoint],
                verbose=self.component_config["verbose"],
            )
        self.clf.summary()
        self.persist()
        return self._resource
    def glove_embedding_matrix(self) -> np.ndarray:
        """Builds an embedding matrix from pretrained GloVe vectors."""
        import os

        embedding_index = {}
        # NOTE: machine-specific path; point this at wherever the GloVe
        # vectors are stored locally
        glove_dir = "C:\\Users\\maral\\PycharmProjects\\FSKTMBot\\wordEmbeddingClassifier\\pretrained_glove"
        with open(os.path.join(glove_dir, "glove.twitter.27B.200d.txt"), encoding="utf-8") as f:
            for line in f:
                values = line.split()
                word = values[0]
                coeff = np.asarray(values[1:], dtype="float32")
                embedding_index[word] = coeff
        embedding_matrix = np.zeros(
            (len(self.fe.word_index) + 1, self.component_config["embedding_dim"])
        )
        for word, i in self.fe.word_index.items():
            embedding_vector = embedding_index.get(word)
            if embedding_vector is not None:
                embedding_matrix[i] = embedding_vector
        return embedding_matrix
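    # Row i of the matrix holds the GloVe vector for the word the tokenizer
    # mapped to index i; row 0 stays all-zero because Keras Tokenizer indices
    # start at 1, and words without a pretrained vector also stay zero.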
    def _create_classifier(
        self, num_threads: int, y: np.ndarray
    ) -> "keras.models.Sequential":
        from keras.models import Sequential
        from keras.layers import Dense, Dropout, Embedding, GlobalMaxPooling1D

        embedding_matrix = self.glove_embedding_matrix()
        model = Sequential()
        model.add(
            Embedding(
                len(self.fe.word_index) + 1,
                self.component_config["embedding_dim"],
                weights=[embedding_matrix],
                input_length=self.component_config["max_len"],
                trainable=False,
            )
        )
        model.add(GlobalMaxPooling1D())
        model.add(Dense(self.component_config["embedding_dim"], activation=self.component_config["activation_1"]))
        model.add(Dense(50, activation=self.component_config["activation_1"]))
        model.add(Dropout(0.5))
        # size the output layer from the training labels instead of
        # hard-coding the number of intents (the original had a fixed 18 here)
        model.add(Dense(len(np.unique(y)), activation=self.component_config["activation_2"]))
        # compile the model
        model.compile(
            optimizer=self.component_config["optimizer"],
            loss=self.component_config["loss"],
            metrics=self.component_config["metrics"],
        )
        return model
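    # Shape walk-through for one batch (assuming the default config):
    # (batch, 1000) int ids -> Embedding -> (batch, 1000, 200)
    # -> GlobalMaxPooling1D -> (batch, 200) -> Dense(200) -> Dense(50)
    # -> Dropout -> Dense(num_classes, softmax) -> per-intent probabilities.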
    def predict_prob(self, X: np.ndarray) -> np.ndarray:
        """Given padded token-id sequences, predict intent probabilities.

        :param X: padded sequence matrix of shape (n_examples, max_len)
        :return: probability matrix with one column per intent label.
        """
        pred = self.clf.predict(X)
        logger.debug(f"predict_proba: {pred}")
        return pred
    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Given padded token-id sequences, predict the most probable labels.

        :param X: padded sequence matrix of shape (n_examples, max_len)
        :return: tuple of the label indices sorted by probability (descending)
            and the probabilities reordered the same way.
        """
        pred_result = self.predict_prob(X)
        # sort each row's probabilities descending and keep the indices
        sorted_indices = np.fliplr(np.argsort(pred_result, axis=1))
        # reorder each row with its own index row; plain
        # `pred_result[:, sorted_indices]` would broadcast to the wrong shape
        return sorted_indices, np.take_along_axis(pred_result, sorted_indices, axis=1)
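    # Toy example: pred_result == [[0.1, 0.7, 0.2]] yields
    # sorted_indices == [[1, 2, 0]] and sorted probabilities [[0.7, 0.2, 0.1]].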
    def transform_labels_num2str(self, intent_ids: np.ndarray) -> np.ndarray:
        """Maps the numeric label ids of the first example back to intent names."""
        return self.le.inverse_transform(intent_ids[0])
def process(self, messages: List[Message]) -> List[Message]:
"""Return the most likely intent and its probability for a message."""
from nltk.tokenize import word_tokenize
for message in messages:
if not self.clf:
# component is either not trained or didn't
# receive enough training data
intent = None
intent_ranking = []
else:
                msg = str(message.get(TEXT))
                logger.debug(f"message: {msg}")
                msg = [self.clean_text(msg)]
                msg = [word_tokenize(w) for w in msg]
                msg = self.fe.texts_to_sequences(msg)
                logger.debug(f"encoded message: {msg}")
                padded_X = self.pad_nlu_examples(msg)
                logger.debug(f"padded message: {padded_X}")
                intent_ids, probabilities = self.predict(padded_X)
                logger.debug(f"intent_ids: {intent_ids}")
                intents = self.transform_labels_num2str(intent_ids)
                logger.debug(f"intents: {intents}")
                # `predict` returns a matrix as it is supposed to work for
                # multiple examples as well, hence we need to flatten
                probabilities = probabilities.flatten().astype("float64")
if intents.size > 0 and probabilities.size > 0:
ranking = list(zip(list(intents), list(probabilities)))[
:LABEL_RANKING_LENGTH
]
intent = {"name": intents[0], "confidence": probabilities[0]}
intent_ranking = [
{"name": intent_name, "confidence": score}
for intent_name, score in ranking
]
                else:
                    intent = {"name": None, "confidence": 0.0}
                    intent_ranking = []
            message.set("intent", intent, add_to_output=True)
            message.set("intent_ranking", intent_ranking, add_to_output=True)
return messages
def persist(self) -> None:
"""Persist this model into the passed directory."""
with self._model_storage.write_to(self._resource) as model_dir:
file_name = self.__class__.__name__
classifier_file_name = model_dir / f"{file_name}_classifier.h5"
encoder_file_name = model_dir / f"{file_name}_labels_encoder.pkl"
            feature_encoder_file_name = model_dir / f"{file_name}_features_encoder.pkl"
            if self.clf and self.le:
                io_utils.json_pickle(encoder_file_name, self.le.classes_)
                # the tokenizer is restored from its word_index in `load`
                io_utils.json_pickle(feature_encoder_file_name, self.fe.word_index)
self.clf.save(classifier_file_name)
@classmethod
def load(
cls,
config: Dict[Text, Any],
model_storage: ModelStorage,
resource: Resource,
execution_context: ExecutionContext,
**kwargs: Any,
) -> KerasEmbeddingIntentClassifier:
"""Loads trained component (see parent class for full docstring)."""
        from keras.preprocessing.text import Tokenizer
        from keras.models import load_model
        try:
            with model_storage.read_from(resource) as model_dir:
                file_name = cls.__name__
                classifier_file = model_dir / f"{file_name}_classifier.h5"
                logger.debug(f"classifier_file: {classifier_file}")
                if classifier_file.exists():
                    classifier = load_model(classifier_file, compile=True)
                    encoder_file = model_dir / f"{file_name}_labels_encoder.pkl"
                    features_encoder_file = model_dir / f"{file_name}_features_encoder.pkl"
                    labels = io_utils.json_unpickle(encoder_file)
                    features = io_utils.json_unpickle(features_encoder_file)
                    labels_encoder = LabelEncoder()
                    labels_encoder.classes_ = labels
                    logger.debug(f"label encoder classes: {labels_encoder.classes_}")
                    features_encoder = Tokenizer()
                    # restore the exact training-time vocabulary; re-fitting
                    # on the persisted keys could reassign different indices
                    features_encoder.word_index = features
                    logger.debug(f"features index: {features_encoder.word_index}")
                    return cls(
                        config, model_storage, resource,
                        classifier, labels_encoder, features_encoder,
                    )
        except ValueError:
            logger.debug(
                f"Failed to load '{cls.__name__}' from model storage. Resource "
                f"'{resource.name}' doesn't exist."
            )
        # fall back to an untrained component if nothing could be loaded
        # (previously this path returned None when the .h5 file was missing)
        return cls(config, model_storage, resource)
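# Round-trip sketch for the label encoder (hypothetical values): persist()
# stores le.classes_, e.g. np.array(["bye", "greet"]); load() rebuilds it via
# le = LabelEncoder(); le.classes_ = stored, so le.inverse_transform([1, 0])
# returns ["greet", "bye"] without ever calling fit() again.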