Skip to content

declearn.aggregator.AveragingAggregator

Bases: Aggregator[ModelUpdates]

Average-based-aggregation Aggregator subclass.

This class implements local updates' averaging, with optional client-based and/or number-of-training-steps-based weighting.

It may therefore be used to implement FedAvg and derivatives that use simple weighting schemes.

Source code in declearn/aggregator/_avg.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class AveragingAggregator(Aggregator[ModelUpdates]):
    """Average-based-aggregation Aggregator subclass.

    This class implements local updates' averaging, with optional
    client-based and/or number-of-training-steps-based weighting.

    It may therefore be used to implement FedAvg and derivatives
    that use simple weighting schemes.
    """

    name = "averaging"

    def __init__(
        self,
        steps_weighted: bool = True,
        client_weights: Optional[Dict[str, float]] = None,
    ) -> None:
        """Instantiate an averaging aggregator.

        Parameters
        ----------
        steps_weighted:
            Whether to conduct a weighted averaging of local model
            updates based on local numbers of training steps.
        client_weights:
            DEPRECATED - this argument no longer affects computations,
            save when using the deprecated 'aggregate' method.
            Optional dict of client-wise base weights to use.
            If None, homogeneous base weights are used.

        Notes
        -----
        * One may specify `client_weights` and use `steps_weighted=True`.
          In that case, the product of the client's base weight and their
          number of training steps taken will be used (and unit-normed).
        * One may use incomplete `client_weights`. In that case, unknown
          clients' base weights will be set to 1.
        """
        self.steps_weighted = steps_weighted
        # A None (or empty) input is normalized to an empty dict, so that
        # lookups in 'compute_client_weights' fall back to the 1.0 default.
        self.client_weights = client_weights or {}
        if client_weights:  # pragma: no cover
            warnings.warn(
                f"'client_weights' argument to '{self.__class__.__name__}' was"
                " deprecated in DecLearn v2.4 and is no longer used, save by"
                " the deprecated 'aggregate' method. It will be removed in"
                " DecLearn v2.6 and/or v3.0.",
                DeprecationWarning,
                # Attribute the warning to the code instantiating the class.
                stacklevel=2,
            )

    def get_config(
        self,
    ) -> Dict[str, Any]:
        """Return this aggregator's parameters, keyed by `__init__` argument.

        Returns
        -------
        config: dict[str, Any]
            Dict with the "steps_weighted" flag and a shallow copy of the
            "client_weights" dict, so that editing the returned config
            does not alter this instance's state.
        """
        return {
            "steps_weighted": self.steps_weighted,
            "client_weights": dict(self.client_weights),
        }

    def prepare_for_sharing(
        self,
        updates: Vector,
        n_steps: int,
    ) -> ModelUpdates:
        """Wrap local updates and their averaging weight as `ModelUpdates`.

        Parameters
        ----------
        updates: Vector
            Local model updates.
        n_steps: int
            Number of local training steps taken to produce `updates`.

        Returns
        -------
        updates: ModelUpdates
            If `steps_weighted` is set, `updates * n_steps` paired with
            a weight of `n_steps`; otherwise, the unscaled `updates`
            paired with a weight of 1.
        """
        if self.steps_weighted:
            # Pre-scale the updates by their weight: 'finalize_updates'
            # later divides (presumably summed) updates by their weights.
            updates = updates * n_steps
            weights = n_steps
        else:
            weights = 1
        return ModelUpdates(updates, weights)

    def finalize_updates(
        self,
        updates: ModelUpdates,
    ) -> Vector:
        """Return `updates.updates` divided by `updates.weights`."""
        return updates.updates / updates.weights

    def aggregate(
        self,
        updates: Dict[str, Vector],
        n_steps: Dict[str, int],
    ) -> Vector:
        """Aggregate client-wise updates using the DEPRECATED weights API.

        Parameters
        ----------
        updates: dict[str, Vector]
            Client-wise updates, with clients' names as string keys.
        n_steps: dict[str, int]
            Client-wise number of local training steps performed.

        Returns
        -------
        aggregated: Vector
            Output of the parent class's `aggregate` method, called with
            the weights from `compute_client_weights` in place of the
            raw `n_steps`.
        """
        # Make use of 'client_weights' as part of this DEPRECATED method.
        with warnings.catch_warnings():
            warnings.simplefilter(action="ignore", category=DeprecationWarning)
            weights = self.compute_client_weights(updates, n_steps)
        # The weights above were computed under the user-set 'steps_weighted'
        # value. Force the flag on for the parent call only, so that
        # 'prepare_for_sharing' makes use of these weights, then restore it
        # even if the parent call raises.
        steps_weighted = self.steps_weighted
        try:
            self.steps_weighted = True
            return super().aggregate(updates, weights)  # type: ignore
        finally:
            self.steps_weighted = steps_weighted

    def compute_client_weights(
        self,
        updates: Dict[str, Vector],
        n_steps: Dict[str, int],
    ) -> Dict[str, float]:
        """Compute weights to use when averaging a given set of updates.

        This method is DEPRECATED as of DecLearn v2.4.
        It will be removed in DecLearn v2.6 and/or v3.0.

        Parameters
        ----------
        updates: dict[str, Vector]
            Client-wise updates, as a dictionary with clients' names as
            string keys and updates as Vector values.
        n_steps: dict[str, int]
            Client-wise number of local training steps performed during
            the training round having produced the updates.

        Returns
        -------
        weights: dict[str, float]
            Client-wise updates-averaging weights, suited to the input
            parameters and normalized so that they sum to 1.
            Keys are those of `n_steps` when `steps_weighted` is set,
            and those of `updates` otherwise.

        Raises
        ------
        ZeroDivisionError
            If the un-normalized weights are non-empty and sum to zero
            (e.g. all step counts are 0 with `steps_weighted` set).
        """
        warnings.warn(
            f"'{self.__class__.__name__}.compute_client_weights' was"
            " deprecated in DecLearn v2.4. It will be removed in DecLearn"
            " v2.6 and/or v3.0.",
            DeprecationWarning,
            # Attribute the warning to the code calling this method.
            stacklevel=2,
        )
        if self.steps_weighted:
            # Clients without a base weight default to 1.0.
            weights = {
                client: steps * self.client_weights.get(client, 1.0)
                for client, steps in n_steps.items()
            }
        else:
            weights = {
                client: self.client_weights.get(client, 1.0)
                for client in updates
            }
        total = sum(weights.values())
        return {client: weight / total for client, weight in weights.items()}

__init__(steps_weighted=True, client_weights=None)

Instantiate an averaging aggregator.

Parameters:

Name Type Description Default
steps_weighted bool

Whether to conduct a weighted averaging of local model updates based on local numbers of training steps.

True
client_weights Optional[Dict[str, float]]

DEPRECATED - this argument no longer affects computations, save when using the deprecated 'aggregate' method. Optional dict of client-wise base weights to use. If None, homogeneous base weights are used.

None

Notes

  • One may specify client_weights and use steps_weighted=True. In that case, the product of the client's base weight and their number of training steps taken will be used (and unit-normed).
  • One may use incomplete client_weights. In that case, unknown clients' base weights will be set to 1.
Source code in declearn/aggregator/_avg.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self,
    steps_weighted: bool = True,
    client_weights: Optional[Dict[str, float]] = None,
) -> None:
    """Instantiate an averaging aggregator.

    Parameters
    ----------
    steps_weighted:
        Whether to conduct a weighted averaging of local model
        updates based on local numbers of training steps.
    client_weights:
        DEPRECATED - this argument no longer affects computations,
        save when using the deprecated 'aggregate' method.
        Optional dict of client-wise base weights to use.
        If None, homogeneous base weights are used.

    Notes
    -----
    * One may specify `client_weights` and use `steps_weighted=True`.
      In that case, the product of the client's base weight and their
      number of training steps taken will be used (and unit-normed).
    * One may use incomplete `client_weights`. In that case, unknown
      clients' base weights will be set to 1.
    * Passing a non-empty `client_weights` emits a DeprecationWarning.
    """
    self.steps_weighted = steps_weighted
    # A None (or empty) input is normalized to an empty dict.
    self.client_weights = client_weights or {}
    if client_weights:  # pragma: no cover
        warnings.warn(
            f"'client_weights' argument to '{self.__class__.__name__}' was"
            " deprecated in DecLearn v2.4 and is no longer used, save by"
            " the deprecated 'aggregate' method. It will be removed in"
            " DecLearn v2.6 and/or v3.0.",
            DeprecationWarning,
            # Attribute the warning to the code instantiating the class.
            stacklevel=2,
        )

compute_client_weights(updates, n_steps)

Compute weights to use when averaging a given set of updates.

This method is DEPRECATED as of DecLearn v2.4. It will be removed in DecLearn v2.6 and/or v3.0.

Parameters:

Name Type Description Default
updates Dict[str, Vector]

Client-wise updates, as a dictionary with clients' names as string keys and updates as Vector values.

required
n_steps Dict[str, int]

Client-wise number of local training steps performed during the training round having produced the updates.

required

Returns:

Name Type Description
weights dict[str, float]

Client-wise updates-averaging weights, suited to the input parameters and normalized so that they sum to 1.

Source code in declearn/aggregator/_avg.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def compute_client_weights(
    self,
    updates: Dict[str, Vector],
    n_steps: Dict[str, int],
) -> Dict[str, float]:
    """Compute weights to use when averaging a given set of updates.

    This method is DEPRECATED as of DecLearn v2.4.
    It will be removed in DecLearn v2.6 and/or v3.0.

    Parameters
    ----------
    updates: dict[str, Vector]
        Client-wise updates, as a dictionary with clients' names as
        string keys and updates as Vector values.
    n_steps: dict[str, int]
        Client-wise number of local training steps performed during
        the training round having produced the updates.

    Returns
    -------
    weights: dict[str, float]
        Client-wise updates-averaging weights, suited to the input
        parameters and normalized so that they sum to 1.
        Keys are those of `n_steps` when `steps_weighted` is set,
        and those of `updates` otherwise.

    Raises
    ------
    ZeroDivisionError
        If the un-normalized weights are non-empty and sum to zero
        (e.g. all step counts are 0 with `steps_weighted` set).
    """
    warnings.warn(
        f"'{self.__class__.__name__}.compute_client_weights' was"
        " deprecated in DecLearn v2.4. It will be removed in DecLearn"
        " v2.6 and/or v3.0.",
        DeprecationWarning,
        # Attribute the warning to the code calling this method.
        stacklevel=2,
    )
    if self.steps_weighted:
        # Clients without a base weight default to 1.0.
        weights = {
            client: steps * self.client_weights.get(client, 1.0)
            for client, steps in n_steps.items()
        }
    else:
        weights = {
            client: self.client_weights.get(client, 1.0)
            for client in updates
        }
    total = sum(weights.values())
    return {client: weight / total for client, weight in weights.items()}