From ff5d383497184d7a00d676914b9f1ebbafcc7266 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Fri, 1 Mar 2024 14:39:06 +0100 Subject: [PATCH 01/14] Add initial draft --- examples/custom-mods/.gitignore | 2 + examples/custom-mods/README.md | 67 +++++++++++ examples/custom-mods/client.py | 154 ++++++++++++++++++++++++++ examples/custom-mods/pyproject.toml | 19 ++++ examples/custom-mods/requirements.txt | 7 ++ examples/custom-mods/server.py | 45 ++++++++ examples/custom-mods/task.py | 95 ++++++++++++++++ 7 files changed, 389 insertions(+) create mode 100644 examples/custom-mods/.gitignore create mode 100644 examples/custom-mods/README.md create mode 100644 examples/custom-mods/client.py create mode 100644 examples/custom-mods/pyproject.toml create mode 100644 examples/custom-mods/requirements.txt create mode 100644 examples/custom-mods/server.py create mode 100644 examples/custom-mods/task.py diff --git a/examples/custom-mods/.gitignore b/examples/custom-mods/.gitignore new file mode 100644 index 00000000000..260d28a67c6 --- /dev/null +++ b/examples/custom-mods/.gitignore @@ -0,0 +1,2 @@ +wandb/ +.runs_history/ diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md new file mode 100644 index 00000000000..de1b6fdbb81 --- /dev/null +++ b/examples/custom-mods/README.md @@ -0,0 +1,67 @@ +# Flower App (PyTorch) 🧪 + +> 🧪 = This example covers experimental features that might change in future versions of Flower +> Please consult the regular PyTorch code examples ([quickstart](https://github.com/adap/flower/tree/main/examples/quickstart-pytorch), [advanced](https://github.com/adap/flower/tree/main/examples/advanced-pytorch)) to learn how to use Flower with PyTorch. + +The following steps describe how to start a long-running Flower server (SuperLink) and then run a Flower App (consisting of a `ClientApp` and a `ServerApp`). + +## Preconditions + +Let's assume the following project structure: + +```bash +$ tree . +.
+├── client.py # <-- contains `ClientApp` +├── server.py # <-- contains `ServerApp` +├── server_workflow.py # <-- contains `ServerApp` with workflow +├── server_custom.py # <-- contains `ServerApp` with custom main function +├── task.py # <-- task-specific code (model, data) +└── requirements.txt # <-- dependencies +``` + +## Install dependencies + +```bash +pip install -r requirements.txt +``` + +## Start the long-running Flower server (SuperLink) + +```bash +flower-superlink --insecure +``` + +## Start the long-running Flower client (SuperNode) + +In a new terminal window, start the first long-running Flower client: + +```bash +flower-client-app client:app --insecure +``` + +In yet another new terminal window, start the second long-running Flower client: + +```bash +flower-client-app client:app --insecure +``` + +## Run the Flower App + +With both the long-running server (SuperLink) and two clients (SuperNode) up and running, we can now run the actual Flower App: + +```bash +flower-server-app server:app --insecure +``` + +Or, to try the workflow example, run: + +```bash +flower-server-app server_workflow:app --insecure +``` + +Or, to try the custom server function example, run: + +```bash +flower-server-app server_custom:app --insecure +``` diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py new file mode 100644 index 00000000000..f73b54947b9 --- /dev/null +++ b/examples/custom-mods/client.py @@ -0,0 +1,154 @@ +import datetime +import os +import time + +import flwr as fl +import tensorflow as tf +import wandb +from flwr.common import ConfigsRecord +from flwr.client.typing import ClientAppCallable, Mod +from flwr.common.context import Context +from flwr.common.message import Message +from flwr.common.constant import MESSAGE_TYPE_FIT + +from task import ( + Net, + DEVICE, + load_data, + get_parameters, + set_parameters, + train, + test, +) + + +# Load model and data (simple CNN, CIFAR-10) +net =
Net().to(DEVICE) +trainloader, testloader = load_data() + + +# Define Flower client +class FlowerClient(fl.client.NumPyClient): + def get_parameters(self, config): + return get_parameters(net) + + def fit(self, parameters, config): + set_parameters(net, parameters) + results = train(net, trainloader, testloader, epochs=1, device=DEVICE) + return get_parameters(net), len(trainloader.dataset), results + + def evaluate(self, parameters, config): + set_parameters(net, parameters) + loss, accuracy = test(net, testloader) + return loss, len(testloader.dataset), {"accuracy": accuracy} + + +def client_fn(cid: str): + return FlowerClient().to_client() + + +def get_wandb_mod(name: str) -> Mod: + def wandb_mod(fwd: Message, context: Context, app: ClientAppCallable) -> Message: + start_time = None + + project_name = name + run_id = fwd.metadata.run_id + round = int(fwd.metadata.group_id) + group_name = f"Workload ID: {run_id}" + + client_id = str(fwd.metadata.dst_node_id) + run_name = f"Client ID: {client_id}" + + time_diff = None + + wandb.init( + project=project_name, + group=group_name, + name=run_name, + id=f"{run_id}{client_id}", + resume="allow", + reinit=True, + ) + + start_time = time.time() + + bwd = app(fwd, context) + + msg_type = bwd.metadata.message_type + + if msg_type == MESSAGE_TYPE_FIT: + + time_diff = time.time() - start_time + + metrics = bwd.content.configs_records + + results_to_log = dict( + metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) + ) + + if time_diff is not None: + results_to_log[f"{msg_type}_time"] = time_diff + + wandb.log(results_to_log, step=int(round), commit=True) + + return bwd + + return wandb_mod + + +def get_tensorboard_mod(logdir) -> Mod: + os.makedirs(logdir, exist_ok=True) + + # To allow multiple runs and group those we will create a subdir + # in the logdir which is named as number of directories in logdir + 1 + run_id = str( + len( + [ + name + for name in os.listdir(logdir) + if os.path.isdir(os.path.join(logdir, name)) + ] 
+ ) + ) + run_id = run_id + "-" + datetime.datetime.now().strftime("%Y%m%dT%H%M%S") + logdir_run = os.path.join(logdir, run_id) + + def tensorboard_mod( + fwd: Message, context: Context, app: ClientAppCallable + ) -> Message: + client_id = str(fwd.metadata.dst_node_id) + + round = int(fwd.metadata.group_id) + + start_time = time.time() + + bwd = app(fwd, context) + + time_diff = time.time() - start_time + + if bwd.metadata.message_type == MESSAGE_TYPE_FIT: + writer = tf.summary.create_file_writer(os.path.join(logdir_run, client_id)) + + metrics = dict( + bwd.content.configs_records.get("fitres.metrics", ConfigsRecord()) + ) + # Write aggregated loss + with writer.as_default(step=round): # pylint: disable=not-context-manager + tf.summary.scalar(f"fit_time", time_diff, step=round) + for metric in metrics: + tf.summary.scalar( + f"{metric}", + metrics[metric], + step=round, + ) + writer.flush() + + return bwd + + return tensorboard_mod + + +# Run via `flower-client-app client:app` +app = fl.client.ClientApp( + client_fn=client_fn, +) diff --git a/examples/custom-mods/pyproject.toml b/examples/custom-mods/pyproject.toml new file mode 100644 index 00000000000..0290323fbb9 --- /dev/null +++ b/examples/custom-mods/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["poetry-core>=1.4.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +name = "app-pytorch" +version = "0.1.0" +description = "Multi-Tenant Federated Learning with Flower and PyTorch" +authors = ["The Flower Authors "] + +[tool.poetry.dependencies] +python = ">=3.8,<3.11" +flwr = { path = "../../", develop = true, extras = ["simulation"] } +torch = "1.13.1" +torchvision = "0.14.1" +tqdm = "4.65.0" +wandb = "0.16.3" +tensorflow-cpu = { version = ">=2.9.1,<2.11.1 || >2.11.1", markers = "platform_machine == \"x86_64\"" } +tensorflow-macos = { version = ">=2.9.1,<2.11.1 || >2.11.1", markers = "sys_platform == \"darwin\" and platform_machine == \"arm64\"" } diff --git 
a/examples/custom-mods/requirements.txt b/examples/custom-mods/requirements.txt new file mode 100644 index 00000000000..6a201036c36 --- /dev/null +++ b/examples/custom-mods/requirements.txt @@ -0,0 +1,7 @@ +flwr-nightly[rest,simulation]>=1.0, <2.0 +torch==1.13.1 +torchvision==0.14.1 +tqdm==4.65.0 +wandb==0.16.3 +tensorflow-cpu>=2.9.1, != 2.11.1 ; platform_machine == "x86_64" +tensorflow-macos>=2.9.1, != 2.11.1 ; sys_platform == "darwin" and platform_machine == "arm64" diff --git a/examples/custom-mods/server.py b/examples/custom-mods/server.py new file mode 100644 index 00000000000..c2d8a4fe5ee --- /dev/null +++ b/examples/custom-mods/server.py @@ -0,0 +1,45 @@ +from typing import List, Tuple + +import flwr as fl +from flwr.common import Metrics + + +# Define metric aggregation function +def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: + examples = [num_examples for num_examples, _ in metrics] + + # Multiply accuracy of each client by number of examples used + train_losses = [ + num_examples * float(m["train_loss"]) for num_examples, m in metrics + ] + train_accuracies = [ + num_examples * float(m["train_accuracy"]) for num_examples, m in metrics + ] + val_losses = [num_examples * float(m["val_loss"]) for num_examples, m in metrics] + val_accuracies = [ + num_examples * float(m["val_accuracy"]) for num_examples, m in metrics + ] + + # Aggregate and return custom metric (weighted average) + return { + "train_loss": sum(train_losses) / sum(examples), + "train_accuracy": sum(train_accuracies) / sum(examples), + "val_loss": sum(val_losses) / sum(examples), + "val_accuracy": sum(val_accuracies) / sum(examples), + } + + +# Define strategy +strategy = fl.server.strategy.FedAvg( + fraction_fit=1.0, # Select all available clients + fraction_evaluate=0.0, # Disable evaluation + min_available_clients=2, + fit_metrics_aggregation_fn=weighted_average, +) + + +# Run via `flower-server-app server:app` +app = fl.server.ServerApp( + 
config=fl.server.ServerConfig(num_rounds=3), + strategy=strategy, +) diff --git a/examples/custom-mods/task.py b/examples/custom-mods/task.py new file mode 100644 index 00000000000..276aace885d --- /dev/null +++ b/examples/custom-mods/task.py @@ -0,0 +1,95 @@ +import warnings +from collections import OrderedDict + +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.utils.data import DataLoader +from torchvision.datasets import CIFAR10 +from torchvision.transforms import Compose, Normalize, ToTensor +from tqdm import tqdm + + +warnings.filterwarnings("ignore", category=UserWarning) +DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + +class Net(nn.Module): + """Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz')""" + + def __init__(self) -> None: + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + return self.fc3(x) + + +def train(net, trainloader, valloader, epochs, device): + """Train the model on the training set.""" + print("Starting training...") + net.to(device) # move model to GPU if available + criterion = torch.nn.CrossEntropyLoss().to(device) + optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + net.train() + for _ in range(epochs): + for images, labels in trainloader: + images, labels = images.to(device), labels.to(device) + optimizer.zero_grad() + loss = criterion(net(images), labels) + loss.backward() + optimizer.step() + + train_loss, train_acc = test(net, trainloader) + val_loss, val_acc = test(net, valloader) + + results = { + "train_loss": train_loss, + "train_accuracy": 
train_acc, + "val_loss": val_loss, + "val_accuracy": val_acc, + } + return results + + +def test(net, testloader): + """Validate the model on the test set.""" + net.to(DEVICE) + criterion = torch.nn.CrossEntropyLoss() + correct, loss = 0, 0.0 + with torch.no_grad(): + for images, labels in tqdm(testloader): + outputs = net(images.to(DEVICE)) + labels = labels.to(DEVICE) + loss += criterion(outputs, labels).item() + correct += (torch.max(outputs.data, 1)[1] == labels).sum().item() + accuracy = correct / len(testloader.dataset) + return loss, accuracy + + +def load_data(): + """Load CIFAR-10 (training and test set).""" + trf = Compose([ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + trainset = CIFAR10("./data", train=True, download=True, transform=trf) + testset = CIFAR10("./data", train=False, download=True, transform=trf) + return DataLoader(trainset, batch_size=32, shuffle=True), DataLoader(testset) + + +def get_parameters(net): + return [val.cpu().numpy() for _, val in net.state_dict().items()] + + +def set_parameters(net, parameters): + params_dict = zip(net.state_dict().keys(), parameters) + state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict}) + net.load_state_dict(state_dict, strict=True) From 1ee8d1e67aabe3c813256d609216ee6d849f19e2 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Fri, 1 Mar 2024 14:39:33 +0100 Subject: [PATCH 02/14] Remove unecessary text in README --- examples/custom-mods/README.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index de1b6fdbb81..66ae097df4e 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -14,8 +14,6 @@ $ tree . . 
├── client.py # <-- contains `ClientApp` ├── server.py # <-- contains `ServerApp` -├── server_workflow.py # <-- contains `ServerApp` with workflow -├── server_custom.py # <-- contains `ServerApp` with custom main function ├── task.py # <-- task-specific code (model, data) └── requirements.txt # <-- dependencies ``` @@ -53,15 +51,3 @@ With both the long-running server (SuperLink) and two clients (SuperNode) up and ```bash flower-server-app server:app --insecure ``` - -Or, to try the workflow example, run: - -```bash -flower-server-app server_workflow:app --insecure -``` - -Or, to try the custom server function example, run: - -```bash -flower-server-app server_custom:app --insecure -``` From dea9d4c5677cf4c88a1f00be321ce3c283fba7c5 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Fri, 1 Mar 2024 17:03:29 +0100 Subject: [PATCH 03/14] Use wandb by default --- examples/custom-mods/client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index f73b54947b9..e8200d585f5 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -151,4 +151,6 @@ def tensorboard_mod( # Run via `flower-client-app client:app` app = fl.client.ClientApp( client_fn=client_fn, + mods=[get_wandb_mod("Custom mods example")], + # mods=[get_tensorboard_mod(".runs_history/")], ) From a2dbb08ab9fc5a7c258af5e0107f6fc199dde6c8 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Fri, 1 Mar 2024 17:53:11 +0100 Subject: [PATCH 04/14] Create separate apps --- examples/custom-mods/client.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index e8200d585f5..1e0b5470e31 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -148,9 +148,14 @@ def tensorboard_mod( return tensorboard_mod -# Run via `flower-client-app client:app` -app = fl.client.ClientApp(
+# Run via `flower-client-app client:wandb_app` +wandb_app = fl.client.ClientApp( client_fn=client_fn, mods=[get_wandb_mod("Custom mods example")], - # mods=[get_tensorboard_mod(".runs_history/")], +) + +# Run via `flower-client-app client:tb_app` +tb_app = fl.client.ClientApp( + client_fn=client_fn, + mods=[get_tensorboard_mod(".runs_history/")], ) From db96c6b1d5f9d64f8cf10a08711b760168ed0eda Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Fri, 1 Mar 2024 17:55:17 +0100 Subject: [PATCH 05/14] Update README --- examples/custom-mods/README.md | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 66ae097df4e..97f216284ac 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -1,4 +1,4 @@ -# Flower App (PyTorch) 🧪 +# Using custom mods 🧪 > 🧪 = This example covers experimental features that might change in future versions of Flower > Please consult the regular PyTorch code examples ([quickstart](https://github.com/adap/flower/tree/main/examples/quickstart-pytorch), [advanced](https://github.com/adap/flower/tree/main/examples/advanced-pytorch)) to learn how to use Flower with PyTorch. @@ -35,15 +35,31 @@ flower-superlink --insecure In a new terminal window, start the first long-running Flower client: ```bash -flower-client-app client:app --insecure +flower-client-app client:wandb_app --insecure ``` +For Weights and Biases, or, + +```bash +flower-client-app client:tb_app --insecure +``` + +for Tensorboard. + In yet another new terminal window, start the second long-running Flower client: ```bash -flower-client-app client:app --insecure +flower-client-app client:wandb_app --insecure ``` +For Weights and Biases, or, + +```bash +flower-client-app client:tb_app --insecure +``` + +for Tensorboard.
+ ## Run the Flower App With both the long-running server (SuperLink) and two clients (SuperNode) up and running, we can now run the actual Flower App: From 843ea03e9e288360f3d89de2507a5ce3974189e1 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 4 Mar 2024 22:26:54 +0100 Subject: [PATCH 06/14] Add documentation --- examples/custom-mods/README.md | 303 +++++++++++++++++++++++++++++++-- examples/custom-mods/client.py | 43 ++--- 2 files changed, 300 insertions(+), 46 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 97f216284ac..1d0012a2aa8 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -3,9 +3,274 @@ > 🧪 = This example covers experimental features that might change in future versions of Flower > Please consult the regular PyTorch code examples ([quickstart](https://github.com/adap/flower/tree/main/examples/quickstart-pytorch), [advanced](https://github.com/adap/flower/tree/main/examples/advanced-pytorch)) to learn how to use Flower with PyTorch. -The following steps describe how to start a long-running Flower server (SuperLink) and then run a Flower App (consisting of a `ClientApp` and a `ServerApp`). +The following steps describe how to write custom Flower Mods and use them in a simple example.
-## Preconditions +## Writing custom Flower Mods + +### Flower Mods basics + +As described [here](https://flower.ai/docs/framework/how-to-use-built-in-mods.html#what-are-mods), Flower Mods in their simplest form can be described as: + +```python +def basic_mod(msg: Message, context: Context, app: ClientApp) -> Message: + # Do something with incoming Message (or Context) + # before passing to the inner ``ClientApp`` + msg = app(msg, context) + # Do something with outgoing Message (or Context) + # before returning + return msg +``` + +and used when defining the `ClientApp`: + +```python +app = fl.client.ClientApp( + client_fn=client_fn, + mods=[basic_mod], +) +``` + +Note that in this specific case, this mod won't modify anything, and perform FL as usual. + +### WandB Flower Mod + +If we want to write a mod to monitor our client-side training using [Weights & Biases](https://github.com/wandb/wandb), we can follow the steps below. + +First, we need to initialize our W&B project with the correct parameters: + +```python +wandb.init( + project=..., + group=..., + name=..., + id=..., + resume="allow", + reinit=True, +) +``` + +In our case, the group should be the `run_id`, specific to a `ServerApp` run, and the `name` should be the `node_id`. This will make it easy to navigate our W&B project, as for each run we will be able to see the computed results as a whole or for each individual client. + +The `id` needs to be unique, so it will be a combinaison of `run_id` and `node_id`. 
+ +In the end we have: + +```python +def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: + run_id = msg.metadata.run_id + group_name = f"Workload ID: {run_id}" + + client_id = str(msg.metadata.dst_node_id) + run_name = f"Client ID: {client_id}" + + wandb.init( + project="Mod Name", + group=group_name, + name=run_name, + id=f"{run_id}{client_id}", + resume="allow", + reinit=True, + ) +``` + +Now, before the message is processed by the server, we will store the starting time and the round number, in order to compute the time it took the client to perform its fit step. + +```python +round = int(msg.metadata.group_id) +start_time = time.time() +``` + +And then, we can send the message to the client: + +```python +bwd = app(msg, context) +``` + +And now, we the message we got back, we can gather our metrics: + +```python +msg_type = bwd.metadata.message_type + +if msg_type == MESSAGE_TYPE_FIT: + + time_diff = time.time() - start_time + + metrics = bwd.content.configs_records + + results_to_log = dict( + metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) + ) + + results_to_log[f"{msg_type}_time"] = time_diff +``` + +Note that we store our metrics in the `results_to_log` variable and that we only initialize this variable when our client is sending back fit results. 
+ +Finally, we can send our results to W&B using: + +```python +wandb.log(results_to_log, step=int(round), commit=True) +``` + +The complete mod becomes: + +```python +def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: + round = int(msg.metadata.group_id) + + run_id = msg.metadata.run_id + group_name = f"Workload ID: {run_id}" + + client_id = str(msg.metadata.dst_node_id) + run_name = f"Client ID: {client_id}" + + wandb.init( + project="Mod Name", + group=group_name, + name=run_name, + id=f"{run_id}{client_id}", + resume="allow", + reinit=True, + ) + + start_time = time.time() + + bwd = app(msg, context) + + msg_type = bwd.metadata.message_type + + if msg_type == MESSAGE_TYPE_FIT: + + time_diff = time.time() - start_time + + metrics = bwd.content.configs_records + + results_to_log = dict( + metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) + ) + + results_to_log[f"{msg_type}_time"] = time_diff + + wandb.log(results_to_log, step=int(round), commit=True) + + return bwd +``` + +And it can be used like: + +```python +app = fl.client.ClientApp( + client_fn=client_fn, + mods=[wandb_mod], +) +``` + +If we want to pass an argument to our mod, we can use a wrapper function: + +```python +def get_wandb_mod(name: str) -> Mod: + def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: + round = int(msg.metadata.group_id) + + run_id = msg.metadata.run_id + group_name = f"Workload ID: {run_id}" + + client_id = str(msg.metadata.dst_node_id) + run_name = f"Client ID: {client_id}" + + wandb.init( + project=name, + group=group_name, + name=run_name, + id=f"{run_id}{client_id}", + resume="allow", + reinit=True, + ) + + start_time = time.time() + + bwd = app(msg, context) + + msg_type = bwd.metadata.message_type + + if msg_type == MESSAGE_TYPE_FIT: + + time_diff = time.time() - start_time + + metrics = bwd.content.configs_records + + results_to_log = dict( + metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) + ) + + 
results_to_log[f"{msg_type}_time"] = time_diff + + wandb.log(results_to_log, step=int(round), commit=True) + + return bwd + + return wandb_mod +``` + +And use it like: + +```python +app = fl.client.ClientApp( + client_fn=client_fn, + mods=[get_wandb_mod("Custom mods example")], +) +``` + +### Tensorboard Flower Mod + +The [Tensorboard] Mod will only differ in the initialization and how the data is sent to Tensorboard: + +```python +def get_tensorboard_mod(logdir) -> Mod: + os.makedirs(logdir, exist_ok=True) + + def tensorboard_mod( + msg: Message, context: Context, app: ClientAppCallable + ) -> Message: + logdir_run = os.path.join(logdir, msg.metadata.run_id) + + client_id = str(msg.metadata.dst_node_id) + + round = int(msg.metadata.group_id) + + start_time = time.time() + + bwd = app(msg, context) + + time_diff = time.time() - start_time + + if bwd.metadata.message_type == MESSAGE_TYPE_FIT: + writer = tf.summary.create_file_writer(os.path.join(logdir_run, client_id)) + + metrics = dict( + bwd.content.configs_records.get("fitres.metrics", ConfigsRecord()) + ) + # Write aggregated loss + with writer.as_default(step=round): # pylint: disable=not-context-manager + tf.summary.scalar(f"fit_time", time_diff, step=round) + for metric in metrics: + tf.summary.scalar( + f"{metric}", + metrics[metric], + step=round, + ) + writer.flush() + + return bwd + + return tensorboard_mod +``` + +For the initialization, Tensorboard uses a custom directory path, which can, in this case, be passed as an argument to the wrapper function. + +## Running the example + +### Preconditions Let's assume the following project structure: @@ -18,27 +283,29 @@ $ tree . โ””โ”€โ”€ requirements.txt # <-- dependencies ``` -## Install dependencies +### Install dependencies ```bash pip install -r requirements.txt ``` -## Start the long-running Flower server (SuperLink) +For [W&B](wandb.ai) you will also need a valid account. 
+ +### Start the long-running Flower server (SuperLink) ```bash flower-superlink --insecure ``` -## Start the long-running Flower client (SuperNode) +### Start the long-running Flower client (SuperNode) -In a new terminal window, start the first long-running Flower client: +In a new terminal window, start the first long-running Flower client using: ```bash flower-client-app client:wandb_app --insecure ``` -For Weights and Biases, or, +for W&B monitoring, and: ```bash flower-client-app client:tb_app --insecure @@ -46,24 +313,28 @@ flower-client-app client:tb_app --insecure for Tensorboard. -In yet another new terminal window, start the second long-running Flower client: +In yet another new terminal window, start the second long-running Flower client (with the mod of your choice): ```bash -flower-client-app client:wandb_app --insecure +flower-client-app client:{wandb,tb}_app --insecure ``` -For Weights and Biases, or, +### Run the Flower App + +With both the long-running server (SuperLink) and two clients (SuperNode) up and running, we can now run the actual Flower App: ```bash -flower-client-app client:tb_app --insecure +flower-server-app server:app --insecure ``` -for Tensorboard. +### Check the results -## Run the Flower App +For W&B, you will need to login on the [website](wandb.ai). -With both the long-running server (SuperLink) and two clients (SuperNode) up and running, we can now run the actual Flower App: +For Tensorboard, you will need to run the following command in your terminal: -```bash -flower-server-app server:app --insecure +```sh +tensorboard --logdir ``` + +Where `` needs to be replaced by the directory passed as an argument to the wrapper function (`.runs_history/` by default). 
diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index 1e0b5470e31..15bc0743f31 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -48,21 +48,17 @@ def client_fn(cid: str): def get_wandb_mod(name: str) -> Mod: - def wandb_mod(fwd: Message, context: Context, app: ClientAppCallable) -> Message: - start_time = None + def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: + round = int(msg.metadata.group_id) - project_name = name - run_id = fwd.metadata.run_id - round = int(fwd.metadata.group_id) + run_id = msg.metadata.run_id group_name = f"Workload ID: {run_id}" - client_id = str(fwd.metadata.dst_node_id) + client_id = str(msg.metadata.dst_node_id) run_name = f"Client ID: {client_id}" - time_diff = None - wandb.init( - project=project_name, + project=name, group=group_name, name=run_name, id=f"{run_id}{client_id}", @@ -72,7 +68,7 @@ def wandb_mod(fwd: Message, context: Context, app: ClientAppCallable) -> Message start_time = time.time() - bwd = app(fwd, context) + bwd = app(msg, context) msg_type = bwd.metadata.message_type @@ -86,8 +82,7 @@ def wandb_mod(fwd: Message, context: Context, app: ClientAppCallable) -> Message metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) ) - if time_diff is not None: - results_to_log[f"{msg_type}_time"] = time_diff + results_to_log[f"{msg_type}_time"] = time_diff wandb.log(results_to_log, step=int(round), commit=True) @@ -99,30 +94,18 @@ def wandb_mod(fwd: Message, context: Context, app: ClientAppCallable) -> Message def get_tensorboard_mod(logdir) -> Mod: os.makedirs(logdir, exist_ok=True) - # To allow multiple runs and group those we will create a subdir - # in the logdir which is named as number of directories in logdir + 1 - run_id = str( - len( - [ - name - for name in os.listdir(logdir) - if os.path.isdir(os.path.join(logdir, name)) - ] - ) - ) - run_id = run_id + "-" + datetime.datetime.now().strftime("%Y%m%dT%H%M%S") - logdir_run = 
os.path.join(logdir, run_id) - def tensorboard_mod( - fwd: Message, context: Context, app: ClientAppCallable + msg: Message, context: Context, app: ClientAppCallable ) -> Message: - client_id = str(fwd.metadata.dst_node_id) + logdir_run = os.path.join(logdir, msg.metadata.run_id) + + client_id = str(msg.metadata.dst_node_id) - round = int(fwd.metadata.group_id) + round = int(msg.metadata.group_id) start_time = time.time() - bwd = app(fwd, context) + bwd = app(msg, context) time_diff = time.time() - start_time From ea6f42dab1e8e72fa93b40ca97fbcfe44a0b8fc1 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 4 Mar 2024 22:33:21 +0100 Subject: [PATCH 07/14] Fix README --- examples/custom-mods/README.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 1d0012a2aa8..aae9cafc42c 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -223,7 +223,7 @@ app = fl.client.ClientApp( ### Tensorboard Flower Mod -The [Tensorboard] Mod will only differ in the initialization and how the data is sent to Tensorboard: +The [Tensorboard](https://www.tensorflow.org/tensorboard) Mod will only differ in the initialization and how the data is sent to Tensorboard: ```python def get_tensorboard_mod(logdir) -> Mod: @@ -268,6 +268,15 @@ def get_tensorboard_mod(logdir) -> Mod: For the initialization, Tensorboard uses a custom directory path, which can, in this case, be passed as an argument to the wrapper function. +It can be used in the following way: + +```python +app = fl.client.ClientApp( + client_fn=client_fn, + mods=[get_tensorboard_mod(".runs_history/")], +) +``` + ## Running the example ### Preconditions @@ -337,4 +346,4 @@ For Tensorboard, you will need to run the following command in your terminal: tensorboard --logdir ``` -Where `` needs to be replaced by the directory passed as an argument to the wrapper function (`.runs_history/` by default). 
+Where `` needs to be replaced by the directory passed as an argument to the wrapper function (`.runs_history/` by default). From b1d5f5ec016cff6ff984fd67ab220ce2bf55c7a6 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Tue, 5 Mar 2024 14:19:07 +0100 Subject: [PATCH 08/14] Update examples/custom-mods/README.md --- examples/custom-mods/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index aae9cafc42c..500bc146212 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -51,7 +51,7 @@ wandb.init( In our case, the group should be the `run_id`, specific to a `ServerApp` run, and the `name` should be the `node_id`. This will make it easy to navigate our W&B project, as for each run we will be able to see the computed results as a whole or for each individual client. -The `id` needs to be unique, so it will be a combinaison of `run_id` and `node_id`. +The `id` needs to be unique, so it will be a combination of `run_id` and `node_id`. In the end we have: From 477418cb79c9c218391773c59f15054012e821cb Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 14:50:46 +0100 Subject: [PATCH 09/14] Apply suggestions from code review Co-authored-by: Daniel J. 
Beutel --- examples/custom-mods/README.md | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 500bc146212..11344723cf4 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -58,16 +58,16 @@ In the end we have: ```python def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: run_id = msg.metadata.run_id - group_name = f"Workload ID: {run_id}" + group_name = f"Run ID: {run_id}" - client_id = str(msg.metadata.dst_node_id) - run_name = f"Client ID: {client_id}" + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" wandb.init( project="Mod Name", group=group_name, name=run_name, - id=f"{run_id}{client_id}", + id=f"{run_id}{node_id}", resume="allow", reinit=True, ) @@ -83,19 +83,19 @@ start_time = time.time() And then, we can send the message to the client: ```python -bwd = app(msg, context) +reply = app(msg, context) ``` -And now, we the message we got back, we can gather our metrics: +And now, with the message we got back, we can gather our metrics: ```python -msg_type = bwd.metadata.message_type +msg_type = reply.metadata.message_type if msg_type == MESSAGE_TYPE_FIT: time_diff = time.time() - start_time - metrics = bwd.content.configs_records + metrics = reply.content.configs_records results_to_log = dict( metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) @@ -217,13 +217,15 @@ And use it like: ```python app = fl.client.ClientApp( client_fn=client_fn, - mods=[get_wandb_mod("Custom mods example")], + mods=[ + get_wandb_mod("Custom mods example"), + ], ) ``` -### Tensorboard Flower Mod +### TensorBoard Flower Mod -The [Tensorboard](https://www.tensorflow.org/tensorboard) Mod will only differ in the initialization and how the data is sent to Tensorboard: +The [TensorBoard](https://www.tensorflow.org/tensorboard) Mod will only differ in the initialization and how the data is sent 
to TensorBoard: ```python def get_tensorboard_mod(logdir) -> Mod: From 8bb98c3dccc6f7e683240aef4a16de7ca1544cd8 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 15:08:01 +0100 Subject: [PATCH 10/14] Apply suggestions --- examples/custom-mods/README.md | 101 ++++++++++++++------------------- examples/custom-mods/client.py | 62 ++++++++++---------- 2 files changed, 75 insertions(+), 88 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 11344723cf4..29102f180cc 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -15,10 +15,10 @@ As described [here](https://flower.ai/docs/framework/how-to-use-built-in-mods.ht def basic_mod(msg: Message, context: Context, app: ClientApp) -> Message: # Do something with incoming Message (or Context) # before passing to the inner ``ClientApp`` - msg = app(msg, context) + reply = app(msg, context) # Do something with outgoing Message (or Context) # before returning - return msg + return reply ``` and used when defining the `ClientApp`: @@ -76,7 +76,7 @@ def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message Now, before the message is processed by the server, we will store the starting time and the round number, in order to compute the time it took the client to perform its fit step. 
```python -round = int(msg.metadata.group_id) +server_round = int(msg.metadata.group_id) start_time = time.time() ``` @@ -89,71 +89,62 @@ reply = app(msg, context) And now, with the message we got back, we can gather our metrics: ```python -msg_type = reply.metadata.message_type - -if msg_type == MESSAGE_TYPE_FIT: +if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): time_diff = time.time() - start_time metrics = reply.content.configs_records - results_to_log = dict( - metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) - ) - - results_to_log[f"{msg_type}_time"] = time_diff + results_to_log = dict(metrics.get("fitres.metrics", ConfigsRecord())) + results_to_log["fit_time"] = time_diff ``` -Note that we store our metrics in the `results_to_log` variable and that we only initialize this variable when our client is sending back fit results. +Note that we store our metrics in the `results_to_log` variable and that we only initialize this variable when our client is sending back fit results (with content in it). 
Finally, we can send our results to W&B using: ```python -wandb.log(results_to_log, step=int(round), commit=True) +wandb.log(results_to_log, step=int(server_round), commit=True) ``` The complete mod becomes: ```python def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: - round = int(msg.metadata.group_id) + server_round = int(msg.metadata.group_id) run_id = msg.metadata.run_id - group_name = f"Workload ID: {run_id}" + group_name = f"Run ID: {run_id}" - client_id = str(msg.metadata.dst_node_id) - run_name = f"Client ID: {client_id}" + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" wandb.init( project="Mod Name", group=group_name, name=run_name, - id=f"{run_id}{client_id}", + id=f"{run_id}{node_id}", resume="allow", reinit=True, ) start_time = time.time() - bwd = app(msg, context) - - msg_type = bwd.metadata.message_type + reply = app(msg, context) - if msg_type == MESSAGE_TYPE_FIT: + if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): time_diff = time.time() - start_time - metrics = bwd.content.configs_records + metrics = reply.content.configs_records - results_to_log = dict( - metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) - ) + results_to_log = dict(metrics.get("fitres.metrics", ConfigsRecord())) - results_to_log[f"{msg_type}_time"] = time_diff + results_to_log["fit_time"] = time_diff - wandb.log(results_to_log, step=int(round), commit=True) + wandb.log(results_to_log, step=int(server_round), commit=True) - return bwd + return reply ``` And it can be used like: @@ -170,44 +161,40 @@ If we want to pass an argument to our mod, we can use a wrapper function: ```python def get_wandb_mod(name: str) -> Mod: def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: - round = int(msg.metadata.group_id) + server_round = int(msg.metadata.group_id) run_id = msg.metadata.run_id - group_name = f"Workload ID: {run_id}" + group_name = f"Run ID: {run_id}" - client_id 
= str(msg.metadata.dst_node_id) - run_name = f"Client ID: {client_id}" + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" wandb.init( project=name, group=group_name, name=run_name, - id=f"{run_id}{client_id}", + id=f"{run_id}{node_id}", resume="allow", reinit=True, ) start_time = time.time() - bwd = app(msg, context) + reply = app(msg, context) - msg_type = bwd.metadata.message_type - - if msg_type == MESSAGE_TYPE_FIT: + if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): time_diff = time.time() - start_time - metrics = bwd.content.configs_records + metrics = reply.content.configs_records - results_to_log = dict( - metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) - ) + results_to_log = dict(metrics.get("fitres.metrics", ConfigsRecord())) - results_to_log[f"{msg_type}_time"] = time_diff + results_to_log["fit_time"] = time_diff - wandb.log(results_to_log, step=int(round), commit=True) + wandb.log(results_to_log, step=int(server_round), commit=True) - return bwd + return reply return wandb_mod ``` @@ -234,36 +221,36 @@ def get_tensorboard_mod(logdir) -> Mod: def tensorboard_mod( msg: Message, context: Context, app: ClientAppCallable ) -> Message: - logdir_run = os.path.join(logdir, msg.metadata.run_id) + logdir_run = os.path.join(logdir, str(msg.metadata.run_id)) - client_id = str(msg.metadata.dst_node_id) + node_id = str(msg.metadata.dst_node_id) - round = int(msg.metadata.group_id) + server_round = int(msg.metadata.group_id) start_time = time.time() - bwd = app(msg, context) + reply = app(msg, context) time_diff = time.time() - start_time - if bwd.metadata.message_type == MESSAGE_TYPE_FIT: - writer = tf.summary.create_file_writer(os.path.join(logdir_run, client_id)) + if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + writer = tf.summary.create_file_writer(os.path.join(logdir_run, node_id)) metrics = dict( - bwd.content.configs_records.get("fitres.metrics", ConfigsRecord()) + 
reply.content.configs_records.get("fitres.metrics", ConfigsRecord()) ) - # Write aggregated loss - with writer.as_default(step=round): # pylint: disable=not-context-manager - tf.summary.scalar(f"fit_time", time_diff, step=round) + + with writer.as_default(step=server_round): + tf.summary.scalar(f"fit_time", time_diff, step=server_round) for metric in metrics: tf.summary.scalar( f"{metric}", metrics[metric], - step=round, + step=server_round, ) writer.flush() - return bwd + return reply return tensorboard_mod ``` diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index 15bc0743f31..45e8acf1ceb 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -49,44 +49,40 @@ def client_fn(cid: str): def get_wandb_mod(name: str) -> Mod: def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: - round = int(msg.metadata.group_id) + server_round = int(msg.metadata.group_id) run_id = msg.metadata.run_id - group_name = f"Workload ID: {run_id}" + group_name = f"Run ID: {run_id}" - client_id = str(msg.metadata.dst_node_id) - run_name = f"Client ID: {client_id}" + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" wandb.init( project=name, group=group_name, name=run_name, - id=f"{run_id}{client_id}", + id=f"{run_id}{node_id}", resume="allow", reinit=True, ) start_time = time.time() - bwd = app(msg, context) + reply = app(msg, context) - msg_type = bwd.metadata.message_type - - if msg_type == MESSAGE_TYPE_FIT: + time_diff = time.time() - start_time - time_diff = time.time() - start_time + if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): - metrics = bwd.content.configs_records + metrics = reply.content.configs_records - results_to_log = dict( - metrics.get(f"{msg_type}res.metrics", ConfigsRecord()) - ) + results_to_log = dict(metrics.get("fitres.metrics", ConfigsRecord())) - results_to_log[f"{msg_type}_time"] = time_diff + results_to_log["fit_time"] = 
time_diff - wandb.log(results_to_log, step=int(round), commit=True) + wandb.log(results_to_log, step=int(server_round), commit=True) - return bwd + return reply return wandb_mod @@ -97,36 +93,36 @@ def get_tensorboard_mod(logdir) -> Mod: def tensorboard_mod( msg: Message, context: Context, app: ClientAppCallable ) -> Message: - logdir_run = os.path.join(logdir, msg.metadata.run_id) + logdir_run = os.path.join(logdir, str(msg.metadata.run_id)) - client_id = str(msg.metadata.dst_node_id) + node_id = str(msg.metadata.dst_node_id) - round = int(msg.metadata.group_id) + server_round = int(msg.metadata.group_id) start_time = time.time() - bwd = app(msg, context) + reply = app(msg, context) time_diff = time.time() - start_time - if bwd.metadata.message_type == MESSAGE_TYPE_FIT: - writer = tf.summary.create_file_writer(os.path.join(logdir_run, client_id)) + if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + writer = tf.summary.create_file_writer(os.path.join(logdir_run, node_id)) metrics = dict( - bwd.content.configs_records.get("fitres.metrics", ConfigsRecord()) + reply.content.configs_records.get("fitres.metrics", ConfigsRecord()) ) - # Write aggregated loss - with writer.as_default(step=round): # pylint: disable=not-context-manager - tf.summary.scalar(f"fit_time", time_diff, step=round) + + with writer.as_default(step=server_round): + tf.summary.scalar(f"fit_time", time_diff, step=server_round) for metric in metrics: tf.summary.scalar( f"{metric}", metrics[metric], - step=round, + step=server_round, ) writer.flush() - return bwd + return reply return tensorboard_mod @@ -134,11 +130,15 @@ def tensorboard_mod( # Run via `flower-client-app client:wandb_app` wandb_app = fl.client.ClientApp( client_fn=client_fn, - mods=[get_wandb_mod("Custom mods example")], + mods=[ + get_wandb_mod("Custom mods example"), + ], ) # Run via `flower-client-app client:tb_app` tb_app = fl.client.ClientApp( client_fn=client_fn, - 
mods=[get_tensorboard_mod(".runs_history/")], + mods=[ + get_tensorboard_mod(".runs_history/"), + ], ) From a00c1bb3e3cd026792c49114c6542d6f08a7d902 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 17:11:59 +0100 Subject: [PATCH 11/14] Disable wandb logging --- examples/custom-mods/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index 45e8acf1ceb..024c49dcebc 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -1,4 +1,3 @@ -import datetime import os import time @@ -21,6 +20,7 @@ test, ) +os.environ["WANDB_SILENT"] = "true" # Load model and data (simple CNN, CIFAR-10) net = Net().to(DEVICE) From 03d27488ff049ead57bfca66b7d5cf18743b1143 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 17:15:31 +0100 Subject: [PATCH 12/14] Better capitalization --- examples/custom-mods/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 29102f180cc..d29277e5891 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -255,7 +255,7 @@ def get_tensorboard_mod(logdir) -> Mod: return tensorboard_mod ``` -For the initialization, Tensorboard uses a custom directory path, which can, in this case, be passed as an argument to the wrapper function. +For the initialization, TensorBoard uses a custom directory path, which can, in this case, be passed as an argument to the wrapper function. It can be used in the following way: @@ -309,7 +309,7 @@ for W&B monitoring, and: flower-client-app client:tb_app --insecure ``` -for Tensorboard. +for TensorBoard. In yet another new terminal window, start the second long-running Flower client (with the mod of your choice): @@ -329,7 +329,7 @@ flower-server-app server:app --insecure For W&B, you will need to login on the [website](wandb.ai). 
-For Tensorboard, you will need to run the following command in your terminal: +For TensorBoard, you will need to run the following command in your terminal: ```sh tensorboard --logdir From ce869dc5b3cf2ff355fc0ea79ff1cb48246d1cec Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 6 Mar 2024 14:09:39 +0100 Subject: [PATCH 13/14] Improve logging and apply suggestions --- examples/custom-mods/README.md | 37 +++++++++---------- examples/custom-mods/client.py | 52 ++++++++++++++++++--------- examples/custom-mods/pyproject.toml | 3 +- examples/custom-mods/requirements.txt | 3 +- 4 files changed, 56 insertions(+), 39 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index d29277e5891..1aea68b824a 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -67,7 +67,7 @@ def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message project="Mod Name", group=group_name, name=run_name, - id=f"{run_id}{node_id}", + id=f"{run_id}_{node_id}", resume="allow", reinit=True, ) @@ -89,7 +89,7 @@ reply = app(msg, context) And now, with the message we got back, we can gather our metrics: ```python -if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): +if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): time_diff = time.time() - start_time @@ -113,26 +113,27 @@ The complete mod becomes: def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: server_round = int(msg.metadata.group_id) - run_id = msg.metadata.run_id - group_name = f"Run ID: {run_id}" + if msg.metadata.message_type == MessageType.TRAIN and server_round == 1: + run_id = msg.metadata.run_id + group_name = f"Run ID: {run_id}" - node_id = str(msg.metadata.dst_node_id) - run_name = f"Node ID: {node_id}" + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" - wandb.init( - project="Mod Name", - group=group_name, - name=run_name, -
id=f"{run_id}{node_id}", - resume="allow", - reinit=True, - ) + wandb.init( + project="Mod Name", + group=group_name, + name=run_name, + id=f"{run_id}_{node_id}", + resume="allow", + reinit=True, + ) start_time = time.time() reply = app(msg, context) - if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): time_diff = time.time() - start_time @@ -173,7 +174,7 @@ def get_wandb_mod(name: str) -> Mod: project=name, group=group_name, name=run_name, - id=f"{run_id}{node_id}", + id=f"{run_id}_{node_id}", resume="allow", reinit=True, ) @@ -182,7 +183,7 @@ def get_wandb_mod(name: str) -> Mod: reply = app(msg, context) - if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): time_diff = time.time() - start_time @@ -233,7 +234,7 @@ def get_tensorboard_mod(logdir) -> Mod: time_diff = time.time() - start_time - if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): writer = tf.summary.create_file_writer(os.path.join(logdir_run, node_id)) metrics = dict( diff --git a/examples/custom-mods/client.py b/examples/custom-mods/client.py index 024c49dcebc..ca7a0d887a3 100644 --- a/examples/custom-mods/client.py +++ b/examples/custom-mods/client.py @@ -1,3 +1,4 @@ +import logging import os import time @@ -8,7 +9,7 @@ from flwr.client.typing import ClientAppCallable, Mod from flwr.common.context import Context from flwr.common.message import Message -from flwr.common.constant import MESSAGE_TYPE_FIT +from flwr.common.constant import MessageType from task import ( Net, @@ -20,7 +21,15 @@ test, ) -os.environ["WANDB_SILENT"] = "true" + +class WBLoggingFilter(logging.Filter): + def filter(self, record): + return ( + "login" in record.getMessage() + or "View project at" in record.getMessage() + or 
"View run at" in record.getMessage() + ) + # Load model and data (simple CNN, CIFAR-10) net = Net().to(DEVICE) @@ -49,22 +58,26 @@ def client_fn(cid: str): def get_wandb_mod(name: str) -> Mod: def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message: + """Flower Mod that logs the metrics dictionary returned by the client's + fit function to Weights & Biases. + """ server_round = int(msg.metadata.group_id) - run_id = msg.metadata.run_id - group_name = f"Run ID: {run_id}" + if server_round == 1 and msg.metadata.message_type == MessageType.TRAIN: + run_id = msg.metadata.run_id + group_name = f"Run ID: {run_id}" - node_id = str(msg.metadata.dst_node_id) - run_name = f"Node ID: {node_id}" - - wandb.init( - project=name, - group=group_name, - name=run_name, - id=f"{run_id}{node_id}", - resume="allow", - reinit=True, - ) + node_id = str(msg.metadata.dst_node_id) + run_name = f"Node ID: {node_id}" + + wandb.init( + project=name, + group=group_name, + name=run_name, + id=f"{run_id}_{node_id}", + resume="allow", + reinit=True, + ) start_time = time.time() @@ -72,7 +85,8 @@ def wandb_mod(msg: Message, context: Context, app: ClientAppCallable) -> Message time_diff = time.time() - start_time - if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + # if the `ClientApp` just processed a "fit" message, let's log some metrics to W&B + if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): metrics = reply.content.configs_records @@ -93,6 +107,9 @@ def get_tensorboard_mod(logdir) -> Mod: def tensorboard_mod( msg: Message, context: Context, app: ClientAppCallable ) -> Message: + """Flower Mod that logs the metrics dictionary returned by the client's + fit function to TensorBoard. 
+ """ logdir_run = os.path.join(logdir, str(msg.metadata.run_id)) node_id = str(msg.metadata.dst_node_id) @@ -105,7 +122,8 @@ def tensorboard_mod( time_diff = time.time() - start_time - if reply.metadata.message_type == MESSAGE_TYPE_FIT and reply.has_content(): + # if the `ClientApp` just processed a "fit" message, let's log some metrics to TensorBoard + if reply.metadata.message_type == MessageType.TRAIN and reply.has_content(): writer = tf.summary.create_file_writer(os.path.join(logdir_run, node_id)) metrics = dict( diff --git a/examples/custom-mods/pyproject.toml b/examples/custom-mods/pyproject.toml index 0290323fbb9..e690e05bab8 100644 --- a/examples/custom-mods/pyproject.toml +++ b/examples/custom-mods/pyproject.toml @@ -11,9 +11,8 @@ authors = ["The Flower Authors "] [tool.poetry.dependencies] python = ">=3.8,<3.11" flwr = { path = "../../", develop = true, extras = ["simulation"] } +tensorboard = "2.16.2" torch = "1.13.1" torchvision = "0.14.1" tqdm = "4.65.0" wandb = "0.16.3" -tensorflow-cpu = { version = ">=2.9.1,<2.11.1 || >2.11.1", markers = "platform_machine == \"x86_64\"" } -tensorflow-macos = { version = ">=2.9.1,<2.11.1 || >2.11.1", markers = "sys_platform == \"darwin\" and platform_machine == \"arm64\"" } diff --git a/examples/custom-mods/requirements.txt b/examples/custom-mods/requirements.txt index 6a201036c36..75b2c1135f1 100644 --- a/examples/custom-mods/requirements.txt +++ b/examples/custom-mods/requirements.txt @@ -1,7 +1,6 @@ flwr-nightly[rest,simulation]>=1.0, <2.0 +tensorboard==2.16.2 torch==1.13.1 torchvision==0.14.1 tqdm==4.65.0 wandb==0.16.3 -tensorflow-cpu>=2.9.1, != 2.11.1 ; platform_machine == "x86_64" -tensorflow-macos>=2.9.1, != 2.11.1 ; sys_platform == "darwin" and platform_machine == "arm64" From 06c20e53a87475e3cac36ce9d7d523ec006459e4 Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 6 Mar 2024 13:43:19 +0000 Subject: [PATCH 14/14] Apply suggestions from code review --- examples/custom-mods/README.md | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/examples/custom-mods/README.md b/examples/custom-mods/README.md index 1aea68b824a..b0ad668c2de 100644 --- a/examples/custom-mods/README.md +++ b/examples/custom-mods/README.md @@ -304,7 +304,7 @@ In a new terminal window, start the first long-running Flower client using: flower-client-app client:wandb_app --insecure ``` -for W&B monitoring, and: +for W&B monitoring, or: ```bash flower-client-app client:tb_app --insecure @@ -328,7 +328,7 @@ flower-server-app server:app --insecure ### Check the results -For W&B, you will need to login on the [website](wandb.ai). +For W&B, you will need to log in to the [website](https://wandb.ai). For TensorBoard, you will need to run the following command in your terminal: