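"""U-Net segmentation model implemented as a PyTorch Lightning module.

Forked from akkaze/unet-lightning.
"""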
from argparse import ArgumentParser

import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torch.utils.data import DataLoader, random_split

from dataset import DirDataset


class Unet(pl.LightningModule):
    """U-Net (Ronneberger et al., 2015) for semantic segmentation."""

    def __init__(self, hparams):
        super(Unet, self).__init__()
        # Recent PyTorch Lightning releases forbid assigning to self.hparams
        # directly; save_hyperparameters stores the namespace in its place.
        self.save_hyperparameters(hparams)
        self.n_channels = hparams.n_channels
        self.n_classes = hparams.n_classes
        self.bilinear = True

        def double_conv(in_channels, out_channels):
            # Two 3x3 conv -> BN -> ReLU blocks; padding=1 preserves H and W.
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            )

        def down(in_channels, out_channels):
            # Halve the spatial resolution, then apply the double conv.
            return nn.Sequential(
                nn.MaxPool2d(2),
                double_conv(in_channels, out_channels)
            )

        class up(nn.Module):
            def __init__(self, in_channels, out_channels, bilinear=True):
                super().__init__()
                if bilinear:
                    self.up = nn.Upsample(scale_factor=2, mode='bilinear',
                                          align_corners=True)
                else:
                    self.up = nn.ConvTranspose2d(in_channels // 2, in_channels // 2,
                                                 kernel_size=2, stride=2)
                self.conv = double_conv(in_channels, out_channels)

            def forward(self, x1, x2):
                x1 = self.up(x1)
                # Tensors are [N, C, H, W]; pad x1 so its spatial size matches
                # the skip connection x2 before concatenating.
                diffY = x2.size()[2] - x1.size()[2]
                diffX = x2.size()[3] - x1.size()[3]
                x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                                diffY // 2, diffY - diffY // 2])
                # Concatenate along dim=1, the channel dimension.
                x = torch.cat([x2, x1], dim=1)
                return self.conv(x)

        # Encoder/decoder wiring; each decoder stage's input channel count is
        # the sum of the upsampled features and the matching skip connection.
        self.inc = double_conv(self.n_channels, 64)
        self.down1 = down(64, 128)
        self.down2 = down(128, 256)
        self.down3 = down(256, 512)
        self.down4 = down(512, 512)
        self.up1 = up(1024, 256)
        self.up2 = up(512, 128)
        self.up3 = up(256, 64)
        self.up4 = up(128, 64)
        self.out = nn.Conv2d(64, self.n_classes, kernel_size=1)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        return self.out(x)

    def training_step(self, batch, batch_nb):
        x, y = batch
        y_hat = self.forward(x)
        # Multi-class targets use cross-entropy; a single-channel output is
        # trained as per-pixel binary classification on the logits.
        loss = F.cross_entropy(y_hat, y) if self.n_classes > 1 else \
            F.binary_cross_entropy_with_logits(y_hat, y)
        # self.log replaces the older {'loss': ..., 'log': ...} dict convention.
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_nb):
        x, y = batch
        y_hat = self.forward(x)
        loss = F.cross_entropy(y_hat, y) if self.n_classes > 1 else \
            F.binary_cross_entropy_with_logits(y_hat, y)
        # self.log averages val_loss across the epoch, which is what the
        # deprecated validation_end hook previously computed by hand.
        self.log('val_loss', loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.RMSprop(self.parameters(), lr=0.1, weight_decay=1e-8)

    def __dataloader(self):
        # Split once and cache the result: the original re-ran random_split on
        # every call, so the train and val sets could overlap when this method
        # was invoked separately by train_dataloader and val_dataloader.
        if not hasattr(self, '_loaders'):
            name = self.hparams.dataset
            dataset = DirDataset(f'./dataset/{name}/train',
                                 f'./dataset/{name}/train_masks')
            n_val = int(len(dataset) * 0.1)
            n_train = len(dataset) - n_val
            train_ds, val_ds = random_split(dataset, [n_train, n_val])
            train_loader = DataLoader(train_ds, batch_size=1, pin_memory=True,
                                      shuffle=True)
            val_loader = DataLoader(val_ds, batch_size=1, pin_memory=True,
                                    shuffle=False)
            self._loaders = {
                'train': train_loader,
                'val': val_loader,
            }
        return self._loaders

    # The @pl.data_loader decorator was removed from PyTorch Lightning long
    # ago; plain methods are the current dataloader hook API.
    def train_dataloader(self):
        return self.__dataloader()['train']

    def val_dataloader(self):
        return self.__dataloader()['val']

    @staticmethod
    def add_model_specific_args(parent_parser):
        parser = ArgumentParser(parents=[parent_parser])
        parser.add_argument('--n_channels', type=int, default=3)
        parser.add_argument('--n_classes', type=int, default=1)
        return parser
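

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original file): a minimal example of wiring
# this module into a training run. The --dataset flag is an assumption based
# on __dataloader's use of self.hparams.dataset; the Trainer settings are
# illustrative only, and the repo's real entry point may differ.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # add_help=False avoids a duplicate -h option when this parser is passed
    # as a parent to add_model_specific_args.
    parent_parser = ArgumentParser(add_help=False)
    parent_parser.add_argument('--dataset', type=str, required=True)
    parser = Unet.add_model_specific_args(parent_parser)
    hparams = parser.parse_args()

    model = Unet(hparams)
    trainer = pl.Trainer(max_epochs=5)
    trainer.fit(model)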