Fetching only the latest events from the MongoDB Tracker Store

Hello there! I am currently working on Custom Tracker Store based on the MongoDB Tracker Store.

I would like to have the option to fetch only the latest events from my MongoDB instance, and not the whole array of events. By the latest I mean some kind of configuration 5 minutes or 10 minutes, or 1 hour. There is already an existing functionality in Rasa, which I was noted to in this topic, making the configuration of session length possible. But as I can see in the code here and here there are still some requests in the code, which are pulling the whole data from the MongoDB, not depending on my session configuration.

This kind of requests may start making problems when the dialogues with a single sender_id will get big, and we also don’t need all this data from days ago in our use case. So I implement functionality with timestamps, where I filter the events array with a timestamp that is greater than (actual time - session expiration time) (e.g. 5 minutes) Here is my code:

import itertools
from typing import Any, Dict, Iterator, List, Optional, Text
from rasa.core.brokers.broker import EventBroker
from rasa.core.tracker_store import MongoTrackerStore
import datetime
import calendar
import time
import logging

from rasa.shared.core.domain import Domain
from rasa.shared.core.events import SessionStarted
from rasa.shared.core.trackers import DialogueStateTracker, EventVerbosity

logger = logging.getLogger(__name__)

class MyMongoTrackerStore(MongoTrackerStore):

    def __init__(
        self,
        minutes: Optional[int] = 5, #last x minutes to fetch only the latest events from database
        **kwargs: Dict[Text, Any],
    ) -> None:
        self.minutes = minutes
        logger.info("CUSTOM TRACKER STORE with mongo")
        logger.info(f"{kwargs}")
        logger.info(f"minutes: {self.minutes}")
        super().__init__(**kwargs)


def save(self, tracker, timeout=None):

    """Saves the current conversation state."""
    if self.event_broker:
        self.stream_events(tracker)
    additional_events = self._additional_events(tracker)
    # logger.info(f"Additional events array to be pushed into db\n{list(additional_events)}")

    for e in additional_events:
        logger.info(f"{e.as_dict()}")

    

    self.conversations.update_one(
        {"sender_id": tracker.sender_id},
        {
            "$set": self._current_tracker_state_without_events(tracker),
            "$push": {
                "events": {"$each": [e.as_dict() for e in additional_events]}
            },
        },
        upsert=True,
    )

def retrieve(
    self, sender_id: Text,
) -> Optional[List[Dict[Text, Any]]]:

    logger.info(f"Session length for saved events is {self.minutes} minutes")

    timestamp = calendar.timegm(time.gmtime()) - 60*self.minutes
    logger.info(f"Generated timestamp {timestamp}")

    logger.info(
        f"Timestamp man-readable GMT +0 {datetime.datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')}")

    stored = self.conversations.aggregate([
        {"$match": {"sender_id": sender_id}},
        {"$project": {
            "_id": 0,
            "sender_id": 1,
            "active_loop": 1,
            "events":{
                "$filter": {
                    "input": "$events",
                    "as": "events",
                    "cond": {"$gte": ["$$events.timestamp", timestamp]}
                }
            },
            "followup_action": 1,
            "latest_action": 1,
            "latest_action_name": 1,
            "latest_event_time": 1,
            "latest_input_channel": 1,
            "latest_message": 1,
            "paused": 1,
            "slots": 1

        }}

    ])

    stored = list(stored)

    logger.info(f"num Stored objects: {len(stored)}")

    assert len(stored) == 1

    stored = stored[0]

    logger.info(f"stored object: {stored}")

    # look for conversations which have used an `int` sender_id in the past

    # and update them.

    if not stored and sender_id.isdigit():

        from pymongo import ReturnDocument

        stored = self.conversations.find_one_and_update(

            {"sender_id": int(sender_id)},

            {"$set": {"sender_id": str(sender_id)}},

            return_document=ReturnDocument.AFTER,

        )

    if not stored:

        return None

    if stored is not None:

        if self.domain:

            events = DialogueStateTracker.from_dict(

                sender_id, stored.get("events"), self.domain.slots

            )

        else:

            logger.warning("No domain set, returning None")

            return None

    else:

        logger.info("No info in tracker store for this user")

        return None

    return events

`

I make the logging of stored object and the events that are going to be saved into the DB by the bot.

Here are my logs: rasa_core.log (5.5 KB)

So as we can see in the logs, these events are coming through the save method, but don’t really get saved inside the conversations collection.

Could someone please tell me why the events don’t get saved inside the collection? Do I miss something? Or may it be possible not to fetch and save the whole conversation history inside the python object, in some other way than I try to do that?

Only pulling events with certain timestamp might potentially disrupt your conversations of your users. The session approach which is in the official code should be robust and working. The methods which retrieve the entire list of events shouldn’t be called unless you’re using specific server endpoints.

1 Like

Hello @YanBull , were you able to solve the problem? I am also using MongoDB as my custom tracker store and for me too there are no events that are getting stored.

If you have solved it, can you please share the problem and the solution?

Thanks! Akshay