Preface

This is a hands-on guide aimed at complete beginners; a separate article explaining the underlying theory will follow.

Q: I know nothing about AI. Is that a problem?
A: Not at all. The author also "programs by asking GPT", so even total beginners can give this a try ^_^

Q: Why write a large model myself? Today's large models are already mature, so why build my own?
A: Partly it's the recent DeepSeek buzz, which made AI look fun. But mostly, writing one yourself is just cool. Everything goes from zero to one, and I believe that the moment you finish a model that is entirely your own, even if it keeps throwing errors and keeps answering questions wrong, the sense of achievement is incomparable. With achievement and interest, you can keep learning step by step ^_^

I. Setting Up the Development Environment
```
pip install numpy tqdm matplotlib
```
II. Preparing a Dataset

1. Prepare the environment (using openwebtext as the example)

The Python version needs to be fairly new; I used 3.12, and 3.8 does not work.

```
pip install -r requirements.txt
```
If you keep hitting version errors, install the packages directly instead (if there were no errors, just stick with requirements.txt):

```
pip install beautifulsoup4 certifi chardet cssselect feedfinder2 feedparser htmlmin idna jieba3k lxml newspaper3k nltk numpy pandas pillow python-dateutil pytorch-pretrained-bert pytz pyyaml recordtype requests-file requests singledispatch six soupsieve spacy tinysegmenter tldextract tqdm urllib3 urlparse2 pycurl pebble chardet transformers
```
2. Load & preprocess the data

(1) Download the Pushshift data directly

The URLs in this dump are already deduplicated; the normal workflow looks like this:

```
# Extract URLs
python extract_urls.py --single_file pushshift_dumps/RS_v2_2005-06.xz

# To extract URLs over a range of years
python extract_urls.py --year_start 2016 --year_end 2018

# Deduplicate URLs
python deduplicate_urls.py --input_dir url_dumps
```
(2) Download the HTML data

```
python312 download.py D:\Tools\openwebtext\URLs\RS_2011-01.bz2.deduped.txt --n_procs 100 --scraper 100000 --compress --timeout 30
```

This stores the scraped HTML pages in the scraped folder and compresses them into archives. It takes quite a while; I left it running while I slept. Once it prints "done", you can assume it finished.
(3) Extract text from the HTML

```
pip install --upgrade newspaper3k
```

Change save_parsed_file in extract_text.py to the following (the change makes sure the output directory exists before writing, which avoids write errors):
```python
def save_parsed_file(filename, text, out_dir):
    # Get the file's full path
    file_path = os.path.join(out_dir, filename)
    # Make sure the directory exists, creating it if necessary
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    # Write the file
    with open(file_path, 'w', encoding='utf-8') as handle:
        handle.write(text)
```
```
python312 extract_text.py --html_archive scraped/RS_2011-01-1_data.xz --n_procs 100
```
This extracts the text content from the HTML and saves it as .txt files. Note: if it errors out partway and you restart, remember to delete the files extracted so far (they are in the scraped folder).

(4) Tokenization

```
python -m spacy download en_core_web_sm
```
Change tokenize_text.py to:
```python
import spacy
import io
import argparse
import glob
import os
import tqdm
from multiprocessing import Pool
from functools import partial
import chardet

def detect_encoding(file_path):
    """Detect the file's actual encoding."""
    with open(file_path, 'rb') as f:
        raw_data = f.read(1024)  # read the first 1 KB of the file
    result = chardet.detect(raw_data)
    return result['encoding'] or 'utf-8'  # fall back to 'utf-8' if detection fails

def save_tokenized_text(output_dir, filename, text):
    # Build the full output path
    text_file = os.path.join(output_dir, filename)
    # Make sure the target directory exists
    os.makedirs(os.path.dirname(text_file), exist_ok=True)
    # Save the file
    with io.open(text_file, 'w', encoding='utf-8') as fo:
        fo.write(text)

def tokenizeSpacy(args):
    nlp = spacy.load("en_core_web_sm")  # load the spaCy model
    extraction_file_paths = glob.glob(args.input_glob)
    for extraction_file_path in extraction_file_paths:
        path, filename = os.path.split(extraction_file_path)
        text_file = os.path.join(
            args.output_dir, filename.replace('.txt', '.tokenized.txt'))
        # Make sure the output directory exists
        os.makedirs(os.path.dirname(text_file), exist_ok=True)
        # Detect the file's encoding
        file_encoding = detect_encoding(extraction_file_path)
        try:
            # Open the input and output files
            with io.open(extraction_file_path, 'r', encoding=file_encoding) as fi, \
                    io.open(text_file, 'w', encoding='utf-8') as fo:
                omitted_line_count = 0
                for line in fi:
                    if len(line.strip()) > 0:  # skip empty lines
                        doc = nlp(line)
                        fo.write(' '.join([x.text for x in doc]) + '\n')
                    else:
                        omitted_line_count += 1
                print(f'Omitted {omitted_line_count} empty lines from {filename}')
        except UnicodeDecodeError:
            print(f"Failed to decode {extraction_file_path} with encoding {file_encoding}. Skipping this file.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_glob', type=str, default='*.txt')
    parser.add_argument('--output_dir', type=str, default='tokenized')
    parser.add_argument('--tokenizer', type=str, default='spacy', choices=['spacy', 'gpt2'])
    parser.add_argument('--combine', type=int, default=1e8, help="min tokens per file in gpt2 mode")
    parser.add_argument('--file_bs', type=int, default=10000, help="files per batch in gpt2 mode")
    # Parse command-line arguments
    args = parser.parse_args()
    # Make sure the output directory exists
    os.makedirs(args.output_dir, exist_ok=True)
    # Choose which function to run based on the tokenizer
    if args.tokenizer == 'spacy':
        tokenizeSpacy(args)
    else:
        print("GPT-2 tokenizer is not implemented in this version.")
```
```
python312 tokenize_text.py --input_glob "parsed/RS_2011-01/*.txt" --output_dir tokenized
```
III. Building and Training a GPT-like Model

The file hierarchy looks like this:
```
gpt_project/
├── model/
│   ├── gpt.py
│   ├── transformer_block.py
├── data/
│   ├── dataset.py
│   ├── tokenizer.py   # optional
│   ├── tokenized/     # all the tokenized .txt files go here
├── train/
│   ├── train.py       # training code
├── trained_model/
├── inference.py       # text generation
```
1. dataset.py handles the dataset:
```python
# dataset.py
import torch
import os
from collections import Counter
from transformers import AutoTokenizer

class TextDataset(torch.utils.data.Dataset):
    def __init__(self, directory_path, seq_length, tokenizer):
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        self.data = []
        self.vocab = {}
        self.inverse_vocab = {}
        # Step 1: count the frequency of every word
        word_counter = Counter()
        # Walk all the .tokenized.txt files under directory_path
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    word_counter.update(words)
        # Step 2: build the vocabulary, assigning each word an ID
        self.vocab = {word: idx + 1 for idx, (word, _) in enumerate(word_counter.items())}
        self.vocab['<pad>'] = 0  # reserve an ID for padding
        self.vocab['<unk>'] = len(self.vocab)  # reserve an ID for unknown words
        # Build the inverse vocabulary
        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
        # Step 3: convert the text into token IDs
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    # Map each word to its token ID, falling back to <unk>
                    token_ids = [self.vocab.get(word, self.vocab['<unk>']) for word in words]
                    self.data.append(token_ids)
        # Pad/truncate every sequence into the shape training needs
        self.data = [self.pad_sequence(seq) for seq in self.data]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text = self.data[idx]
        # Encode the input text
        input_ids = torch.tensor(input_text)  # convert to a tensor
        target_ids = input_ids.clone()  # use the inputs as targets
        return input_ids, target_ids

    def pad_sequence(self, seq):
        """Pad a sequence up to seq_length."""
        if len(seq) < self.seq_length:
            # Pad with the pad token
            seq += [self.vocab['<pad>']] * (self.seq_length - len(seq))
        else:
            # Truncate anything beyond seq_length
            seq = seq[:self.seq_length]
        return seq

    '''
    def __getitem__(self, idx):
        input_ids = torch.tensor(self.data[idx], dtype=torch.long)
        # Pad if the input sequence is shorter than seq_length
        padding_length = self.seq_length - input_ids.size(0)
        if padding_length > 0:
            padding = torch.tensor([self.vocab['<pad>']] * padding_length, dtype=torch.long)
            input_ids = torch.cat([input_ids, padding], dim=0)
        # Set target_ids to the next token of input_ids (the language-modeling objective)
        target_ids = input_ids[1:].clone()
        target_ids = torch.cat([target_ids, torch.tensor([self.vocab['<pad>']], dtype=torch.long)])
        return input_ids, target_ids
    '''
```
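Before training, it's worth sanity-checking the dataset class on its own. A minimal sketch, assuming data/tokenized already contains at least one .tokenized.txt file (the tokenizer argument is stored but never used by this class, so passing None works here):

```python
# Hypothetical smoke test for TextDataset; not part of the original project files.
from torch.utils.data import DataLoader
from data.dataset import TextDataset

dataset = TextDataset(directory_path="data/tokenized", seq_length=128, tokenizer=None)
print("documents:", len(dataset), "vocab size:", len(dataset.vocab))

loader = DataLoader(dataset, batch_size=4, shuffle=True)
input_ids, target_ids = next(iter(loader))
print(input_ids.shape)  # expected: torch.Size([4, 128]) given at least 4 documents
```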
2. gpt.py implements the GPT model:
```python
# gpt.py
import torch
import torch.nn as nn
import os
import sys
import torch.nn.functional as F

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

from model.transformer_block import TransformerBlock

class GPT(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, num_layers, max_length):
        super(GPT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_size, num_heads, embed_size * 4)
            for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        batch_size, seq_length = x.shape
        # Keep the position indices on the same device as the input
        positions = torch.arange(0, seq_length, device=x.device).expand(batch_size, seq_length)
        x = self.embedding(x) + self.position_embedding(positions)
        for block in self.blocks:
            x = block(x)
        return self.fc_out(x)

    def generate(self, input_ids, max_length=100, temperature=1.0, top_k=50):
        self.eval()  # switch to evaluation mode
        generated_ids = input_ids
        for _ in range(max_length):
            outputs = self(generated_ids)
            logits = outputs  # the model's output is the logits
            logits = logits[:, -1, :]  # only look at the newest position
            # Apply temperature
            logits = logits / temperature
            # Top-k sampling
            if top_k > 0:
                top_k_values, top_k_indices = torch.topk(logits, top_k)
                top_k_probs = F.softmax(top_k_values, dim=-1)
                next_token = torch.multinomial(top_k_probs, 1)
                next_token = top_k_indices.gather(-1, next_token)
            else:
                # Plain sampling over the full distribution
                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, 1)
            # Append the generated token to the input sequence
            generated_ids = torch.cat([generated_ids, next_token], dim=-1)
        return generated_ids
```
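A quick way to check the model is wired correctly is a forward pass on random token IDs. A minimal sketch (all the sizes here are illustrative, not from the original):

```python
# Hypothetical smoke test for the GPT class; sizes chosen only for speed.
import torch
from model.gpt import GPT

model = GPT(vocab_size=1000, embed_size=64, num_heads=4, num_layers=2, max_length=128)
x = torch.randint(0, 1000, (2, 16))  # batch of 2 sequences, 16 tokens each
logits = model(x)
print(logits.shape)  # expected: torch.Size([2, 16, 1000])
sample = model.generate(x[:, :4], max_length=8, top_k=10)
print(sample.shape)  # 4 prompt tokens + 8 generated = torch.Size([2, 12])
```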
3. transformer_block.py implements a basic building block of the GPT model, the Transformer block:
```python
# transformer_block.py
import torch
import torch.nn as nn

class AttentionHead(nn.Module):
    """A single attention head, kept as a reference; TransformerBlock below uses nn.MultiheadAttention instead."""
    def __init__(self, embed_size, head_size):
        super().__init__()
        self.q = nn.Linear(embed_size, head_size, bias=False)
        self.k = nn.Linear(embed_size, head_size, bias=False)
        self.v = nn.Linear(embed_size, head_size, bias=False)
        self.scale = head_size ** -0.5

    def forward(self, x):
        q = self.q(x)
        k = self.k(x)
        v = self.v(x)
        scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale
        attention = torch.softmax(scores, dim=-1)
        return torch.matmul(attention, v)

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, feed_forward_size):
        super().__init__()
        # batch_first=True so inputs are (batch, seq, embed), matching what gpt.py passes in
        self.attention = nn.MultiheadAttention(embed_dim=embed_size, num_heads=num_heads, batch_first=True)
        self.ff = nn.Sequential(
            nn.Linear(embed_size, feed_forward_size),
            nn.ReLU(),
            nn.Linear(feed_forward_size, embed_size),
        )
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

    def forward(self, x):
        attention_out, _ = self.attention(x, x, x)
        x = self.norm1(x + attention_out)
        ff_out = self.ff(x)
        return self.norm2(x + ff_out)
```
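One thing to be aware of: the block above attends over the whole sequence, so every position can see future tokens. A real GPT applies a causal mask so each position only attends to earlier ones. A minimal sketch of how such a mask could be passed to nn.MultiheadAttention (an optional improvement, not part of the original code; causal_forward is a hypothetical helper):

```python
# Hypothetical causal-mask variant of the forward pass (optional improvement).
import torch

def causal_forward(block, x):
    seq_len = x.size(1)
    # True above the diagonal means "not allowed to attend" (i.e. no peeking at future tokens)
    mask = torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device), diagonal=1)
    attention_out, _ = block.attention(x, x, x, attn_mask=mask)
    x = block.norm1(x + attention_out)
    return block.norm2(x + block.ff(x))
```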
4. generate.py:
```python
# generate.py
import torch
from model.gpt import GPT
from data.tokenizer import SimpleTokenizer

def generate_text(prompt, model, tokenizer, max_length=50):
    tokens = tokenizer.encode(prompt)
    input_data = torch.tensor([tokens])
    model.eval()
    with torch.no_grad():
        for _ in range(max_length):
            output = model(input_data)
            # Greedy decoding: take the most likely next token
            next_token = torch.argmax(output[:, -1, :], dim=-1)
            input_data = torch.cat([input_data, next_token.unsqueeze(0)], dim=1)
            # Stop once the end-of-text token is produced
            if next_token.item() == tokenizer.word_to_idx["<END>"]:
                break
    return tokenizer.decode(input_data[0].tolist())

# Example
if __name__ == "__main__":
    vocab_size = 10000
    embed_size = 128
    num_heads = 8
    num_layers = 4
    max_length = 100
    # Build the model (weights are randomly initialized here)
    model = GPT(vocab_size, embed_size, num_heads, num_layers, max_length)
    tokenizer = SimpleTokenizer(vocab_size)
    tokenizer.build_vocab("This is a small test corpus <END>")
    prompt = "This is a"
    print(generate_text(prompt, model, tokenizer))
```
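The file tree marks data/tokenizer.py as optional, but generate.py imports SimpleTokenizer from it, and the original article never shows that file. Here is a minimal sketch of a whitespace tokenizer exposing the encode/decode/build_vocab/word_to_idx interface the script assumes (entirely hypothetical; adapt as needed):

```python
# data/tokenizer.py -- hypothetical minimal implementation, not from the original article.
class SimpleTokenizer:
    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.word_to_idx = {"<unk>": 0}
        self.idx_to_word = {0: "<unk>"}

    def build_vocab(self, text):
        # Assign an ID to each unique whitespace-separated word, up to vocab_size
        for word in text.split():
            if word not in self.word_to_idx and len(self.word_to_idx) < self.vocab_size:
                idx = len(self.word_to_idx)
                self.word_to_idx[word] = idx
                self.idx_to_word[idx] = word

    def encode(self, text):
        # Unknown words map to the <unk> ID (0)
        return [self.word_to_idx.get(word, 0) for word in text.split()]

    def decode(self, token_ids):
        return ' '.join(self.idx_to_word.get(i, "<unk>") for i in token_ids)
```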
5. train.py:
```python
# train/train.py
import sys
import os

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

import glob
import torch
from torch.utils.data import DataLoader
from data.dataset import TextDataset
from model.gpt import GPT
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
from transformers import AutoTokenizer
from transformers import GPT2LMHeadModel

# Load the model and tokenizer at module level
model = GPT2LMHeadModel.from_pretrained('gpt2')

def train():
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
    # Pick the device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # Set up the dataset
    dataset = TextDataset(directory_path="data/tokenized", seq_length=128, tokenizer=tokenizer)
    data_loader = DataLoader(dataset, batch_size=4, shuffle=True)
    # Set up the optimizer
    optimizer = AdamW(model.parameters(), lr=1e-5)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.5)  # halve the learning rate every 5 epochs
    # Train the model
    model.train()
    for epoch in range(3):  # say, 3 epochs
        total_loss = 0
        for batch_idx, (input_ids, target_ids) in enumerate(data_loader):
            optimizer.zero_grad()
            input_ids = input_ids.to(device)
            target_ids = target_ids.to(device)
            # Forward pass (GPT2LMHeadModel shifts the labels internally)
            outputs = model(input_ids, labels=target_ids)
            loss = outputs.loss
            total_loss += loss.item()
            # Backward pass
            loss.backward()
            # Clip gradients during training
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
        # Print the loss for each epoch
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(data_loader)}")
        # Step the learning-rate scheduler
        scheduler.step()
    # Save the trained model
    os.makedirs("trained_model", exist_ok=True)  # make sure the save directory exists
    torch.save(model.state_dict(), "trained_model/model.pth")
    tokenizer.save_pretrained("trained_model")

if __name__ == "__main__":
    train()
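```

Note that this script fine-tunes the pretrained GPT2LMHeadModel, while inference.py below loads the saved weights into the custom GPT class with strict=False, so most weights won't actually match. If you'd rather train the custom GPT from this part directly, a minimal sketch of that loop with an explicit next-token cross-entropy loss might look like this (hypothetical, with illustrative sizes):

```python
# Hypothetical training loop for the custom GPT class; not the original script.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from data.dataset import TextDataset
from model.gpt import GPT

dataset = TextDataset(directory_path="data/tokenized", seq_length=128, tokenizer=None)
loader = DataLoader(dataset, batch_size=4, shuffle=True)
model = GPT(vocab_size=len(dataset.vocab), embed_size=256, num_heads=8, num_layers=6, max_length=128)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab['<pad>'])

model.train()
for input_ids, _ in loader:  # we shift the targets ourselves, so the dataset's copy is ignored
    optimizer.zero_grad()
    logits = model(input_ids)
    # Next-token objective: predict token t+1 from positions up to t
    loss = criterion(logits[:, :-1, :].reshape(-1, logits.size(-1)),
                     input_ids[:, 1:].reshape(-1))
    loss.backward()
    optimizer.step()
    print(f"loss: {loss.item():.4f}")
```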
6. inference.py is the inference script; it loads the trained model and generates predictions:
```python
# inference.py
import torch
import sys
import os
import torch.nn.functional as F

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

from model.gpt import GPT
from transformers import AutoTokenizer

def load_model():
    # Load the model
    model = GPT(vocab_size=50000, embed_size=256, num_layers=6, num_heads=8, max_length=512)
    model.load_state_dict(torch.load("trained_model/model.pth"), strict=False)
    model.eval()  # switch to evaluation mode
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
    return model, tokenizer  # return the model and the tokenizer

def chat():
    model, tokenizer = load_model()  # load the model and tokenizer
    while True:
        text = input("Input: ")  # read user input
        if text.lower() == "quit":
            break
        # Encode the input text
        input_ids = tokenizer.encode(text, return_tensors='pt')
        # Generate text
        generated_ids = model.generate(input_ids, max_length=100, temperature=1.0, top_k=50)
        # Decode and print the generated text
        output_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
        print(f"GPT: {output_text}")

if __name__ == "__main__":
    chat()
```
Then run the training:

```
python312 train/train.py
```
A really accurate model requires deep theory and training on massive amounts of data. Ours sounds robotic and isn't the least bit intelligent, but we've still taken the first step into unknown territory, haven't we?