
declearn.aggregator.Aggregator

Bases: Generic[ModelUpdatesT]

Abstract class defining an API for model updates aggregation.

The aim of this abstraction (which itself operates on the Vector abstraction, so as to provide framework-agnostic algorithms) is to enable implementing arbitrary aggregation rules to turn local model updates into global updates in a federated or decentralized learning context.

An Aggregator has two main purposes:

  • Preparing and packaging the data to be shared with peers, derived from local model updates, into a ModelUpdates container that implements aggregation (usually via summation).
  • Finalizing such a container, once aggregated, into global model updates.
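Because shared containers are combined by summation, they can be added in whatever order or grouping the communication topology requires before being finalized. The sketch below uses plain-Python stand-ins, not the DecLearn classes. `SumUpdates`, `prepare` and `finalize` are hypothetical names, and lists of floats replace `Vector` values.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SumUpdates:
    """Hypothetical stand-in for a ModelUpdates container (not DecLearn's)."""

    values: List[float]  # element-wise sum of shared updates
    count: int  # number of peers whose updates were summed

    def __add__(self, other: "SumUpdates") -> "SumUpdates":
        # Aggregation is a plain element-wise sum of the containers.
        return SumUpdates(
            [a + b for a, b in zip(self.values, other.values)],
            self.count + other.count,
        )


def prepare(updates: List[float]) -> SumUpdates:
    """Step 1: package local updates for sharing."""
    return SumUpdates(list(updates), 1)


def finalize(aggregated: SumUpdates) -> List[float]:
    """Step 2: turn the aggregated container into a mean update."""
    return [v / aggregated.count for v in aggregated.values]


a, b, c = prepare([1.0, 2.0]), prepare([3.0, 4.0]), prepare([5.0, 6.0])
centralized = finalize(a + b + c)  # e.g. a server summing everything
relayed = finalize(a + (b + c))  # e.g. peers forwarding partial sums
print(centralized, relayed)  # [3.0, 4.0] [3.0, 4.0]
```

Both groupings match exactly here because the inputs are small integers. With general floating-point values, different groupings may differ in the last bits.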

Abstract

The following class attributes and methods must be implemented by any non-abstract child class of Aggregator:

  • name (str class attribute): Name identifier of the class (should be unique across Aggregator classes). Also used for automatic type-registration of the class (see Inheritance section below).
  • prepare_for_sharing(updates: Vector, n_steps: int) -> ModelUpdates: Pre-process and package local model updates for aggregation.
  • finalize_updates(updates: ModelUpdates) -> Vector: Finalize pre-aggregated data into global model updates.
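Python's `abc` machinery enforces this contract: a subclass that leaves either abstract method unimplemented cannot be instantiated. The sketch below mirrors the pattern with a hypothetical `AggregatorSketch` base rather than the real `Aggregator`. The `Incomplete` and `PlainSum` classes are illustrative only, and lists of floats stand in for `Vector` and `ModelUpdates` values.

```python
import abc
from typing import Any, List


class AggregatorSketch(abc.ABC):
    """Hypothetical stand-in mirroring the abstract Aggregator API."""

    name: str

    @abc.abstractmethod
    def prepare_for_sharing(self, updates: List[float], n_steps: int) -> Any:
        """Package local updates for aggregation."""

    @abc.abstractmethod
    def finalize_updates(self, updates: Any) -> List[float]:
        """Turn aggregated data into global updates."""


class Incomplete(AggregatorSketch):
    name = "incomplete"

    def prepare_for_sharing(self, updates, n_steps):
        return list(updates)

    # finalize_updates is missing, so this class remains abstract.


class PlainSum(AggregatorSketch):
    name = "plain-sum"

    def prepare_for_sharing(self, updates, n_steps):
        return list(updates)  # share raw updates, ignoring n_steps

    def finalize_updates(self, updates):
        return list(updates)  # use the aggregated sum as-is


try:
    Incomplete()
    incomplete_ok = True
except TypeError as exc:
    incomplete_ok = False
    print("cannot instantiate:", exc)

agg = PlainSum()
result = agg.finalize_updates(agg.prepare_for_sharing([1.0, 2.0], n_steps=10))
print(result)  # [1.0, 2.0]
```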

Overridable

  • updates_cls (type[ModelUpdates] class attribute): Type of 'ModelUpdates' data structure used by this Aggregator class.
  • get_config() -> Dict[str, Any]: Return a JSON-serializable configuration dict of an instance.
  • from_config(Dict[str, Any]) -> Aggregator: Classmethod to instantiate an Aggregator from a config dict.
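The base get_config returns an empty dict, and from_config calls cls(**config). This pair works unchanged for parameterless subclasses. A subclass with constructor arguments would override get_config so that from_config can rebuild an equivalent instance. The sketch below assumes a hypothetical `ScaledSumSketch` class with a hypothetical `scale` parameter, and it implements no actual aggregation.

```python
import json
from typing import Any, Dict


class ScaledSumSketch:
    """Hypothetical aggregator stand-in with one constructor parameter."""

    name = "scaled-sum-sketch"

    def __init__(self, scale: float = 1.0) -> None:
        self.scale = scale

    def get_config(self) -> Dict[str, Any]:
        # Expose constructor arguments as a JSON-serializable dict.
        return {"scale": self.scale}

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "ScaledSumSketch":
        # Same logic as Aggregator.from_config: use the dict as kwargs.
        return cls(**config)


original = ScaledSumSketch(scale=0.5)
dumped = json.dumps(original.get_config())  # e.g. saved to a file
restored = ScaledSumSketch.from_config(json.loads(dumped))
print(dumped, restored.scale)  # {"scale": 0.5} 0.5
```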

Inheritance

When a subclass inheriting from Aggregator is declared, it is automatically registered under the "Aggregator" group using its class-attribute name. This can be prevented by adding register=False to the inheritance specs (e.g. class MyCls(Aggregator, register=False)). See declearn.utils.register_type for details on types registration.
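The registration hook relies on `__init_subclass__`, which receives keyword arguments given in the class statement, such as `register=False`. The toy version below stores classes in a plain dict named `REGISTRY`. That dict is a hypothetical stand-in for `declearn.utils.register_type`, not its actual behavior.

```python
from typing import Any, Dict, Type

# Hypothetical registry standing in for declearn.utils.register_type.
REGISTRY: Dict[str, Type[Any]] = {}


class BaseSketch:
    name: str = "base"

    def __init_subclass__(cls, register: bool = True, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if register:
            # Register the subclass under its class-attribute name.
            REGISTRY[cls.name] = cls


class MeanSketch(BaseSketch):
    name = "mean-sketch"


class ScratchSketch(BaseSketch, register=False):
    name = "scratch"  # opted out, so it is never added to REGISTRY


print(sorted(REGISTRY))  # ['mean-sketch']
```

The base class itself is not registered, because `__init_subclass__` only runs for subclasses.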

Source code in declearn/aggregator/_api.py
@create_types_registry
class Aggregator(Generic[ModelUpdatesT], metaclass=abc.ABCMeta):
    """Abstract class defining an API for model updates aggregation.

    The aim of this abstraction (which itself operates on the Vector
    abstraction, so as to provide framework-agnostic algorithms) is
    to enable implementing arbitrary aggregation rules to turn local
    model updates into global updates in a federated or decentralized
    learning context.

    An Aggregator has two main purposes:

    - Preparing and packaging data that is to be shared with peers
      based on local model updates into a `ModelUpdates` container
      that implements aggregation, usually via summation.
    - Finalizing such a data container into model updates.

    Abstract
    --------
    The following class attributes and methods must be implemented
    by any non-abstract child class of `Aggregator`:

    - name: str class attribute
        Name identifier of the class (should be unique across Aggregator
        classes). Also used for automatic type-registration of the class
        (see `Inheritance` section below).
    - prepare_for_sharing(updates: Vector, n_steps: int) -> ModelUpdates:
        Pre-process and package local model updates for aggregation.
    - finalize_updates(updates: ModelUpdates):
        Finalize pre-aggregated data into global model updates.

    Overridable
    -----------
    - updates_cls: type[ModelUpdates] class attribute
        Type of 'ModelUpdates' data structure used by this Aggregator class.
    - get_config() -> Dict[str, Any]:
        Return a JSON-serializable configuration dict of an instance.
    - from_config(Dict[str, Any]) -> Aggregator:
        Classmethod to instantiate an Aggregator from a config dict.

    Inheritance
    -----------
    When a subclass inheriting from `Aggregator` is declared, it is
    automatically registered under the "Aggregator" group using its
    class-attribute `name`. This can be prevented by adding `register=False`
    to the inheritance specs (e.g. `class MyCls(Aggregator, register=False)`).
    See `declearn.utils.register_type` for details on types registration.
    """

    name: ClassVar[str]
    """Name identifier of the class, unique across Aggregator classes."""

    updates_cls: ClassVar[Type[ModelUpdates]] = ModelUpdates
    """Type of 'ModelUpdates' data structure used by this Aggregator class."""

    def __init_subclass__(
        cls,
        register: bool = True,
        **kwargs: Any,
    ) -> None:
        """Automatically type-register Aggregator subclasses."""
        super().__init_subclass__(**kwargs)
        if register:
            register_type(cls, cls.name, group="Aggregator")

    @abc.abstractmethod
    def prepare_for_sharing(
        self,
        updates: Vector,
        n_steps: int,  # revise: generalize kwargs?
    ) -> ModelUpdatesT:
        """Pre-process and package local model updates for aggregation.

        Parameters
        ----------
        updates:
            Local model updates, as a Vector value.
        n_steps:
            Number of local training steps taken to produce `updates`.

        Returns
        -------
        updates:
            Data to be shared with peers, wrapped into a `ModelUpdates`
            (subclass) instance.
        """

    @abc.abstractmethod
    def finalize_updates(
        self,
        updates: ModelUpdatesT,
    ) -> Vector:
        """Finalize pre-aggregated data into global model updates.

        Parameters
        ----------
        updates:
            `ModelUpdates` instance holding aggregated data to finalize,
            resulting from peers' shared instances' sum-aggregation.
        """

    def get_config(
        self,
    ) -> Dict[str, Any]:
        """Return a JSON-serializable dict with this object's parameters."""
        return {}  # pragma: no cover

    @classmethod
    def from_config(
        cls,
        config: Dict[str, Any],
    ) -> Self:
        """Instantiate an Aggregator from its configuration dict."""
        return cls(**config)

    def aggregate(
        self,
        updates: Dict[str, Vector[T]],
        n_steps: Dict[str, int],  # revise: abstract~generalize kwargs use
    ) -> Vector[T]:
        """DEPRECATED - Aggregate input vectors into a single one.

        Parameters
        ----------
        updates: dict[str, Vector]
            Client-wise updates, as a dictionary with clients' names as
            string keys and updates as Vector values.
        n_steps: dict[str, int]
            Client-wise number of local training steps performed during
            the training round having produced the updates.

        Returns
        -------
        gradients: Vector
            Aggregated updates, as a Vector - treated as gradients by
            the server-side optimizer.

        Raises
        ------
        TypeError
            If the input `updates` are an empty dict.
        """
        warnings.warn(
            "'Aggregator.aggregate' was deprecated in DecLearn v2.4 in favor "
            "of new API methods. It will be removed in DecLearn v2.6 and/or "
            "v3.0.",
            DeprecationWarning,
        )
        if not updates:
            raise TypeError("'Aggregator.aggregate' received an empty dict.")
        partials = [
            self.prepare_for_sharing(updates[client], n_steps[client])
            for client in updates
        ]
        aggregated = sum(partials[1:], start=partials[0])
        return self.finalize_updates(aggregated)

name: ClassVar[str] class-attribute

Name identifier of the class, unique across Aggregator classes.

updates_cls: ClassVar[Type[ModelUpdates]] = ModelUpdates class-attribute

Type of 'ModelUpdates' data structure used by this Aggregator class.

__init_subclass__(register=True, **kwargs)

Automatically type-register Aggregator subclasses.

Source code in declearn/aggregator/_api.py
def __init_subclass__(
    cls,
    register: bool = True,
    **kwargs: Any,
) -> None:
    """Automatically type-register Aggregator subclasses."""
    super().__init_subclass__(**kwargs)
    if register:
        register_type(cls, cls.name, group="Aggregator")

aggregate(updates, n_steps)

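DEPRECATED: Aggregate input vectors into a single one. Deprecated in DecLearn v2.4 in favor of the prepare_for_sharing and finalize_updates methods, and scheduled for removal in DecLearn v2.6 and/or v3.0.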

Parameters:

  • updates (Dict[str, Vector[T]], required): Client-wise updates, as a dictionary with clients' names as string keys and updates as Vector values.
  • n_steps (Dict[str, int], required): Client-wise number of local training steps performed during the training round that produced the updates.

Returns:

  • gradients (Vector): Aggregated updates, as a Vector, treated as gradients by the server-side optimizer.

Raises:

  • TypeError: If the input updates are an empty dict.

Source code in declearn/aggregator/_api.py
def aggregate(
    self,
    updates: Dict[str, Vector[T]],
    n_steps: Dict[str, int],  # revise: abstract~generalize kwargs use
) -> Vector[T]:
    """DEPRECATED - Aggregate input vectors into a single one.

    Parameters
    ----------
    updates: dict[str, Vector]
        Client-wise updates, as a dictionary with clients' names as
        string keys and updates as Vector values.
    n_steps: dict[str, int]
        Client-wise number of local training steps performed during
        the training round having produced the updates.

    Returns
    -------
    gradients: Vector
        Aggregated updates, as a Vector - treated as gradients by
        the server-side optimizer.

    Raises
    ------
    TypeError
        If the input `updates` are an empty dict.
    """
    warnings.warn(
        "'Aggregator.aggregate' was deprecated in DecLearn v2.4 in favor "
        "of new API methods. It will be removed in DecLearn v2.6 and/or "
        "v3.0.",
        DeprecationWarning,
    )
    if not updates:
        raise TypeError("'Aggregator.aggregate' received an empty dict.")
    partials = [
        self.prepare_for_sharing(updates[client], n_steps[client])
        for client in updates
    ]
    aggregated = sum(partials[1:], start=partials[0])
    return self.finalize_updates(aggregated)
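The deprecated aggregate method seeds the built-in sum with the first shared container (start=partials[0]) instead of relying on the default start value of 0. The sketch below shows why this matters for a container class that defines `__add__` but not `__radd__`. `Container` is a hypothetical stand-in; this example does not assume whether any particular ModelUpdates subclass handles `0 + container`.

```python
from typing import List


class Container:
    """Hypothetical ModelUpdates-like container that only defines __add__."""

    def __init__(self, values: List[float]) -> None:
        self.values = values

    def __add__(self, other: "Container") -> "Container":
        return Container([a + b for a, b in zip(self.values, other.values)])


partials = [Container([1.0]), Container([2.0]), Container([3.0])]

# Built-in sum() starts from 0, and 0 + Container raises TypeError here.
try:
    sum(partials)
    default_start_ok = True
except TypeError:
    default_start_ok = False

# Seeding sum() with the first container avoids the int start value.
total = sum(partials[1:], start=partials[0])
print(default_start_ok, total.values)  # False [6.0]
```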

finalize_updates(updates) abstractmethod

Finalize pre-aggregated data into global model updates.

Parameters:

  • updates (ModelUpdatesT, required): ModelUpdates instance holding the aggregated data to finalize, resulting from the sum-aggregation of the instances shared by peers.

Returns:

  • Vector: Global model updates.
Source code in declearn/aggregator/_api.py
@abc.abstractmethod
def finalize_updates(
    self,
    updates: ModelUpdatesT,
) -> Vector:
    """Finalize pre-aggregated data into global model updates.

    Parameters
    ----------
    updates:
        `ModelUpdates` instance holding aggregated data to finalize,
        resulting from peers' shared instances' sum-aggregation.
    """

from_config(config) classmethod

Instantiate an Aggregator from its configuration dict.

Source code in declearn/aggregator/_api.py
@classmethod
def from_config(
    cls,
    config: Dict[str, Any],
) -> Self:
    """Instantiate an Aggregator from its configuration dict."""
    return cls(**config)

get_config()

Return a JSON-serializable dict with this object's parameters.

Source code in declearn/aggregator/_api.py
def get_config(
    self,
) -> Dict[str, Any]:
    """Return a JSON-serializable dict with this object's parameters."""
    return {}  # pragma: no cover

prepare_for_sharing(updates, n_steps) abstractmethod

Pre-process and package local model updates for aggregation.

Parameters:

  • updates (Vector, required): Local model updates, as a Vector value.
  • n_steps (int, required): Number of local training steps taken to produce updates.

Returns:

  • updates (ModelUpdatesT): Data to be shared with peers, wrapped into a ModelUpdates (subclass) instance.
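One possible use of n_steps is to weight each peer's contribution by the number of local steps it took. The sketch below assumes that weighting scheme purely for illustration; this reference does not say that any particular Aggregator does this. `WeightedSum`, `prepare_for_sharing` and `finalize_updates` here are hypothetical plain-Python stand-ins, with lists of floats in place of `Vector` values.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class WeightedSum:
    """Hypothetical ModelUpdates-like container for step-weighted sums."""

    weighted: List[float]  # sum over peers of n_steps * updates
    weights: int  # sum over peers of n_steps

    def __add__(self, other: "WeightedSum") -> "WeightedSum":
        return WeightedSum(
            [a + b for a, b in zip(self.weighted, other.weighted)],
            self.weights + other.weights,
        )


def prepare_for_sharing(updates: List[float], n_steps: int) -> WeightedSum:
    # Scale local updates by the number of local steps that produced them.
    return WeightedSum([n_steps * u for u in updates], n_steps)


def finalize_updates(aggregated: WeightedSum) -> List[float]:
    # Divide by the total step count to obtain a step-weighted average.
    return [w / aggregated.weights for w in aggregated.weighted]


updates: Dict[str, List[float]] = {"a": [1.0, 0.0], "b": [4.0, 3.0]}
n_steps: Dict[str, int] = {"a": 1, "b": 2}
shared = [prepare_for_sharing(updates[c], n_steps[c]) for c in updates]
result = finalize_updates(sum(shared[1:], start=shared[0]))
print(result)  # [3.0, 2.0]
```

Keeping the weight total inside the shared container lets a plain sum carry everything finalize_updates needs, so no peer has to know the other peers' step counts in advance.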

Source code in declearn/aggregator/_api.py
@abc.abstractmethod
def prepare_for_sharing(
    self,
    updates: Vector,
    n_steps: int,  # revise: generalize kwargs?
) -> ModelUpdatesT:
    """Pre-process and package local model updates for aggregation.

    Parameters
    ----------
    updates:
        Local model updates, as a Vector value.
    n_steps:
        Number of local training steps taken to produce `updates`.

    Returns
    -------
    updates:
        Data to be shared with peers, wrapped into a `ModelUpdates`
        (subclass) instance.
    """