import torch
import torch.nn as nn
from torch.nn import functional as F
# hyperparameters
batch_size = 32 # how many sequences to process in parallel
block_size = 8 # the maximum context length for predictions
max_iters = 3000
eval_interval = 300
learning_rate = 1e-2
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 100
n_embed = 32
print(device)
torch.manual_seed(1337)
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()
# here are the unique characters that occur in the text
chars = sorted(list(set(text)))
vocab_size = len(chars)
# build a character-level encoder/decoder (string <-> list of integers)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]
decode = lambda d: ''.join([itos[i] for i in d])
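# optional sanity check (illustrative): encoding then decoding should reproduce the text exactly
assert decode(encode(text[:50])) == text[:50]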
# encode the whole dataset and store it in a torch tensor
data = torch.tensor(encode(text), dtype=torch.long)
# train/validation split (first 90% for training, rest for validation)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]
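# illustrative example: one chunk of block_size+1 tokens yields block_size training
# examples, where each prefix of the chunk predicts the character that follows it
x = train_data[:block_size]
y = train_data[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context.tolist()} the target is {target.item()}")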
# data loading
def get_batch(split):
    # generate a small batch of inputs x and targets y
    data = train_data if split == "train" else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+1+block_size] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y
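# optional shape check (illustrative): inputs and targets are (batch_size, block_size),
# with y shifted one position to the right relative to x
xb, yb = get_batch('train')
assert xb.shape == (batch_size, block_size) and yb.shape == (batch_size, block_size)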
@torch.no_grad()
def estimate_loss():
    # average the loss over eval_iters batches for both splits
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean().item()
    model.train()
    return out
# bigram language model
class BigramLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        # each token reads an n_embed-dimensional embedding from a lookup table
        # of size vocab_size x n_embed; a linear head maps it back to vocab_size logits
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        # position embeddings are defined here but not yet used in forward
        self.position_embedding_table = nn.Embedding(block_size, n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, targets=None):
        # idx and targets are both (B,T) tensors of integers
        # B = batch, T = time (context position), C = channels (n_embed)
        token_embed = self.token_embedding_table(idx) # (B,T,C)
        logits = self.lm_head(token_embed) # (B,T,vocab_size)
        if targets is None:
            loss = None
        else:
            # F.cross_entropy expects (N, C) logits, so flatten the batch and time dimensions
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is a (B,T) array of indices in the current context
        for _ in range(max_new_tokens):
            # get the predictions
            logits, loss = self(idx)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B,C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B,C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B,1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B,T+1)
        return idx
model = BigramLanguageModel()
m = model.to(device)
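# optional sanity check (illustrative): before training, the loss should be in the
# ballpark of -ln(1/vocab_size) = ln(vocab_size), since the untrained logits are roughly uniform
xb, yb = get_batch('train')
logits, loss = m(xb, yb)
print(f"initial loss {loss.item():.4f}, expected about {torch.log(torch.tensor(float(vocab_size))).item():.4f}")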
# create a PyTorch optimizer using the learning_rate hyperparameter defined above
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)
for iter in range(max_iters):
    # every once in a while evaluate the loss on the train and val sets
    if iter % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
    # sample a batch of data
    xb, yb = get_batch('train')
    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
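# optional: report the loss on both splits once training has finished
losses = estimate_loss()
print(f"final: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")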
# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))