Shortcuts

Source code for dalib.adaptation.mdd

"""
@author: Junguang Jiang
@contact: JiangJunguang1123@outlook.com
"""
from typing import Optional, List, Dict, Tuple, Callable
import torch.nn as nn
import torch.nn.functional as F
import torch

from dalib.modules.grl import WarmStartGradientReverseLayer


class MarginDisparityDiscrepancy(nn.Module):
    r"""The margin disparity discrepancy (MDD) proposed in
    `Bridging Theory and Algorithm for Domain Adaptation (ICML 2019) <https://arxiv.org/abs/1904.05801>`_.

    MDD can measure the distribution discrepancy in domain adaptation.

    The :math:`y^s` and :math:`y^t` are logits output by the main head on the source and target domain
    respectively. The :math:`y_{adv}^s` and :math:`y_{adv}^t` are logits output by the adversarial head.

    The definition can be described as:

    .. math::
        \mathcal{D}_{\gamma}(\hat{\mathcal{S}}, \hat{\mathcal{T}}) =
        -\gamma \mathbb{E}_{y^s, y_{adv}^s \sim\hat{\mathcal{S}}} L_s (y^s, y_{adv}^s) +
        \mathbb{E}_{y^t, y_{adv}^t \sim\hat{\mathcal{T}}} L_t (y^t, y_{adv}^t),

    where :math:`\gamma` is a margin hyper-parameter, :math:`L_s` refers to the disparity function defined
    on the source domain and :math:`L_t` refers to the disparity function defined on the target domain.

    Args:
        source_disparity (callable): The disparity function defined on the source domain, :math:`L_s`.
        target_disparity (callable): The disparity function defined on the target domain, :math:`L_t`.
        margin (float): margin :math:`\gamma`. Default: 4
        reduction (str, optional): Specifies the reduction to apply to the output:
            ``'none'`` | ``'mean'`` | ``'sum'``. ``'none'``: no reduction will be applied,
            ``'mean'``: the sum of the output will be divided by the number of elements in the output,
            ``'sum'``: the output will be summed. ``None`` is accepted and behaves like ``'none'``.
            Default: ``'mean'``

    Raises:
        ValueError: if ``reduction`` is neither ``None`` nor one of ``'none'``, ``'mean'``, ``'sum'``.

    Inputs:
        - y_s: output :math:`y^s` by the main head on the source domain
        - y_s_adv: output :math:`y_{adv}^s` by the adversarial head on the source domain
        - y_t: output :math:`y^t` by the main head on the target domain
        - y_t_adv: output :math:`y_{adv}^t` by the adversarial head on the target domain
        - w_s (optional): instance weights for source domain
        - w_t (optional): instance weights for target domain

    Examples::

        >>> num_outputs = 2
        >>> batch_size = 10
        >>> loss = MarginDisparityDiscrepancy(margin=4., source_disparity=F.l1_loss, target_disparity=F.l1_loss)
        >>> # output from source domain and target domain
        >>> y_s, y_t = torch.randn(batch_size, num_outputs), torch.randn(batch_size, num_outputs)
        >>> # adversarial output from source domain and target domain
        >>> y_s_adv, y_t_adv = torch.randn(batch_size, num_outputs), torch.randn(batch_size, num_outputs)
        >>> output = loss(y_s, y_s_adv, y_t, y_t_adv)
    """

    def __init__(self, source_disparity: Callable, target_disparity: Callable,
                 margin: Optional[float] = 4, reduction: Optional[str] = 'mean'):
        super(MarginDisparityDiscrepancy, self).__init__()
        # Fail fast on typos such as 'avg': previously any unknown string was
        # silently treated as 'none' and an unreduced tensor was returned.
        # None stays allowed because the original Optional[str] annotation
        # permitted it (it behaves like 'none').
        if reduction is not None and reduction not in ('none', 'mean', 'sum'):
            raise ValueError(
                "reduction must be one of 'none', 'mean' or 'sum', got {!r}".format(reduction))
        self.margin = margin
        self.reduction = reduction
        self.source_disparity = source_disparity
        self.target_disparity = target_disparity

    def forward(self, y_s: torch.Tensor, y_s_adv: torch.Tensor, y_t: torch.Tensor, y_t_adv: torch.Tensor,
                w_s: Optional[torch.Tensor] = None, w_t: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Combine the (optionally weighted) source and target disparities into the MDD value.

        The source term is scaled by ``-margin``; the two terms are summed and
        then reduced according to ``self.reduction``.
        """
        source_loss = -self.margin * self.source_disparity(y_s, y_s_adv)
        target_loss = self.target_disparity(y_t, y_t_adv)

        # A missing weight means "weight 1 everywhere", so the multiplication
        # (and the temporary all-ones tensor) can simply be skipped.
        if w_s is not None:
            source_loss = source_loss * w_s
        if w_t is not None:
            target_loss = target_loss * w_t

        loss = source_loss + target_loss
        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss
class ClassificationMarginDisparityDiscrepancy(MarginDisparityDiscrepancy):
    r"""
    The margin disparity discrepancy (MDD) proposed in
    `Bridging Theory and Algorithm for Domain Adaptation (ICML 2019) <https://arxiv.org/abs/1904.05801>`_.

    It measures the distribution discrepancy in domain adaptation for classification.
    When margin is equal to 1, it's also called disparity discrepancy (DD).

    The :math:`y^s` and :math:`y^t` are logits output by the main classifier on the source and target
    domain respectively. The :math:`y_{adv}^s` and :math:`y_{adv}^t` are logits output by the adversarial
    classifier. They are expected to contain raw, unnormalized scores for each class.

    The definition can be described as:

    .. math::
        \mathcal{D}_{\gamma}(\hat{\mathcal{S}}, \hat{\mathcal{T}}) =
        \gamma \mathbb{E}_{y^s, y_{adv}^s \sim\hat{\mathcal{S}}}
        \log\left(\frac{\exp(y_{adv}^s[h_{y^s}])}{\sum_j \exp(y_{adv}^s[j])}\right) +
        \mathbb{E}_{y^t, y_{adv}^t \sim\hat{\mathcal{T}}}
        \log\left(1-\frac{\exp(y_{adv}^t[h_{y^t}])}{\sum_j \exp(y_{adv}^t[j])}\right),

    where :math:`\gamma` is a margin hyper-parameter and :math:`h_y` refers to the predicted label when
    the logits output is :math:`y`. You can see more details in
    `Bridging Theory and Algorithm for Domain Adaptation <https://arxiv.org/abs/1904.05801>`_.

    Args:
        margin (float): margin :math:`\gamma`. Default: 4
        reduction (str, optional): Specifies the reduction to apply to the output:
            ``'none'`` | ``'mean'`` | ``'sum'``. ``'none'``: no reduction will be applied,
            ``'mean'``: the sum of the output will be divided by the number of elements in the output,
            ``'sum'``: the output will be summed. Default: ``'mean'``

    Inputs:
        - y_s: logits output :math:`y^s` by the main classifier on the source domain
        - y_s_adv: logits output :math:`y_{adv}^s` by the adversarial classifier on the source domain
        - y_t: logits output :math:`y^t` by the main classifier on the target domain
        - y_t_adv: logits output :math:`y_{adv}^t` by the adversarial classifier on the target domain

    Shape:
        - Inputs: :math:`(minibatch, C)` where C = number of classes, or
          :math:`(minibatch, C, d_1, d_2, ..., d_K)` with :math:`K \geq 1` in the case of
          `K`-dimensional loss.
        - Output: scalar. If :attr:`reduction` is ``'none'``, then the same size as the target:
          :math:`(minibatch)`, or :math:`(minibatch, d_1, d_2, ..., d_K)` with :math:`K \geq 1`
          in the case of K-dimensional loss.

    Examples::

        >>> num_classes = 2
        >>> batch_size = 10
        >>> loss = ClassificationMarginDisparityDiscrepancy(margin=4.)
        >>> # logits output from source domain and target domain
        >>> y_s, y_t = torch.randn(batch_size, num_classes), torch.randn(batch_size, num_classes)
        >>> # adversarial logits output from source domain and target domain
        >>> y_s_adv, y_t_adv = torch.randn(batch_size, num_classes), torch.randn(batch_size, num_classes)
        >>> output = loss(y_s, y_s_adv, y_t, y_t_adv)
    """

    def __init__(self, margin: Optional[float] = 4, **kwargs):

        def disparity_on_source(y: torch.Tensor, y_adv: torch.Tensor):
            # The main head's arg-max over dim 1 serves as the label for the adversarial head.
            predicted = y.max(dim=1)[1]
            return F.cross_entropy(y_adv, predicted, reduction='none')

        def disparity_on_target(y: torch.Tensor, y_adv: torch.Tensor):
            predicted = y.max(dim=1)[1]
            # log(1 - softmax) is taken through shift_log; nll_loss picks the entry
            # of the predicted class and negates it, so negate once more.
            log_complement = shift_log(1. - F.softmax(y_adv, dim=1))
            return -F.nll_loss(log_complement, predicted, reduction='none')

        super().__init__(disparity_on_source, disparity_on_target, margin, **kwargs)
class RegressionMarginDisparityDiscrepancy(MarginDisparityDiscrepancy):
    r"""
    The margin disparity discrepancy (MDD) proposed in
    `Bridging Theory and Algorithm for Domain Adaptation (ICML 2019) <https://arxiv.org/abs/1904.05801>`_.

    It measures the distribution discrepancy in domain adaptation for regression.

    The :math:`y^s` and :math:`y^t` are outputs of the main regressor on the source and target domain
    respectively. The :math:`y_{adv}^s` and :math:`y_{adv}^t` are outputs of the adversarial regressor.
    They are expected to contain ``normalized`` values for each factor.

    The definition can be described as:

    .. math::
        \mathcal{D}_{\gamma}(\hat{\mathcal{S}}, \hat{\mathcal{T}}) =
        -\gamma \mathbb{E}_{y^s, y_{adv}^s \sim\hat{\mathcal{S}}} L (y^s, y_{adv}^s) +
        \mathbb{E}_{y^t, y_{adv}^t \sim\hat{\mathcal{T}}} L (y^t, y_{adv}^t),

    where :math:`\gamma` is a margin hyper-parameter and :math:`L` refers to the disparity function
    defined on both domains. You can see more details in
    `Bridging Theory and Algorithm for Domain Adaptation <https://arxiv.org/abs/1904.05801>`_.

    Args:
        loss_function (callable): The disparity function defined on both domains, :math:`L`.
            It is called as ``loss_function(y_adv, y.detach(), reduction='none')``.
            Default: ``F.l1_loss``
        margin (float): margin :math:`\gamma`. Default: 1
        reduction (str, optional): Specifies the reduction to apply to the output:
            ``'none'`` | ``'mean'`` | ``'sum'``. ``'none'``: no reduction will be applied,
            ``'mean'``: the sum of the output will be divided by the number of elements in the output,
            ``'sum'``: the output will be summed. Default: ``'mean'``

    Inputs:
        - y_s: output :math:`y^s` by the main regressor on the source domain
        - y_s_adv: output :math:`y_{adv}^s` by the adversarial regressor on the source domain
        - y_t: output :math:`y^t` by the main regressor on the target domain
        - y_t_adv: output :math:`y_{adv}^t` by the adversarial regressor on the target domain

    Shape:
        - Inputs: :math:`(minibatch, F)` where F = number of factors, or
          :math:`(minibatch, F, d_1, d_2, ..., d_K)` with :math:`K \geq 1` in the case of
          `K`-dimensional loss.
        - Output: scalar for ``'mean'`` / ``'sum'``. With ``'none'`` the result keeps whatever shape
          ``loss_function(..., reduction='none')`` produces.

    Examples::

        >>> num_outputs = 2
        >>> batch_size = 10
        >>> loss = RegressionMarginDisparityDiscrepancy(margin=4., loss_function=F.l1_loss)
        >>> # output from source domain and target domain
        >>> y_s, y_t = torch.randn(batch_size, num_outputs), torch.randn(batch_size, num_outputs)
        >>> # adversarial output from source domain and target domain
        >>> y_s_adv, y_t_adv = torch.randn(batch_size, num_outputs), torch.randn(batch_size, num_outputs)
        >>> output = loss(y_s, y_s_adv, y_t, y_t_adv)
    """

    def __init__(self, margin: Optional[float] = 1, loss_function=F.l1_loss, **kwargs):

        def elementwise_disparity(y: torch.Tensor, y_adv: torch.Tensor):
            # The main output is detached so it acts as a fixed target for the adversarial output.
            return loss_function(y_adv, y.detach(), reduction='none')

        # Source and target domains share the very same disparity.
        super().__init__(elementwise_disparity, elementwise_disparity, margin, **kwargs)
def shift_log(x: torch.Tensor, offset: Optional[float] = 1e-6) -> torch.Tensor:
    r"""
    Shift the input, cap it at 1, then take the logarithm:

    .. math::
        y = \log(\min(x + \text{offset}, 1))

    Used to avoid the gradient explosion problem in log(x) function when x=0.

    Args:
        x (torch.Tensor): input tensor
        offset (float, optional): offset size. Default: 1e-6

    .. note::
        For an input tensor in [0., 1.] the output tensor falls in [log(offset), 0].
    """
    shifted = x + offset
    # Capping at 1 keeps the logarithm from becoming positive after the shift.
    capped = shifted.clamp(max=1.)
    return capped.log()
class GeneralModule(nn.Module):
    r"""Shared backbone / bottleneck network with a main head and an adversarial head.

    The input goes through ``backbone`` and ``bottleneck``. The resulting features feed ``head``
    directly and feed ``adv_head`` after passing through the gradient reverse layer.

    Args:
        backbone (torch.nn.Module): feature extractor applied to the raw input
        num_classes (int): stored as ``self.num_classes``
        bottleneck (torch.nn.Module): module applied to the backbone features
        head (torch.nn.Module): head producing the main outputs
        adv_head (torch.nn.Module): head producing the adversarial outputs
        grl (WarmStartGradientReverseLayer, optional): gradient reverse layer. When ``None`` a
            ``WarmStartGradientReverseLayer(alpha=1.0, lo=0.0, hi=0.1, max_iters=1000, auto_step=False)``
            is created. Default: None
        finetune (bool, optional): whether :meth:`get_parameters` assigns the backbone a
            10x smaller learning rate. Default: True
    """

    def __init__(self, backbone: nn.Module, num_classes: int, bottleneck: nn.Module,
                 head: nn.Module, adv_head: nn.Module,
                 grl: Optional[WarmStartGradientReverseLayer] = None,
                 finetune: Optional[bool] = True):
        super(GeneralModule, self).__init__()
        self.backbone = backbone
        self.num_classes = num_classes
        self.bottleneck = bottleneck
        self.head = head
        self.adv_head = adv_head
        self.finetune = finetune
        self.grl_layer = WarmStartGradientReverseLayer(
            alpha=1.0, lo=0.0, hi=0.1, max_iters=1000, auto_step=False
        ) if grl is None else grl

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Run both heads on the bottleneck features of ``x``.

        Returns:
            ``(outputs, outputs_adv)`` while ``self.training`` is true; only ``outputs``
            otherwise (the annotated tuple type describes the training-mode result).
        """
        features = self.backbone(x)
        features = self.bottleneck(features)
        outputs = self.head(features)
        # The adversarial branch is evaluated in both modes, exactly as before;
        # its result is only handed back to the caller during training.
        features_adv = self.grl_layer(features)
        outputs_adv = self.adv_head(features_adv)
        if self.training:
            return outputs, outputs_adv
        else:
            return outputs

    def step(self):
        r"""
        Gradually increase :math:`\lambda` in GRL layer.
        """
        # Raw docstring above: a plain string containing "\l" is an invalid
        # escape sequence and triggers a warning on recent Python versions.
        self.grl_layer.step()

    def get_parameters(self, base_lr=1.0) -> List[Dict]:
        """
        Return a parameters list which decides optimization hyper-parameters,
        such as the relative learning rate of each layer.

        The backbone gets ``0.1 * base_lr`` when ``self.finetune`` is true and ``base_lr``
        otherwise; bottleneck, head and adversarial head always get ``base_lr``.
        """
        params = [
            {"params": self.backbone.parameters(), "lr": 0.1 * base_lr if self.finetune else base_lr},
            {"params": self.bottleneck.parameters(), "lr": base_lr},
            {"params": self.head.parameters(), "lr": base_lr},
            {"params": self.adv_head.parameters(), "lr": base_lr}
        ]
        return params
class ImageClassifier(GeneralModule):
    r"""Classifier for MDD.

    Classifier for MDD has one backbone, one bottleneck, while two classifier heads.
    The first classifier head is used for final predictions.
    The adversarial classifier head is only used when calculating MarginDisparityDiscrepancy.

    Args:
        backbone (torch.nn.Module): Any backbone to extract features from data; its
            ``out_features`` attribute sets the input size of the bottleneck linear layer
        num_classes (int): Number of classes
        bottleneck_dim (int, optional): Feature dimension of the bottleneck layer. Default: 1024
        width (int, optional): Feature dimension of the classifier head. Default: 1024
        grl (nn.Module): Gradient reverse layer. Will use default parameters if None. Default: None.
        finetune (bool, optional): Whether use 10x smaller learning rate in the backbone. Default: True
        pool_layer (nn.Module, optional): Module placed in front of the bottleneck linear layer.
            When None, adaptive average pooling to ``(1, 1)`` followed by flattening is used.
            Default: None

    Inputs:
        - x (tensor): input data

    Outputs:
        - outputs: logits outputs by the main classifier
        - outputs_adv: logits outputs by the adversarial classifier

    Shapes:
        - x: :math:`(minibatch, *)`, same shape as the input of the `backbone`.
        - outputs, outputs_adv: :math:`(minibatch, C)`, where C means the number of classes.

    .. note::
        Remember to call function `step()` after function `forward()` **during training phase**!
        For instance,

            >>> # x is inputs, classifier is an ImageClassifier
            >>> outputs, outputs_adv = classifier(x)
            >>> classifier.step()
    """

    def __init__(self, backbone: nn.Module, num_classes: int,
                 bottleneck_dim: Optional[int] = 1024, width: Optional[int] = 1024,
                 grl: Optional[WarmStartGradientReverseLayer] = None, finetune=True, pool_layer=None):
        grl_layer = grl
        if grl_layer is None:
            grl_layer = WarmStartGradientReverseLayer(alpha=1.0, lo=0.0, hi=0.1, max_iters=1000,
                                                      auto_step=False)

        if pool_layer is None:
            pool_layer = nn.Sequential(nn.AdaptiveAvgPool2d(output_size=(1, 1)), nn.Flatten())

        # Bottleneck: pooling -> linear projection -> batch norm -> ReLU -> dropout.
        projection = nn.Linear(backbone.out_features, bottleneck_dim)
        bottleneck = nn.Sequential(
            pool_layer,
            projection,
            nn.BatchNorm1d(bottleneck_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        projection.weight.data.normal_(0, 0.005)
        projection.bias.data.fill_(0.1)

        def two_layer_head() -> nn.Sequential:
            # Linear layers sit at positions 0 and 3 of the returned Sequential.
            return nn.Sequential(
                nn.Linear(bottleneck_dim, width),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(width, num_classes),
            )

        # The classifier head used for final predictions.
        head = two_layer_head()
        # The adversarial classifier head
        adv_head = two_layer_head()

        # Re-initialise the linear layers, alternating main / adversarial head per depth.
        for position in (0, 3):
            for branch in (head, adv_head):
                fc = branch[position]
                fc.weight.data.normal_(0, 0.01)
                fc.bias.data.fill_(0.0)

        super().__init__(backbone, num_classes, bottleneck, head, adv_head, grl_layer, finetune)
class ImageRegressor(GeneralModule):
    r"""Regressor for MDD.

    Regressor for MDD has one backbone, one bottleneck, while two regressor heads.
    The first regressor head is used for final predictions.
    The adversarial regressor head is only used when calculating MarginDisparityDiscrepancy.

    Args:
        backbone (torch.nn.Module): Any backbone to extract features from data; its
            ``out_features`` attribute sets the input channels of the default bottleneck
        num_factors (int): Number of factors
        bottleneck (torch.nn.Module, optional): Custom bottleneck. When None, a
            3x3 convolution + batch norm + ReLU block is built. Default: None
        head (torch.nn.Module, optional): Custom regressor head. When None, the default
            convolutional head ending in a sigmoid is built and initialised. Default: None
        adv_head (torch.nn.Module, optional): Custom adversarial regressor head. When None, a
            second default head is built and initialised. Default: None
        bottleneck_dim (int, optional): Feature dimension of the bottleneck layer. Default: 1024
        width (int, optional): Feature dimension of the regressor head. Default: 1024
        finetune (bool, optional): Whether use 10x smaller learning rate in the backbone. Default: True

    Inputs:
        - x (Tensor): input data

    Outputs: (outputs, outputs_adv)
        - outputs: outputs by the main regressor
        - outputs_adv: outputs by the adversarial regressor

    Shapes:
        - x: :math:`(minibatch, *)`, same shape as the input of the `backbone`.
        - outputs, outputs_adv: :math:`(minibatch, F)`, where F means the number of factors.

    .. note::
        Remember to call function `step()` after function `forward()` **during training phase**!
        For instance,

            >>> # x is inputs, regressor is an ImageRegressor
            >>> outputs, outputs_adv = regressor(x)
            >>> regressor.step()
    """

    def __init__(self, backbone: nn.Module, num_factors: int, bottleneck=None, head=None, adv_head=None,
                 bottleneck_dim: Optional[int] = 1024, width: Optional[int] = 1024, finetune=True):
        grl_layer = WarmStartGradientReverseLayer(alpha=1.0, lo=0.0, hi=0.1, max_iters=1000,
                                                  auto_step=False)

        if bottleneck is None:
            bottleneck = nn.Sequential(
                nn.Conv2d(backbone.out_features, bottleneck_dim, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(bottleneck_dim),
                nn.ReLU(),
            )

        def build_default_head() -> nn.Sequential:
            # Two conv blocks, global average pooling, then a sigmoid-activated linear layer.
            layers = nn.Sequential(
                nn.Conv2d(bottleneck_dim, width, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(width),
                nn.ReLU(),
                nn.Conv2d(width, width, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(width),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d(output_size=(1, 1)),
                nn.Flatten(),
                nn.Linear(width, num_factors),
                nn.Sigmoid(),
            )
            # Only convolution and linear layers are re-initialised.
            for module in layers:
                if isinstance(module, (nn.Conv2d, nn.Linear)):
                    nn.init.normal_(module.weight, 0, 0.01)
                    nn.init.constant_(module.bias, 0)
            return layers

        # NOTE(review): the extracted source lost its indentation; the weight
        # re-initialisation is assumed to apply only to the default-built heads — confirm.
        # The regressor head used for final predictions.
        if head is None:
            head = build_default_head()
        # The adversarial regressor head
        if adv_head is None:
            adv_head = build_default_head()

        super().__init__(backbone, num_factors, bottleneck, head, adv_head, grl_layer, finetune)
        self.num_factors = num_factors

Docs

Access comprehensive documentation for Transfer Learning Library

View Docs

Tutorials

Get started for Transfer Learning Library

Get Started