学逆向论坛

找回密码
立即注册

只需一步,快速开始

发新帖

2万

积分

41

好友

1179

主题
发表于 2025-2-12 10:55:12 | 查看: 132| 回复: 0
前言
因为是面向小白,实践向的,解释相关理论的会再发篇文章。
Q:我一点AI都不懂怎么办?
A:没事作者也是面向gpt编程,纯小白也可以尝试^_^
Q:为什么会想自己写一个大模型?现在的大模型已经很成熟了我为什么要自己写一个?
A:一个是最近的热点deepseek,感觉AI很有意思。但主要是觉得自己写一个很酷()事情都是从零到一的,我认为自己先写一个完全属于自己的模型,即使它一直报错,即使它回答的问题一直不对,在写完的那一刻的成就感也是无可比拟的。有成就感,有兴趣,才能一步步深入地学习了解^_^
一、搭建开发环境
  • python3
  • pytorch
1
pip install numpy tqdm matplotlib

  • 依赖库
1
pip install numpy tqdm matplotlib

  • vscode / Jupyter
二、准备一个数据集1.准备环境(以openwebtext为例)
1
2
python版本得高一点,我用的3.12,3.8不行。
1
pip install -r requirements.txt

如果老报错版本问题就直接下(没报错就用requirements.txt)
1
pip install beautifulsoup4 certifi chardet cssselect feedfinder2 feedparser htmlmin idna jieba3k lxml newspaper3k nltk numpy pandas pillow python-dateutil pytorch-pretrained-bert pytz pyyaml recordtype requests-file requests singledispatch six soupsieve spacy tinysegmenter tldextract tqdm urllib3 urlparse2 pycurl pebble chardet transformers

2.加载&预处理数据(1)、直接下载Pushshift 数据
这里面的url是已经去重了的,正常流程如下
1
2
3
4
5
6
7
8
提取 URL
python extract_urls.py --single_file pushshift_dumps/RS_v2_2005-06.xz

想提取一个时间范围内的 URL
python extract_urls.py --year_start 2016 --year_end 2018

去重 URL
python deduplicate_urls.py --input_dir url_dumps

(2)、下载HTML数据
1
python312 download.py D:\Tools\openwebtext\URLs\RS_2011-01.bz2.deduped.txt --n_procs 100 --scraper raw --chunk_size 100000 --compress --timeout 30

将抓取的 HTML 页面存储在 scraped 文件夹中,并压缩存档。
等挺久的,挂着睡觉了。默认它done了就是好了()

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
(3)、从HTML中提取文本
1
pip install --upgrade newspaper3k

把extract_text.py里的save_parsed_file改成如下
1
2
3
4
5
6
7
8
9
10
def save_parsed_file(filename, text, out_dir):
    # 获取文件的完整路径
    file_path = os.path.join(out_dir, filename)
     
    # 确保目录存在,如果不存在则创建
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
     
    # 写入文件
    with open(file_path, 'w', encoding='utf-8') as handle:
        handle.write(text)

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
1
python312 extract_text.py --html_archive scraped/RS_2011-01-1_data.xz --n_procs 100

从 HTML 中提取出文本内容并保存为 .txt 文件.
--如果中间有报错重新来的话,记得把原来提取的文件删掉,文件夹在scraped里
(4)、分词处理
1
python -m spacy download en_core_web_sm

更改tokenize_text.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import spacy
import io
import argparse
import glob
import os
import tqdm
from multiprocessing import Pool
from functools import partial
import chardet

def detect_encoding(file_path):
    """检测文件的实际编码"""
    with open(file_path, 'rb') as f:
        raw_data = f.read(1024)  # 读取文件的前 1KB 数据
    result = chardet.detect(raw_data)
    return result['encoding'] or 'utf-8'  # 如果检测失败,默认返回 'utf-8'



def save_tokenized_text(output_dir, filename, text):
    # 构建完整输出路径
    text_file = os.path.join(output_dir, filename)

    # 确保目标目录存在
    os.makedirs(os.path.dirname(text_file), exist_ok=True)

    # 保存文件
    with io.open(text_file, 'w', encoding='utf-8') as fo:
        fo.write(text)

def tokenizeSpacy(args):
    nlp = spacy.load("en_core_web_sm")  # 加载 spaCy 模型
    extraction_file_paths = glob.glob(args.input_glob)

    for extraction_file_path in extraction_file_paths:
        path, filename = os.path.split(extraction_file_path)
        text_file = os.path.join(
            args.output_dir, filename.replace('.txt', '.tokenized.txt'))

        # 确保输出目录存在
        os.makedirs(os.path.dirname(text_file), exist_ok=True)

        # 检测文件编码
        file_encoding = detect_encoding(extraction_file_path)

        try:
            # 打开输入文件和输出文件
            with io.open(extraction_file_path, 'r', encoding=file_encoding) as fi, \
                    io.open(text_file, 'w', encoding='utf-8') as fo:

                omitted_line_count = 0
                for line in fi:
                    if len(line.strip()) > 0:  # 忽略空行
                        doc = nlp(line)
                        fo.write(' '.join([x.text for x in doc]) + '\n')
                    else:
                        omitted_line_count += 1

            print(f'Omitted {omitted_line_count} empty lines from {filename}')
        except UnicodeDecodeError:
            print(f"Failed to decode {extraction_file_path} with encoding {file_encoding}. Skipping this file.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_glob', type=str, default='*.txt')
    parser.add_argument('--output_dir', type=str, default='tokenized')
    parser.add_argument('--tokenizer', type=str, default='spacy', choices=['spacy', 'gpt2'])
    parser.add_argument('--combine', type=int, default=1e8, help="min tokens per file in gpt2 mode")
    parser.add_argument('--file_bs', type=int, default=10000, help="files per batch in gpt2 mode")

    # 解析命令行参数
    args = parser.parse_args()

    # 确保输出目录存在
    os.makedirs(args.output_dir, exist_ok=True)

    # 根据 tokenizer 选择执行的函数
    if args.tokenizer == 'spacy':
        tokenizeSpacy(args)
    else:
        print("GPT-2 tokenizer is not implemented in this version.")

1
python312 tokenize_text.py --input_glob "parsed/RS_2011-01/*.txt" --output_dir tokenized

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
三、构建和训练 GPT 类似模型
下面的文件层级关系如下
1
2
3
4
5
6
7
8
9
10
11
12
13
gpt_project/
├── model/
│   ├── gpt.py
│   ├── transformer_block.py
├── data/
│   ├── dataset.py
│   ├── tokenizer.py  # 可选
│   ├── tokenized/  # 存放所有分词好的 .txt 文件
├── train/
│   ├── train.py  # 训练代码
|——train_model/
|
├── inference.py  # 生成文本

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
1. dataset.py
用于处理数据集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#dataset.py
import torch
import os
from collections import Counter
from transformers import AutoTokenizer

class TextDataset(torch.utils.data.Dataset):
    def __init__(self, directory_path, seq_length, tokenizer):
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        self.data = []
        self.vocab = {}
        self.inverse_vocab = {}
         
        # 第一步:统计所有单词的频率
        word_counter = Counter()
         
        # 遍历 directory_path 目录中的所有 .tokenized.txt 文件
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    word_counter.update(words)
         
        # 第二步:创建词汇表,给每个单词分配一个 ID
        self.vocab = {word: idx + 1 for idx, (word, _) in enumerate(word_counter.items())}
        self.vocab['<pad>'] = 0  # 为 padding 添加一个 ID
        self.vocab['<unk>'] = len(self.vocab)  # 为未知单词添加一个 ID
         
        # 创建逆词汇表
        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
         
        # 第三步:将文本转换为 token ID
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    # 将每个单词转换为 token ID,如果不在词汇表中则使用 <unk>
                    token_ids = [self.vocab.get(word, self.vocab['<unk>']) for word in words]
                    self.data.append(token_ids)
         
        # 将数据转化为训练所需的序列形式
        self.data = [self.pad_sequence(seq) for seq in self.data]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text = self.data[idx]
         
        # 编码输入文本
        input_ids = torch.tensor(input_text)  # 转换为 tensor
        target_ids = input_ids.clone()  # 使用输入作为目标
        return input_ids, target_ids

    def pad_sequence(self, seq):
        """填充序列到 seq_length"""
        if len(seq) < self.seq_length:
            # 使用 pad token 填充
            seq += [self.vocab['<pad>']] * (self.seq_length - len(seq))
        else:
            # 如果超出 seq_length,则截断
            seq = seq[:self.seq_length]
        return seq



    '''
    def __getitem__(self, idx):
        input_ids = torch.tensor(self.data[idx], dtype=torch.long)
         
        # 如果输入序列长度小于 seq_length,进行填充
        padding_length = self.seq_length - input_ids.size(0)
        if padding_length > 0:
            padding = torch.tensor([self.vocab['<pad>']] * padding_length, dtype=torch.long)
            input_ids = torch.cat([input_ids, padding], dim=0)
         
        # 设置 target_ids 为 input_ids 的下一个 token(即语言模型的训练目标)
        target_ids = input_ids[1:].clone()
        target_ids = torch.cat([target_ids, torch.tensor([self.vocab['<pad>']], dtype=torch.long)])

        return input_ids, target_ids
'''

2.gpt.py
实现 GPT 模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# gpt.py
import torch
import torch.nn as nn
import os
import sys
import torch.nn.functional as F

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

from model.transformer_block import TransformerBlock

class GPT(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, num_heads, hidden_dim, max_length):
        super(GPT, self).__init__()
        self.hidden_dim = hidden_dim  # 添加 hidden_dim 变量



class GPT(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, num_layers, max_length):
        super(GPT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_size, num_heads, embed_size * 4)
            for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        batch_size, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(batch_size, seq_length)
        x = self.embedding(x) + self.position_embedding(positions)
        for block in self.blocks:
            x = block(x)
        return self.fc_out(x)
     
    def generate(self, input_ids, max_length=100, temperature=1.0, top_k=50):
        self.eval()  # 设置为评估模式
         
        # 获取初始输出
        generated_ids = input_ids
        for _ in range(max_length):
            outputs = self(generated_ids)
            logits = outputs  # 假设模型的输出是 logits
            logits = logits[:, -1, :]  # 只关注最新生成的 token

            # 应用温度采样
            logits = logits / temperature
            
            # Top-K 采样
            if top_k > 0:
                top_k_values, top_k_indices = torch.topk(logits, top_k)
                top_k_probs = F.softmax(top_k_values, dim=-1)
                next_token = torch.multinomial(top_k_probs, 1)
                next_token = top_k_indices.gather(-1, next_token)
            else:
                # 默认采样
                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, 1)

            # 添加生成的 token 到输入序列
            generated_ids = torch.cat([generated_ids, next_token], dim=-1)

        return generated_ids
   

3.transformer_block.py
用于实现 GPT 模型的一个基本组件——Transformer 块。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# transformer_block.py
import torch
import torch.nn as nn

class AttentionHead(nn.Module):
    def __init__(self, embed_size, head_size):
        super().__init__()
        self.q = nn.Linear(embed_size, head_size, bias=False)
        self.k = nn.Linear(embed_size, head_size, bias=False)
        self.v = nn.Linear(embed_size, head_size, bias=False)
        self.scale = head_size ** -0.5

    def forward(self, x):
        q = self.q(x)
        k = self.k(x)
        v = self.v(x)
        scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale
        attention = torch.softmax(scores, dim=-1)
        return torch.matmul(attention, v)

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, feed_forward_size):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim=embed_size, num_heads=num_heads)
        self.ff = nn.Sequential(
            nn.Linear(embed_size, feed_forward_size),
            nn.ReLU(),
            nn.Linear(feed_forward_size, embed_size),
        )
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

    def forward(self, x):
        attention_out, _ = self.attention(x, x, x)
        x = self.norm1(x + attention_out)
        ff_out = self.ff(x)
        return self.norm2(x + ff_out)

4. generate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import torch
from model.gpt import GPT
from data.tokenizer import SimpleTokenizer

def generate_text(prompt, model, tokenizer, max_length=50):
    tokens = tokenizer.encode(prompt)
    input_data = torch.tensor([tokens])

    model.eval()
    with torch.no_grad():
        for _ in range(max_length):
            output = model(input_data)
            next_token = torch.argmax(output[:, -1, :], dim=-1)
            input_data = torch.cat([input_data, next_token.unsqueeze(0)], dim=1)
            if next_token.item() == tokenizer.word_to_idx["<END>"]:
                break

    return tokenizer.decode(input_data[0].tolist())

# Example
if __name__ == "__main__":
    vocab_size = 10000
    embed_size = 128
    num_heads = 8
    num_layers = 4
    max_length = 100

    # Load pretrained model
    model = GPT(vocab_size, embed_size, num_heads, num_layers, max_length)
    tokenizer = SimpleTokenizer(vocab_size)
    tokenizer.build_vocab("This is a small test corpus <END>")
     
    prompt = "This is a"
    print(generate_text(prompt, model, tokenizer))

5.train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# train/train.py
import sys
import os

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

import glob
import torch
from torch.utils.data import DataLoader
from data.dataset import TextDataset
from model.gpt import GPT
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
from transformers import AutoTokenizer
from transformers import GPT2LMHeadModel

# 在外部加载模型和分词器
model = GPT2LMHeadModel.from_pretrained('gpt2')

def train():
    # 加载分词器
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
     
    # 设置设备
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
     
    # 配置数据集
    dataset = TextDataset(directory_path="data/tokenized", seq_length=128, tokenizer=tokenizer)
    data_loader = DataLoader(dataset, batch_size=4, shuffle=True)

    # 配置优化器
    optimizer = AdamW(model.parameters(), lr=1e-5)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.5)  # 每5个epoch减少学习率

    # 训练模型
    model.train()
    for epoch in range(3):  # 假设训练3个epoch
        total_loss = 0
        for batch_idx, (input_ids, target_ids) in enumerate(data_loader):
            optimizer.zero_grad()

            input_ids = input_ids.to(device)
            target_ids = target_ids.to(device)

            # 前向传播
            outputs = model(input_ids, labels=target_ids)
            loss = outputs.loss
            total_loss += loss.item()

            # 反向传播
            loss.backward()
            # 训练过程中添加梯度裁剪
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

        # 输出每个 epoch 的损失
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(data_loader)}")
         
        # 学习率调度
        scheduler.step()

    # 保存训练好的模型
    torch.save(model.state_dict(), "trained_model/model.pth")

    tokenizer.save_pretrained("trained_model")

if __name__ == "__main__":
    train()

6. inference.py
这是推理脚本,用于加载训练好的模型并进行推理(生成预测)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# inference.py
import torch
import sys
import os
import torch.nn.functional as F

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

from model.gpt import GPT
from transformers import AutoTokenizer

def load_model():
    # 加载模型
    model = GPT(vocab_size=50000, embed_size=256, num_layers=6, num_heads=8, max_length=512)
    model.load_state_dict(torch.load("trained_model/model.pth"), strict=False)
    model.eval()  # 设置为评估模式

    # 加载分词器
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
     
    return model, tokenizer  # 返回模型和分词器

def chat():
    model, tokenizer = load_model()  # 加载模型和分词器

    while True:
        text = input("Input: ")  # 获取用户输入
        if text.lower() == "quit":
            break

        # 对输入文本进行编码
        input_ids = tokenizer.encode(text, return_tensors='pt')

        # 生成文本
        generated_ids = model.generate(input_ids, max_length=100, temperature=1.0, top_k=50)

        # 解码并打印生成的文本
        output_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
        print(f"GPT: {output_text}")


if __name__ == "__main__":
    chat()

然后运行顺序:
1
python312 train/train.py

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
1
python312 inference.py

纯小白向从零开始手写大模型

纯小白向从零开始手写大模型
精确的模型需要深入的理论和庞大的数据训练。虽然很人机,并且一点也不智能,但是已经向未知的领域迈出了第一步不是吗?

温馨提示:
1.如果您喜欢这篇帖子,请给作者点赞评分,点赞会增加帖子的热度,评分会给作者加学币。(评分不会扣掉您的积分,系统每天都会重置您的评分额度)。
2.回复帖子不仅是对作者的认可,还可以获得学币奖励,请尊重他人的劳动成果,拒绝做伸手党!
3.发广告、灌水回复等违规行为一经发现直接禁言,如果本帖内容涉嫌违规,请点击论坛底部的举报反馈按钮,也可以在【投诉建议】板块发帖举报。
论坛交流群:672619046

小黑屋|手机版|站务邮箱|学逆向论坛 ( 粤ICP备2021023307号 )|网站地图

GMT+8, 2025-2-20 04:33 , Processed in 0.175478 second(s), 38 queries .

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表