Learn LLM by Debugging

Preface#

It's been a while since I last updated this blog. The past few months have been busy, and even when I wasn't busy I never quite got around to writing. It feels like I did a lot, yet when I stop and think about it, it also feels like I did nothing at all. I had wanted to write this post for a long time; the idea came from the Bilibili video *强烈推荐新人入门 LLM 的方法——逐行调试一个小模型* (a strongly recommended way for newcomers to get into LLMs: step through a small model line by line), but I kept putting it off until now.

If you are new to LLMs, you may have read plenty of papers and studied the theory of machine learning, deep learning, and transformers. You can probably follow an LLM architecture diagram and understand how a transformer works, yet still feel lost the moment you face the code. In PyTorch, a convolution layer, a fully connected layer, and even an entire Transformer encoder are all just Modules, and an LLM is nothing more than these modules composed together. People like to say that writing the code is just stacking building blocks, and that is true, but it is still hard for a newcomer to internalize. The architecture diagram in a paper is abstract and static, while the modules in code are concrete and dynamic; a paper is mainly telling a story and presenting a method, and it neither will nor can describe the architecture in full detail. On top of that, the traditional programming mindset is "do A, then B, then C", whereas the deep learning mindset is "define a structure, feed it data, and let it learn on its own". That level of abstraction, plus all the tensor shape juggling, can leave a newcomer completely disoriented when reading or writing code.
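To make the "building blocks" idea concrete, here is a toy sketch of my own (not from the video, and not GPT-2 specific): a tiny "language model" assembled entirely from standard PyTorch modules, the same way GPT-2 composes embeddings, attention blocks, and a linear head.

import torch
import torch.nn as nn

# A toy model built only from off-the-shelf Modules; the point is the composition, not the quality.
class ToyLM(nn.Module):
    def __init__(self, vocab_size=100, hidden=64):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden)   # token id -> vector
        self.mixer = nn.TransformerEncoderLayer(d_model=hidden, nhead=4, batch_first=True)
        self.head = nn.Linear(hidden, vocab_size)       # vector -> score per vocab token

    def forward(self, ids):                             # ids: (batch, seq_len)
        x = self.embed(ids)                             # (batch, seq_len, hidden)
        x = self.mixer(x)                               # same shape; tokens now "see" each other
        return self.head(x)                             # (batch, seq_len, vocab_size)

logits = ToyLM()(torch.randint(0, 100, (2, 16)))
print(logits.shape)  # torch.Size([2, 16, 100])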

The mainstream learning path online is to walk you through writing a simple transformer by hand, i.e. construction. Construction is certainly valuable: you come away with a clear code structure and the core logic. But it is usually time-consuming, and the result is often disconnected from real implementations. The Bilibili video instead offers a deconstruction approach: learn the actual code by stepping through it in a debugger and analyzing it. It reminds me of what one of my teachers called the agile path to systems skills: use reverse engineering, debugging, guessing, and verification to quickly grasp how a system works. The story he told most often was about changing a single byte and drastically altering the behavior of the VC compiler. That mindset left a real mark on me: in most cases I would rather improve something that already exists than reinvent the wheel, unless the existing thing is truly awful. Back to the point: for someone who already has the theory and wants to start writing code, this deconstruction approach is a huge advantage. We already understand the model's macro architecture; all that is left is to follow the data through the code and verify it.

Preparation#

The environment setup below is just my personal preference; adapt it to your own environment. For convenience, we debug a very small instance of a classic model, GPT-2, which runs fine on a CPU.

  • Install uv: Installing uv

    # Windows
    powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
    # Linux
    curl -LsSf https://astral.sh/uv/install.sh | sh
  • Create a virtual environment: Using Python environments | Using a virtual environment

    uv venv --python 3.12
  • Install dependencies: transformers/installation

    uv pip install torch --index-url https://download.pytorch.org/whl/cpu
    uv pip install transformers
  • Editor: VS Code, with the script below (Cursor also works, and makes it convenient to ask an AI about the source code)

    import torch
    from transformers import GPT2Config, GPT2LMHeadModel

    # GPT2 model config
    mini_config = GPT2Config(
        vocab_size=100,             # vocabulary size
        n_positions=64,             # maximum sequence length
        n_embd=64,                  # embedding dimension (hidden size)
        n_layer=2,                  # number of transformer blocks
        n_head=4,                   # number of attention heads
        n_inner=128,                # FFN inner dimension
        activation_function="gelu", # activation function
        resid_pdrop=0.1,            # dropout rate for residual connections
        embd_pdrop=0.1,             # dropout rate for embedding
        attn_pdrop=0.1,             # dropout rate for attention
    )

    # load the model
    model = GPT2LMHeadModel(mini_config)
    print("Model Structure:")
    print(model)
    print("#Parameters:")
    print(sum(p.numel() for p in model.parameters()))

    # generate random input
    batch_size = 2
    seq_len = 16
    input_ids = torch.randint(0, 100, (batch_size, seq_len))
    print("Input Shape:", input_ids.shape)

    # forward
    with torch.no_grad():
        outputs = model(input_ids, labels=input_ids)
    print("Logits Shape:", outputs.logits.shape)
    print("Loss:", outputs.loss)
  • Debugger configuration

    .vscode/launch.json
    {
        "version": "0.2.0",
        "configurations": [
            {
                "name": "Python Debugger: Current File",
                "type": "debugpy",
                "request": "launch",
                "program": "${file}",
                "console": "integratedTerminal",
                "justMyCode": false // important: lets the debugger step into library code
            }
        ]
    }

Debugging and Deconstruction#

The script defines a very small GPT-2 model with randomly initialized weights. The model does not need to be "useful"; we only care about its structure and how data flows through it. Set a breakpoint at outputs = model(input_ids, labels=input_ids) and start the debugger. From there it is just debugging, nothing special: follow along with the GPT-2 architecture diagram, or with the model structure printed by the script. Dive into whichever part you care about, and when you hit code you do not understand, ask an expert (Gemini, Claude, GPT). If you plan to debug it yourself, you can stop reading here; what follows is just my own notes (with some AI-generated content).

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(100, 64)
    (wpe): Embedding(64, 64)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-1): 2 x GPT2Block(
        (ln_1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D(nf=192, nx=64)
          (c_proj): Conv1D(nf=64, nx=64)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D(nf=128, nx=64)
          (c_proj): Conv1D(nf=64, nx=128)
          (act): GELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=64, out_features=100, bias=False)
)

GPT2LMHeadModel is the class in Transformers for using GPT-2 as a language model (fine-tuning/generation). It takes the base GPT2Model and adds a linear output layer (the LM head) that maps the final hidden states to logits over the vocabulary, which is what causal language modeling needs for tasks like continuation, dialogue, or code generation.

  • Input: input_ids (plus optional attention_mask, etc.)
  • Output: logits (shape [batch_size, seq_len, vocab_size]), plus an optional loss

Embedding Layers#

(transformer): GPT2Model(
  (wte): Embedding(100, 64)
  (wpe): Embedding(64, 64)
)
  • wte: Embedding(100, 64)

    • Role: word/token embedding
    • Each token id ∈ [0, 99] is mapped to a 64-dimensional vector
    • Parameter count: 100 × 64 = 6400
  • wpe: Embedding(64, 64)

    • Role: positional embedding
    • The maximum sequence length is 64, so each position 0~63 gets its own 64-dimensional vector
    • Parameter count: 64 × 64 = 4096
  • What the input stage does (conceptually):

    • For an input sequence input_ids (shape [batch, seq_len]):

      1. wte gives the token vectors: [batch, seq_len, 64]
      2. wpe gives the position vectors: [batch, seq_len, 64]
      3. The two are added: hidden = wte(input_ids) + wpe(positions), as sketched below
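A minimal sketch of that input stage, using the same sizes as the mini config (the names here are just for illustration, not the transformers implementation):

import torch
import torch.nn as nn

wte = nn.Embedding(100, 64)                  # token embedding (vocab_size=100, n_embd=64)
wpe = nn.Embedding(64, 64)                   # position embedding (n_positions=64)

input_ids = torch.randint(0, 100, (2, 16))   # (batch=2, seq_len=16)
positions = torch.arange(16).unsqueeze(0)    # (1, seq_len), broadcast over the batch

hidden = wte(input_ids) + wpe(positions)     # (2, 16, 64): token vector + position vector
print(hidden.shape)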

Dropout and Layer Stacking#

(drop): Dropout(p=0.1)
(h): ModuleList(
  (0-1): 2 x GPT2Block(...)
)
(ln_f): LayerNorm((64,), eps=1e-05)
  • drop: Dropout(p=0.1)

    • One dropout applied right after the embeddings, to reduce overfitting
  • h: ModuleList(2 x GPT2Block)

    • 2 Transformer blocks, each with the standard GPT‑2 structure:

      • a self-attention sub-layer
      • a feed-forward (MLP) sub-layer
      • each with its own LayerNorm and residual connection
  • ln_f: LayerNorm(64)

    • The final LayerNorm after the whole Transformer stack
    • The output is still [batch, seq_len, 64]

GPT2Model Code#

# GPT2Model = TokenEmbedding + PositionEmbedding (+ TokenTypeEmbedding)
#           → Dropout
#           → num_layers × GPT2Block
#           → Final LayerNorm
#           → outputs last_hidden_state (plus optional past_key_values / attentions / hidden_states)
class GPT2Model(GPT2PreTrainedModel):
    _supports_param_buffer_assignment = False

    def __init__(self, config):
        super().__init__(config)
        self.embed_dim = config.hidden_size  # hidden size
        self.wte = nn.Embedding(config.vocab_size, self.embed_dim)  # word/token embedding
        self.wpe = nn.Embedding(config.max_position_embeddings, self.embed_dim)  # position embedding
        self.drop = nn.Dropout(config.embd_pdrop)  # dropout layer
        self.h = nn.ModuleList([GPT2Block(config, layer_idx=i) for i in range(config.num_hidden_layers)])  # stacked transformer blocks
        self.ln_f = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_epsilon)  # final LayerNorm before the output
        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs,
    ) -> Union[tuple, BaseModelOutputWithPastAndCrossAttentions]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, input_ids_length)`):
            `input_ids_length` = `sequence_length` if `past_key_values` is `None` else
            `past_key_values.get_seq_length()` (`sequence_length` of input past key value states). Indices of input
            sequence tokens in the vocabulary.

            If `past_key_values` is used, only `input_ids` that do not have their past calculated should be passed as
            `input_ids`.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        """
        # resolve output options
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # validate inputs
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            # Flatten multi-dimensional inputs into the standard batched format:
            #   [batch, segments, seq_len]
            #   [batch, n_sentences, seq_len]
            #   [batch, n_variants, seq_len]
            # all become [N, seq_len], which is what the Embedding layer expects.
            input_shape = input_ids.size()
            input_ids = input_ids.view(-1, input_shape[-1])
            batch_size = input_ids.shape[0]
        elif inputs_embeds is not None:
            input_shape = inputs_embeds.size()[:-1]
            batch_size = inputs_embeds.shape[0]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        device = input_ids.device if input_ids is not None else inputs_embeds.device

        if token_type_ids is not None:
            token_type_ids = token_type_ids.view(-1, input_shape[-1])

        # gradient checkpointing is incompatible with the KV cache
        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        # based on pattern from src/transformers/models/whisper/modeling_whisper.py::WhisperDecoder
        # initialize the KV cache
        if use_cache:
            if past_key_values is None:
                past_key_values = DynamicCache(config=self.config)
            if self.config.add_cross_attention and not isinstance(past_key_values, EncoderDecoderCache):
                past_key_values = EncoderDecoderCache(past_key_values, DynamicCache(config=self.config))

        # token embeddings
        if inputs_embeds is None:
            inputs_embeds = self.wte(input_ids)  # (N, seq_len) -> (N, seq_len, hidden_size)

        # build absolute position ids
        if cache_position is None:
            # number of tokens already seen
            past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
            cache_position = torch.arange(
                past_seen_tokens,
                past_seen_tokens + inputs_embeds.shape[1],  # inputs_embeds.shape[1] is the sequence length
                device=inputs_embeds.device,
            )  # (seq_len), e.g. tensor([0, 1, 2, 3, 4, 5])
        if position_ids is None:
            # Why unsqueeze(0)?
            # The model expects (batch_size, seq_len),
            # while cache_position is (seq_len).
            position_ids = cache_position.unsqueeze(0)  # add a batch dimension: (1, seq_len)

        # position embeddings
        position_embeds = self.wpe(position_ids)  # (1, seq_len, hidden_size)
        # (N, seq_len, hidden_size) + (1, seq_len, hidden_size), broadcast over the batch
        hidden_states = inputs_embeds + position_embeds.to(inputs_embeds.device)

        # Attention mask.
        # ._update_causal_mask() and ._prepare_4d_causal_attention_mask_with_cache_position() copied from LlamaModel
        if attention_mask is not None and attention_mask.ndim < 4:
            attention_mask = attention_mask.view(batch_size, -1)
        # build the final causal attention mask
        causal_mask = create_causal_mask(
            config=self.config,
            input_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            past_key_values=past_key_values,
            position_ids=position_ids,
        )

        # cross-attention handling
        # If a 2D or 3D attention mask is provided for the cross-attention
        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
        _use_sdpa = self._attn_implementation == "sdpa" and output_attentions is False and head_mask is None
        if self.config.add_cross_attention and encoder_hidden_states is not None:
            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
            if encoder_attention_mask is None:
                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
            if _use_sdpa:
                encoder_attention_mask = _prepare_4d_attention_mask_for_sdpa(
                    mask=encoder_attention_mask, dtype=inputs_embeds.dtype, tgt_len=input_shape[-1]
                )
            elif self._attn_implementation != "flash_attention_2":
                encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else:
            encoder_attention_mask = None

        # Normalize the head-level masking argument into a unified shape, used inside each block to mask out attention heads.
        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # head_mask has shape n_layer x batch x n_heads x N x N
        head_mask = self.get_head_mask(head_mask, self.config.n_layer)

        # token type embeddings
        if token_type_ids is not None:
            token_type_embeds = self.wte(token_type_ids)  # (batch_size, seq_len) -> (batch_size, seq_len, hidden_size)
            hidden_states = hidden_states + token_type_embeds

        # dropout
        hidden_states = self.drop(hidden_states)

        # record the output shape; used at the end to restore the batch/seq dimensions via view
        output_shape = (-1,) + input_shape[1:] + (hidden_states.size(-1),)  # (-1, sample_shape, hidden_size)

        # initialize to () if that output is requested, otherwise None; same for the others
        all_self_attentions = () if output_attentions else None
        all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None
        all_hidden_states = () if output_hidden_states else None

        # model body: run through each GPT2Block in turn
        for i, block in enumerate(self.h):
            # Model parallel
            if self.model_parallel:
                torch.cuda.set_device(hidden_states.device)
                if isinstance(head_mask, torch.Tensor):
                    head_mask = head_mask.to(hidden_states.device)
            if output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states,)  # collect intermediate hidden states

            # call a single block's forward
            outputs = block(
                hidden_states,  # (N, seq_len, hidden_size)
                past_key_values if not (self.gradient_checkpointing and self.training) else None,
                cache_position,
                causal_mask,
                head_mask[i],
                encoder_hidden_states,  # as a positional argument for gradient checkpointing
                encoder_attention_mask=encoder_attention_mask,
                use_cache=use_cache,
                output_attentions=output_attentions,
                **kwargs,
            )

            hidden_states = outputs[0]  # take the hidden states

            # collect attention weights
            if output_attentions:
                all_self_attentions = all_self_attentions + (outputs[1],)
                if self.config.add_cross_attention:
                    all_cross_attentions = all_cross_attentions + (outputs[2],)

            # With model parallelism, use self.device_map to check whether this is the last layer on the current
            # device; if so, move hidden_states to the next device.
            if self.model_parallel:
                for k, v in self.device_map.items():
                    if i == v[-1] and "cuda:" + str(k) != self.last_device:
                        hidden_states = hidden_states.to("cuda:" + str(k + 1))

        # final LayerNorm before the output
        hidden_states = self.ln_f(hidden_states)
        # restore the output shape (-1, sample_shape, hidden_size)
        hidden_states = hidden_states.view(output_shape)
        # collect the final hidden states
        if output_hidden_states:
            all_hidden_states = all_hidden_states + (hidden_states,)

        # cache output
        past_key_values = past_key_values if use_cache else None

        if not return_dict:
            # return the non-None entries of [hidden_states, past_key_values, all_hidden_states,
            # all_self_attentions, all_cross_attentions] in order
            return tuple(
                v
                for v in [hidden_states, past_key_values, all_hidden_states, all_self_attentions, all_cross_attentions]
                if v is not None
            )

        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
            cross_attentions=all_cross_attentions,
        )

GPT2Block#

GPT2Block(
  (ln_1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (attn): GPT2Attention(
    (c_attn): Conv1D(nf=192, nx=64)
    (c_proj): Conv1D(nf=64, nx=64)
    (attn_dropout): Dropout(p=0.1)
    (resid_dropout): Dropout(p=0.1)
  )
  (ln_2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (mlp): GPT2MLP(
    (c_fc): Conv1D(nf=128, nx=64)
    (c_proj): Conv1D(nf=64, nx=128)
    (act): GELUActivation()
    (dropout): Dropout(p=0.1)
  )
)

Pre-LayerNorm: ln_1#

  • GPT‑2 uses a Pre‑LN structure

    • x_norm = ln_1(x)
    • the normalized result is then fed into the self-attention layer

Self-Attention Layer: attn: GPT2Attention#

  • c_attn: Conv1D(nf=192, nx=64)

    • This is GPT‑2's idiosyncratic way of writing what is essentially a linear layer

      • input dimension nx = 64
      • output dimension nf = 192
    • 192 = 3 × 64, i.e. the Q, K, and V projections concatenated:

      • hidden(64) → [Q(64), K(64), V(64)] merged into 192 dimensions, then split apart in the code
  • Where do the multiple heads come from?

    • The head count does not show up in the printed structure, but in general:

      • hidden_size = num_heads × head_dim
      • here n_head=4, so 64 = 4 × 16
    • The implementation reshapes Q, K, V into [batch, num_heads, seq_len, head_dim] to do multi-head attention, as sketched below

  • c_proj: Conv1D(nf=64, nx=64)

    • Maps the multi-head attention output back to 64 dimensions
    • Corresponds to W_o in the standard Transformer
  • attn_dropout / resid_dropout

    • attn_dropout: dropout on the attention weights
    • resid_dropout: dropout on the attention output (before it is added back to the residual)
  • Residual connection

    • The actual computation is roughly:

      • x = x + resid_dropout(attn(ln_1(x)))
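A sketch of how that single 192-dimensional c_attn output gets split into Q/K/V and reshaped into heads. Shapes follow the mini config (hidden_size=64, n_head=4, head_dim=16); this is illustrative, not the exact transformers implementation, and the causal mask is omitted for brevity.

import torch

batch, seq_len, hidden, n_head = 2, 16, 64, 4
head_dim = hidden // n_head                        # 16

qkv = torch.randn(batch, seq_len, 3 * hidden)      # what c_attn produces: (2, 16, 192)
q, k, v = qkv.split(hidden, dim=-1)                # three (2, 16, 64) tensors

def to_heads(t):
    # (batch, seq_len, hidden) -> (batch, n_head, seq_len, head_dim)
    return t.view(batch, seq_len, n_head, head_dim).transpose(1, 2)

q, k, v = to_heads(q), to_heads(k), to_heads(v)
scores = q @ k.transpose(-2, -1) / head_dim**0.5   # (2, 4, 16, 16) attention scores per head
out = scores.softmax(dim=-1) @ v                   # (2, 4, 16, 16) attention weights times values
out = out.transpose(1, 2).reshape(batch, seq_len, hidden)  # merge heads: (2, 16, 64); c_proj then maps 64 -> 64
print(out.shape)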

Second LayerNorm: ln_2#

  • Applies another LayerNorm to the result of attention + residual, before feeding it into the MLP:

    • x_norm = ln_2(x)

Feed-Forward Network: GPT2MLP#

  • c_fc: Conv1D(nf=128, nx=64)

    • Linear layer: 64 → 128, i.e. the hidden dimension is expanded 2×
  • act: GELU

    • Activation function; GPT‑2 uses GELU
  • c_proj: Conv1D(nf=64, nx=128)

    • Projects back from 128 → 64
  • dropout

    • Dropout on the MLP output
  • Residual connection

    • Same pattern as the attention part:

      • x = x + dropout(mlp(ln_2(x)))

Summary: each GPT2Block = LN → Self-Attn → residual + LN → MLP → residual. This is the standard GPT‑2 block, just with a tiny parameter budget.
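GPT2MLP itself is not printed above: it is just the two Conv1D (linear) layers plus GELU and dropout. Here is a shape-equivalent sketch using nn.Linear instead of GPT-2's Conv1D wrapper (MiniGPT2MLP is my own name, not a transformers class):

import torch
import torch.nn as nn

class MiniGPT2MLP(nn.Module):
    # Shape-equivalent to GPT2MLP with n_embd=64, n_inner=128.
    def __init__(self, hidden=64, inner=128, pdrop=0.1):
        super().__init__()
        self.c_fc = nn.Linear(hidden, inner)    # 64 -> 128 (GPT-2 uses Conv1D, essentially a transposed Linear)
        self.act = nn.GELU()
        self.c_proj = nn.Linear(inner, hidden)  # 128 -> 64
        self.dropout = nn.Dropout(pdrop)

    def forward(self, x):                       # x: (batch, seq_len, 64)
        return self.dropout(self.c_proj(self.act(self.c_fc(x))))

x = torch.randn(2, 16, 64)
print(MiniGPT2MLP()(x).shape)                   # torch.Size([2, 16, 64])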

Code#

# GPT2Block = LN → Self-Attn → residual + LN → MLP → residual
class GPT2Block(GradientCheckpointingLayer):
    def __init__(self, config, layer_idx=None):
        super().__init__()
        hidden_size = config.hidden_size  # representation size of each token
        inner_dim = config.n_inner if config.n_inner is not None else 4 * hidden_size  # MLP hidden dimension

        # LayerNorm before self-attention (pre-LN architecture)
        self.ln_1 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)
        # self-attention layer
        self.attn = GPT2Attention(config=config, layer_idx=layer_idx)
        # second LayerNorm, before the MLP
        self.ln_2 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        # optional cross-attention
        if config.add_cross_attention:
            self.crossattention = GPT2Attention(config=config, is_cross_attention=True, layer_idx=layer_idx)
            self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        # feed-forward MLP
        self.mlp = GPT2MLP(inner_dim, config)

    @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58")
    def forward(
        self,
        hidden_states: Optional[tuple[torch.FloatTensor]],  # (N, seq_len, hidden_size)
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = False,
        output_attentions: Optional[bool] = False,
        **kwargs,
    ) -> Union[tuple[torch.Tensor], Optional[tuple[torch.Tensor, tuple[torch.FloatTensor, ...]]]]:
        residual = hidden_states  # save the residual
        hidden_states = self.ln_1(hidden_states)  # pre-LayerNorm
        # attn_output: (N, seq_len, hidden_size), each token's representation after self-attention
        #              re-weights and aggregates the sequence; this is the real output passed to later layers.
        # self_attn_weights: (N, num_heads, seq_len, seq_len), only useful for debugging.
        attn_output, self_attn_weights = self.attn(  # self-attention, using the KV cache, masks, etc.
            hidden_states,
            past_key_values=past_key_values,
            cache_position=cache_position,
            attention_mask=attention_mask,
            head_mask=head_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
            **kwargs,
        )
        hidden_states = attn_output + residual  # residual connection

        # optional cross-attention
        if encoder_hidden_states is not None:
            # add one self-attention block for cross-attention
            if not hasattr(self, "crossattention"):
                raise ValueError(
                    f"If `encoder_hidden_states` are passed, {self} has to be instantiated with "
                    "cross-attention layers by setting `config.add_cross_attention=True`"
                )
            residual = hidden_states
            hidden_states = self.ln_cross_attn(hidden_states)
            cross_attn_output, cross_attn_weights = self.crossattention(
                hidden_states,
                past_key_values=past_key_values,
                attention_mask=attention_mask,
                head_mask=head_mask,
                encoder_hidden_states=encoder_hidden_states,
                encoder_attention_mask=encoder_attention_mask,
                output_attentions=output_attentions,
            )
            hidden_states = residual + cross_attn_output  # residual connection

        # feed-forward MLP + residual
        residual = hidden_states  # save the residual
        hidden_states = self.ln_2(hidden_states)  # pre-LayerNorm
        feed_forward_hidden_states = self.mlp(hidden_states)  # two linear layers + activation (GELU here)
        hidden_states = residual + feed_forward_hidden_states  # residual connection

        # assemble the outputs
        outputs = (hidden_states,)  # this block's output
        if output_attentions:
            outputs += (self_attn_weights,)
            if encoder_hidden_states is not None:
                outputs += (cross_attn_weights,)

        return outputs

Final Output Layer: lm_head#

(lm_head): Linear(in_features=64, out_features=100, bias=False)
  • Input: the transformer output [batch, seq_len, 64]

  • Linear map: each position's 64-dimensional vector → 100-dimensional logits (one per vocabulary token)

  • Output shape: [batch, seq_len, 100]

  • Weight tying is usually enabled in the config

    • lm_head.weight and wte.weight share the same tensor, saving parameters; you can check this as shown below
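A quick way to verify the sharing is to inspect it in the debugger, or run a couple of lines like these against the model built in the setup script above (GPT2Config enables tie_word_embeddings by default):

# When tying is enabled, lm_head.weight and transformer.wte.weight are the same Parameter,
# so the 100 x 64 matrix is stored only once.
print(model.lm_head.weight.shape)                             # torch.Size([100, 64])
print(model.lm_head.weight is model.transformer.wte.weight)   # True when tie_word_embeddings=True
print(model.config.tie_word_embeddings)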

Putting It Together#

Flow: run the GPT‑2 Transformer to get the sequence hidden states → (optionally) keep only the last few time steps → pass them through lm_head to get logits over the vocabulary → (optionally) compute the language-modeling loss from labels → return either a tuple or a structured output depending on return_dict.

# GPT2LMHeadModel = GPT2Model → take hidden_states → Linear (lm_head) → logits (+ optional loss)
class GPT2LMHeadModel(GPT2PreTrainedModel, GenerationMixin):
    _tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        self.transformer = GPT2Model(config)  # the GPT-2 body
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)  # task head
        # Initialize weights and apply final processing
        self.post_init()

    def forward(self, ..., **kwargs) -> Union[tuple, CausalLMOutputWithCrossAttentions]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, input_ids_length)`):
            `input_ids_length` = `sequence_length` if `past_key_values` is `None` else
            `past_key_values.get_seq_length()` (`sequence_length` of input past key value states). Indices of input
            sequence tokens in the vocabulary.

            If `past_key_values` is used, only `input_ids` that do not have their past calculated should be passed as
            `input_ids`.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        labels (`torch.LongTensor` of shape `(batch_size, input_ids_length)`, *optional*):
            Labels for language modeling. Note that the labels **are shifted** inside the model, i.e. you can set
            `labels = input_ids` Indices are selected in `[-100, 0, ..., config.vocab_size]` All labels set to `-100`
            are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        transformer_outputs = self.transformer(  # the GPT-2 body
            input_ids,  # (batch_size, input_ids_length)
            past_key_values=past_key_values,
            attention_mask=attention_mask,
            cache_position=cache_position,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # (batch_size, seq_len, hidden_size); indexed with [0] because the return value may not be a dict
        hidden_states = transformer_outputs[0]

        # slice the hidden states before feeding them into lm_head
        slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
        logits = self.lm_head(hidden_states[:, slice_indices, :])  # (batch_size, kept_seq_len, vocab_size)

        loss = None  # by default (no labels) no loss is computed
        if labels is not None:
            # flatten the token dimension
            loss = self.loss_function(
                logits,
                labels,
                vocab_size=self.config.vocab_size,
                **kwargs,
            )

        if not return_dict:
            output = (logits,) + transformer_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return CausalLMOutputWithCrossAttentions(
            loss=loss,
            logits=logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
            cross_attentions=transformer_outputs.cross_attentions,
        )
  • input_ids

    tensor([[51, 93, 69, 67, 67, 64, 14, 69, 28, 48, 95, 52, 0, 43, 75, 20],
    [38, 46, 94, 7, 13, 65, 12, 77, 1, 29, 93, 14, 71, 98, 64, 81]])

    (batch_size, seq_len), i.e. (nth sample in batch, nth token in sequence)

  • transformer_outputs

    BaseModelOutputWithPastAndCrossAttentions(last_hidden_state=tensor([[[ 0.1692, 1.6322, 0.2560, ..., 0.3967, 0.6844, -0.0801],
    [-0.3589, 0.6584, 0.1836, ..., 0.9718, 0.5690, -0.7071],
    [ 0.6927, -0.4106, 0.5513, ..., -0.2038, 0.9638, 0.3804],
    ...,
    [-1.2075, -0.9752, -2.2019, ..., -2.1254, -0.2321, 0.9713],
    [ 0.9386, -0.2055, 0.5741, ..., -0.6105, 1.7194, -0.8643],
    [-1.1196, 1.3400, -0.4640, ..., 1.0994, 0.7589, -1.0194]],
    [[-0.1582, -0.3705, -1.2539, ..., 2.6838, -1.0564, 0.3948],
    [-0.7398, 0.4874, -0.1554, ..., 0.3360, -0.9706, -1.1178],
    [ 0.8644, -1.5481, 0.7424, ..., -0.0362, 0.1190, -0.2096],
    ...,
    [-2.2812, -1.0691, -1.9750, ..., -1.6648, 0.2447, -0.9316],
    [ 1.3596, -0.1944, 0.4538, ..., 1.5388, 1.9213, -1.1869],
    [-0.9911, 2.3999, -0.3334, ..., -0.5131, 0.7391, -0.2217]]]), past_key_values=DynamicCache(layers=[DynamicLayer, DynamicLayer]), hidden_states=None, attentions=None, cross_attentions=None)
    • This is a BaseModelOutputWithPastAndCrossAttentions, essentially a tuple with named fields, containing:

      • last_hidden_state: the last layer's hidden states, (batch_size, seq_len, hidden_size), i.e. (nth sample in batch, nth token in sequence, token hidden state). (hidden_size = config.n_embd)
      • past_key_values: the KV cache, storing the K/V tensors of tokens already processed. Next time you only feed the new token plus past_key_values, and the model does not have to recompute attention over all previous tokens, which is much faster.
      • hidden_states: the hidden states of every layer (None by default); when requested it is a tuple of length num_layers + 1, containing the embedding output plus each layer's hidden states
      • attentions: each layer's self-attention weights (None here)
      • cross_attentions: each layer's cross-attention weights when there is an encoder (None here)
  • hidden_states

    Simply taken from transformer_outputs; shape (batch_size, seq_len, hidden_size)

  • slice_indices

    slice(-logits_to_keep, None) # equivalent to the [-logits_to_keep:] slice
    slice(0, None, None)         # slice(start, stop, step)

    This decides which positions along the time dimension (the sequence-length axis) of hidden_states are kept for computing logits.

    • logits_to_keep = 0 → slice(0, None) → keep all time steps;
    • logits_to_keep = 5 → slice(-5, None) → keep only the last 5 time steps.
  • logits

    torch.Size([2, 16, 100])
    tensor([[[ 0.0545, -0.1060, -0.0190, ..., 0.0495, -0.2154, 0.0839],
    [-0.0738, -0.1690, 0.1306, ..., 0.1128, 0.1745, -0.0701],
    [-0.1737, -0.2598, 0.0013, ..., 0.0105, 0.2533, 0.0665],
    ...,
    [ 0.0656, 0.0591, -0.0530, ..., -0.3790, 0.0711, 0.1401],
    [ 0.0471, 0.0646, 0.1361, ..., -0.0941, 0.0735, 0.0579],
    [ 0.2808, 0.3964, -0.1211, ..., -0.0119, 0.2546, 0.2442]],
    [[ 0.0470, -0.0757, -0.0323, ..., 0.2119, -0.1910, 0.0868],
    [ 0.2607, 0.1443, -0.1541, ..., 0.2334, 0.1907, 0.2107],
    [ 0.0506, 0.0645, 0.0126, ..., -0.0995, 0.3498, 0.3652],
    ...,
    [ 0.1784, -0.0449, 0.1409, ..., -0.1468, 0.6051, 0.0009],
    [-0.1163, 0.1026, 0.0557, ..., -0.0361, 0.0837, 0.2053],
    [-0.0091, 0.1724, 0.0056, ..., -0.0435, 0.2960, 0.1318]]])

    The prediction score over the vocabulary at each kept time step. Shape (batch_size, kept_seq_len, vocab_size), i.e. (nth sample in batch, nth token in kept seq, logit for each vocabulary token)

  • loss

    The default loss_type is ForCausalLMLoss:

    def ForCausalLMLoss(
        logits,  # (batch_size, seq_len, vocab_size)
        labels,  # (batch_size, seq_len)
        vocab_size: int,  # vocabulary size
        num_items_in_batch: Optional[torch.Tensor] = None,
        ignore_index: int = -100,  # label value to ignore (no loss computed for it)
        shift_labels: Optional[torch.Tensor] = None,  # pre-computed shifted labels
        **kwargs,
    ) -> torch.Tensor:
        # Upcast logits to float32 to avoid precision problems when computing cross-entropy in float16.
        logits = logits.float()
        # Shift the labels so that position 0 predicts token 1, position 1 predicts token 2, ...
        if shift_labels is None:
            # Shift so that tokens < n predict n
            labels = nn.functional.pad(labels, (0, 1), value=ignore_index)  # pad one slot of ignore_index on the right of the last dim
            shift_labels = labels[..., 1:].contiguous()  # drop the first label; contiguous() keeps the memory layout contiguous
        # Flatten the token dimension
        logits = logits.view(-1, vocab_size)  # (batch * seq_len, vocab_size)
        shift_labels = shift_labels.view(-1)  # (batch * seq_len)
        shift_labels = shift_labels.to(logits.device)
        loss = fixed_cross_entropy(logits, shift_labels, num_items_in_batch, ignore_index, **kwargs)
        # source: [N, C] target: [N]
        return loss  # tensor(float)
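To close, a tiny worked example of the label shift, to make the indexing concrete (the values are arbitrary, and the last line just shows that the shifted labels plug straight into a standard cross-entropy):

import torch
import torch.nn.functional as F

labels = torch.tensor([[51, 93, 69, 67]])                 # (batch=1, seq_len=4), e.g. the input_ids themselves
padded = F.pad(labels, (0, 1), value=-100)                # [[51, 93, 69, 67, -100]]
shift_labels = padded[..., 1:]                            # [[93, 69, 67, -100]]
# So the logits at position 0 are scored against token 93, position 1 against 69, position 2 against 67,
# and the last position (which has no "next token") is ignored via -100.

logits = torch.randn(1, 4, 100)                           # (batch, seq_len, vocab_size)
loss = F.cross_entropy(logits.view(-1, 100), shift_labels.reshape(-1), ignore_index=-100)
print(loss)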