Dask on Ray for DaskGraphRunner: Serialization of GraphNode class

Hello everyone,

I am trying to distribute Rasa's graph trainer across a multi-node Ray cluster using Dask on Ray, as described in the Dask-on-Ray docs.

Replacing the scheduler on line 101 of dask.py with ray_dask_get from ray.util.dask, I get the following error, since Ray's scheduler now has to serialize Rasa's objects:

TypeError: Could not serialize the argument <rasa.engine.graph.GraphNode object at 0x7f212f0769d0> for a task or actor ray.util.dask.scheduler.dask_task_wrapper.
Check https://docs.ray.io/en/master/serialization.html#troubleshooting for more information.
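
For context, the change amounts to swapping Dask's synchronous scheduler for the Ray-backed one, roughly like this (a sketch; the exact call site in rasa/engine/runner/dask.py may differ between Rasa versions):

from ray.util.dask import ray_dask_get

# The stock DaskGraphRunner uses Dask's single-threaded synchronous scheduler:
#     dask_result = dask.get(run_graph, run_targets)
# The Dask-on-Ray scheduler sends every task to the Ray cluster instead, which
# is what forces GraphNode (and everything it holds) to be serialized:
dask_result = ray_dask_get(run_graph, run_targets)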

GraphNode is located in graph.py, but I don't know how to serialize it and make it suitable for the cluster. The URL in the error basically talks about omitting attributes of the class using __reduce__ or custom serializers, and I am not sure which attributes are causing the problem here.
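
For reference, the __reduce__ approach from that page looks roughly like this (a generic sketch, not Rasa code; the Holder class and its attributes are made up):

import pickle

class Holder:
    """Toy example of a class holding something unpicklable."""

    def __init__(self, config):
        self.config = config
        self.connection = open("/dev/null")  # stand-in for an unpicklable handle

    def __reduce__(self):
        # Pickle only the picklable state; the connection is dropped here
        # and rebuilt by __init__ on the receiving side.
        return (Holder, (self.config,))

restored = pickle.loads(pickle.dumps(Holder("some-config")))
print(restored.config)  # the file handle was recreated, not pickled

The question is which attribute of GraphNode would have to be dropped or rebuilt like this.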

Can anybody help? @rctatman

EDIT 1: Found the culprit in the complete error output: error_output_pickle_dask_on_ray_rasa.txt (4.5 KB)

TypeError: can't pickle sqlalchemy.cprocessors.UnicodeResultProcessor objects

However, I still don't know what uses this object.

2 Likes

I tried using multiprocessing with Dask for the same kind of problem, and the issue is definitely the serialization of the SQLAlchemy objects. Multithreading did work, but I didn't see significant performance improvements.

As you may have noticed, Rasa runs Dask in a sort of eager mode on a single thread, but I guess this is something worth figuring out, to be honest. I never understood why SQL objects are needed to store a GraphNode.

1 Like

Hi @souvikg10,

Did you force your training runs when running multi-threaded? When doing that, I noticed an improvement of about 6 to 7 seconds for a complete training.

Since the single-threaded dask.get scheduler is used by default, there was never a need to serialize the SQLAlchemy objects, so Rasa never ran into the kind of problem I did. I would also like to figure out in which use case the database is used here. If I could see where this connection is made, I could just write a custom serializer for it.

A quick repository search shows that SQLAlchemy is used in the event broker, the tracker store, and caching. The last one certainly has to do with model training, right? Even though I am forcing the training, which should bypass the cache completely, I still get the above error.

I think the caching still creates SQL objects. What you can do is set the cache-size environment variable to 0 and try. I think this file definitely creates the cache object even if you force retraining, so every node in the graph pipeline ends up tied to an SQL object, in a sense. Can you check whether it still creates a database when you disable the cache using the env var, as mentioned below?

1 Like

Hi @souvikg10,

Still the same error. Whether I export RASA_MAX_CACHE_SIZE=0 first or run RASA_MAX_CACHE_SIZE=0 rasa train directly, I get the same error.

Do you still have a .rasa folder? Remove it first and then try, perhaps.

For me this worked to avoid creating the cache, but I am not sure whether it is enough to remove all SQLAlchemy objects. My likely culprit is the caching, which creates a database and keeps all the fingerprints.

Could you also print the type of each schema node or the contents of the schema node?
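
Something along these lines would do (a sketch; I am assuming GraphSchema keeps a nodes dict mapping node names to SchemaNode objects, each with a uses attribute holding the component class, as in rasa/engine/graph.py):

# Dump the name, component class and full content of every schema node.
for node_name, schema_node in graph_schema.nodes.items():
    print(node_name, schema_node.uses, schema_node)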

Removed the .rasa cache folder; nothing changed. Here's the complete output of the GraphSchema: output_graph_schema.txt (7.6 KB)

Interesting. Maybe the core policies interact with the trackers, which are likely saved in the DB, but I would be surprised if that happens during training. I see no other reason why an SQL object would be serialized and required between nodes in a DAG.

I will try to take a look as well next week. You just made it very interesting :smile: I tried last month and was also quite confused about why SQL objects are used between the processes.

1 Like

Thanks @souvikg10, I need all the help I can get.

Using the Python debugger in VSCode with this configuration, I can see that the exception is raised on line 388 of the _raylet.pyx file.

Okay, running ray.util.inspect_serializability separately on each attribute of the GraphNode class, it appears that LoggingHook and TrainingHook are non-serializable.
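
Roughly, the check looked like this (a sketch; it assumes GraphNode keeps its state in regular instance attributes so that vars() can enumerate them):

from ray.util import inspect_serializability

# `node` is a rasa.engine.graph.GraphNode instance; checking each attribute
# separately narrows down which one cannot be pickled.
for attr_name, attr_value in vars(node).items():
    inspect_serializability(attr_value, name=attr_name)

For the hooks attribute, it produced the following output: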

================================================================================
Checking Serializability of [<rasa.engine.training.hooks.LoggingHook object at 0x7f8e50571ed0>, <rasa.engine.training.hooks.TrainingHook object at 0x7f8e50505d50>]
================================================================================
!!! FAIL serialization: can't pickle sqlalchemy.cprocessors.UnicodeResultProcessor objects
WARNING: Did not find non-serializable object in [<rasa.engine.training.hooks.LoggingHook object at 0x7f8e50571ed0>, <rasa.engine.training.hooks.TrainingHook object at 0x7f8e50505d50>]. This may be an oversight.
================================================================================
Variable: 

        FailTuple(test [obj=[<rasa.engine.training.hooks.LoggingHook object at 0x7f8e50571ed0>, <rasa.engine.training.hooks.TrainingHook object at 0x7f8e50505d50>], parent=None])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable. 
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class. 
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================

Is it safe to remove some of these hooks?

Edit 1: According to the docstring of the TrainingHook class, the hooks are used to cache fingerprints of nodes during model training. But why does this run even though I disabled the cache with RASA_MAX_CACHE_SIZE=0 rasa train? Setting hooks to an empty list gave me a bunch of "mandatory parameter missing" errors.

If you are using a virtualenv, you can navigate to caching.py and hard-code the max cache size to 0 there, instead of setting an env var, and see what happens.

Hi,

I printed the value to the console right after the environment variable is read in the code, and it showed 0.0, so it really was set to 0.

To separate normal training from distributed training, I copied each function and class responsible for training and added a '_dist' suffix. I also copied DaskGraphRunner into its own DaskOnRayGraphRunner class, the only difference being the scheduler: ray_dask_get instead of dask.get.
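
In outline, the new runner looks like this (a sketch; the body of run() is copied from DaskGraphRunner in rasa/engine/runner/dask.py, so only the scheduler call changes and the snippet below is illustrative rather than complete):

from ray.util.dask import ray_dask_get
from rasa.engine.runner.dask import DaskGraphRunner

class DaskOnRayGraphRunner(DaskGraphRunner):
    """Same as DaskGraphRunner, but tasks execute on a Ray cluster."""
    # Inside the copied run() method, the single-threaded call
    #     dask_result = dask.get(run_graph, run_targets)
    # is replaced with the Dask-on-Ray scheduler:
    #     dask_result = ray_dask_get(run_graph, run_targets)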

This is the code for serializing and deserializing the TrainingHook class, the one that is unserializable by default because of its cache attribute:

In hooks.py I added the following to omit the cache attribute:

def serializer_training_hook(a):
    return (None, a._model_storage, a._pruned_schema) # None is for cache

def deserializer_training_hook(a):
    return TrainingHook(a)

and then registered these functions in the constructor of DaskOnRayGraphRunner:

ray.util.register_serializer(
    TrainingHook,
    serializer=serializer_training_hook,
    deserializer=deserializer_training_hook,
)

Then I get these errors when running:

(dask:schema_validator pid=1954152) 2022-01-28 18:35:25,967     ERROR serialization.py:283 -- __init__() missing 2 required positional arguments: 'model_storage' and 'pruned_schema'
...
(dask:schema_validator pid=1954152) TypeError: __init__() missing 2 required positional arguments: 'model_storage' and 'pruned_schema'

Complete error output: complete_error_output_28_02_2022.txt (32.2 KB)
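
In hindsight, that TypeError comes from the deserializer passing the whole state tuple as one positional argument to TrainingHook. Unpacking it would fix that particular error, something like the sketch below (the keyword names are inferred from the error message and the serializer above, and a None cache would presumably still fail later once the hook actually touches it):

def deserializer_training_hook(state):
    # `state` is the tuple built by serializer_training_hook:
    # (None, model_storage, pruned_schema) - the cache is dropped on purpose.
    cache, model_storage, pruned_schema = state
    return TrainingHook(
        cache=cache,  # None, since the cache is what cannot be pickled
        model_storage=model_storage,
        pruned_schema=pruned_schema,
    )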

In the end, I managed to make it work by passing None instead of the hooks, thereby disabling hooks, in dask.py (dask_on_ray.py in my case):

self._instantiated_nodes: Dict[Text, GraphNode] = self._instantiate_nodes(
    graph_schema, model_storage, execution_context, None  # <--- this None here
)

and commenting out line 102 of generator.py:

# assert domain == self.domain

which checks if the tracker is used on the same domain.

I can now run the training on the Ray cluster!

Without Ray, a forced complete training with the vanilla sequential DaskGraphRunner took ~33 seconds on average; with Dask on Ray it takes about ~24 seconds, almost the same as threaded Dask. But since this runs on a cluster, multiple machines can add their vCPU resources to the Ray cluster to support training.

My CPU has 16 threads in total, and watching htop during training I can see that all threads are more than 60% busy.

I also checked whether removing the hooks from normal training has any meaningful influence on training time, and apparently it does not: passing None instead of the hooks makes no difference there.

The downside is no cache and no hooks at all, but that can be a task for another time. Thanks @souvikg10 for the suggestions. :smiley:

1 Like