Preface
It has been a while since I last updated this blog. The past few months have been busy, and even when I was not busy I never felt free enough to write. It feels like I have done a lot, yet when I stop and think about it, it also feels like I have done nothing at all. I actually wanted to write this post a long time ago; the inspiration comes from the Bilibili video *强烈推荐新人入门 LLM 的方法——逐行调试一个小模型* (a highly recommended way for newcomers to get started with LLMs: step through a small model line by line), but it kept getting pushed back until now.
If you are new to LLMs, you may have read many papers and studied the theory of machine learning, deep learning, and the Transformer. You may understand the architecture diagrams of LLMs and the principles behind the Transformer, yet still feel lost when facing actual code. In PyTorch, a convolutional layer, a fully connected layer, and even an entire Transformer encoder are all just a Module, and an LLM is merely a composition of these modules. Many people say that writing such code is just stacking building blocks, and that is true, but it is still hard for beginners to grasp. Architecture diagrams in papers are abstract and static, while modules in code are concrete and dynamic; a paper mainly tells a story and introduces its method, and it neither will nor can describe the architecture diagram in full detail. Moreover, the traditional programming mindset is "first do A, then B, then C", whereas the deep-learning mindset is: define a structure, feed it data, and let it learn on its own. This abstraction, combined with the constant reshaping of tensors, can easily leave newcomers confused when reading or writing code.
The mainstream learning path online is to walk you through writing a simple transformer by hand, i.e. construction. Construction is certainly a good approach: you get a clear grasp of the code structure and core logic, but it is usually time-consuming and laborious, and it tends to be disconnected from real-world implementations. The Bilibili video instead offers a deconstruction approach: learning the actual code through debugging and analysis. It reminds me of what one of my teachers called the agile path to systems skills: using reverse engineering, debugging, guessing, and verification to quickly figure out how a system works. The story he brought up most often was how changing a single byte drastically altered the behavior of the VC compiler. This mindset has influenced me quite a bit: in most cases I would rather improve an existing product than reinvent the wheel, unless the existing product is truly awful. Back to the topic: for newcomers who already have the theoretical foundation and want to start writing code, this deconstruction approach is overwhelmingly effective. We already understand the model's macro architecture; all that remains is to follow the data through the code and verify it.
Preparation
The environment setup below is just my personal preference; adjust it to your own environment. For convenience, we debug a very small instance of a classic model, GPT-2, which runs fine on a CPU.
- Install uv: Installing uv

  ```bash
  # Windows
  powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
  # Linux
  curl -LsSf https://astral.sh/uv/install.sh | sh
  ```
- Create a virtual environment: Using Python environments | Using a virtual environment

  ```bash
  uv venv --python 3.12
  ```
- Install dependencies: transformers/installation

  ```bash
  uv pip install torch --index-url https://download.pytorch.org/whl/cpu
  uv pip install transformers
  ```
- Editor: VS Code (Cursor also works, which makes it convenient to ask an AI about the source code). The code:

  ```python
  import torch
  from transformers import GPT2Config, GPT2LMHeadModel

  # GPT2 model config
  mini_config = GPT2Config(
      vocab_size=100,              # vocabulary size
      n_positions=64,              # maximum sequence length
      n_embd=64,                   # embedding dimension (hidden size)
      n_layer=2,                   # number of transformer blocks
      n_head=4,                    # number of attention heads
      n_inner=128,                 # FFN inner dimension
      activation_function="gelu",  # activation function
      resid_pdrop=0.1,             # dropout rate for residual connections
      embd_pdrop=0.1,              # dropout rate for embedding
      attn_pdrop=0.1,              # dropout rate for attention
  )

  # load the model
  model = GPT2LMHeadModel(mini_config)
  print("Model Structure:")
  print(model)
  print("#Parameters:")
  print(sum(p.numel() for p in model.parameters()))

  # generate random input
  batch_size = 2
  seq_len = 16
  input_ids = torch.randint(0, 100, (batch_size, seq_len))
  print("Input Shape:", input_ids.shape)

  # forward
  with torch.no_grad():
      outputs = model(input_ids, labels=input_ids)
  print("Logits Shape:", outputs.logits.shape)
  print("Loss:", outputs.loss)
  ```
- Debugger setup: `.vscode/launch.json`

  ```json
  {
      "version": "0.2.0",
      "configurations": [
          {
              "name": "Python Debugger: Current File",
              "type": "debugpy",
              "request": "launch",
              "program": "${file}",
              "console": "integratedTerminal",
              "justMyCode": false // important setting: lets you step into library code
          }
      ]
  }
  ```
Debugging and Deconstruction
The script defines a very small GPT-2 model and initializes its weights randomly: the model does not need to be "useful", since we only care about its structure and how data flows through it. Set a breakpoint at `outputs = model(input_ids, labels=input_ids)` and start the debugger. What follows is just debugging, nothing special: you can debug against the GPT-2 architecture diagram, or against the model structure printed by the script. Dive into whatever part interests you, and when you hit code you do not understand, ask an expert (Gemini, Claude, GPT). If you plan to debug it yourself, you can stop reading here; the rest is just the notes I put together (including some AI-generated content).
```text
GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(100, 64)
    (wpe): Embedding(64, 64)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-1): 2 x GPT2Block(
        (ln_1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D(nf=192, nx=64)
          (c_proj): Conv1D(nf=64, nx=64)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D(nf=128, nx=64)
          (c_proj): Conv1D(nf=64, nx=128)
          (act): GELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=64, out_features=100, bias=False)
)
```

GPT2LMHeadModel is the Transformers model class for doing language-model fine-tuning and generation with GPT-2. It takes the base GPT2Model and adds a linear output layer (the LM head) that maps the final hidden states to scores over every token in the vocabulary, for causal language modeling: continuation, dialogue, code generation, and so on.
- Input: `input_ids` (plus optional `attention_mask`, etc.)
- Output: `logits` (shape `[batch_size, seq_len, vocab_size]`), plus an optional loss
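Before diving into the internals, it helps to see what those logits are for. The snippet below is a small sketch of my own (it reuses `model` and `input_ids` from the preparation script; it is not part of the original code) that turns the last position's logits into a greedy next-token prediction:

```python
import torch
import torch.nn.functional as F

model.eval()  # disable dropout so repeated runs give the same result
with torch.no_grad():
    out = model(input_ids)                 # logits: [batch_size, seq_len, vocab_size]

last_logits = out.logits[:, -1, :]         # score of each vocab token following each sequence
probs = F.softmax(last_logits, dim=-1)     # [batch_size, vocab_size]
next_token = probs.argmax(dim=-1)          # greedy choice of the next token id
print(next_token)                          # one token id per sequence in the batch
```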
Embedding Layers
```text
(transformer): GPT2Model(
  (wte): Embedding(100, 64)
  (wpe): Embedding(64, 64)
)
```

- `wte: Embedding(100, 64)`
  - Role: word/token embedding
  - Each token id ∈ [0, 99] is mapped to a 64-dimensional vector
  - Parameter count: 100 × 64 = 6400
- `wpe: Embedding(64, 64)`
  - Role: positional embedding
  - The maximum sequence length is 64, so each position 0–63 gets its own 64-dimensional vector
  - Parameter count: 64 × 64 = 4096
- What the input stage does (conceptually; see the sketch after this list):
  - For an input sequence `input_ids` of shape `[batch, seq_len]`:
    - look up `wte` to get the token vectors: `[batch, seq_len, 64]`
    - look up `wpe` to get the position vectors: `[batch, seq_len, 64]`
    - add the two: `hidden = wte(input_ids) + wpe(positions)`
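To verify this in the debugger, you can redo the embedding stage by hand and compare it with what actually enters the first block. This is my own sanity check, not part of the post's script; it assumes `model` and `input_ids` from the preparation code and uses eval mode so that dropout is the identity:

```python
import torch

model.eval()  # dropout becomes a no-op, so the comparison is exact
with torch.no_grad():
    positions = torch.arange(input_ids.shape[1]).unsqueeze(0)                 # [1, seq_len]
    manual = model.transformer.wte(input_ids) + model.transformer.wpe(positions)

    out = model(input_ids, output_hidden_states=True)
    # hidden_states[0] is the embedding output fed into the first GPT2Block
    print(torch.allclose(manual, out.hidden_states[0]))                       # expected: True
```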
Dropout and the Layer Stack
```text
(drop): Dropout(p=0.1)
(h): ModuleList(
  (0-1): 2 x GPT2Block(...)
)
(ln_f): LayerNorm((64,), eps=1e-05)
```

- `drop: Dropout(p=0.1)`
  - One dropout applied to the embedding output, to reduce overfitting
- `h: ModuleList(2 x GPT2Block)`
  - Two Transformer blocks, each with the standard GPT-2 structure (see the sketch after this list):
    - a self-attention sub-layer
    - a feed-forward (MLP) sub-layer
    - each with its own LayerNorm and residual connection
- `ln_f: LayerNorm(64)`
  - The final LayerNorm after the whole Transformer stack
  - The output is still `[batch, seq_len, 64]`
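If you just want to watch the tensor shapes flow through the stack without stepping into every block, a quick way (again assuming `model` and `input_ids` from the preparation script) is to ask for all hidden states:

```python
with torch.no_grad():
    out = model(input_ids, output_hidden_states=True)

# n_layer + 1 entries: the embedding output, then the hidden state after each block
# (the last entry has already passed through the final ln_f)
for i, h in enumerate(out.hidden_states):
    print(i, h.shape)   # every entry is torch.Size([2, 16, 64])
```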
GPT2Model Code
```python
# GPT2Model = TokenEmbedding + PositionEmbedding (+ TokenTypeEmbedding)
#           → Dropout
#           → num_layers × GPT2Block
#           → Final LayerNorm
#           → outputs last_hidden_state (plus optional past_key_values / attentions / hidden_states)
class GPT2Model(GPT2PreTrainedModel):
    _supports_param_buffer_assignment = False

    def __init__(self, config):
        super().__init__(config)

        self.embed_dim = config.hidden_size  # hidden size
        self.wte = nn.Embedding(config.vocab_size, self.embed_dim)  # Word Token Embedding
        self.wpe = nn.Embedding(config.max_position_embeddings, self.embed_dim)  # Word Position Embedding

        self.drop = nn.Dropout(config.embd_pdrop)  # dropout layer
        self.h = nn.ModuleList([GPT2Block(config, layer_idx=i) for i in range(config.num_hidden_layers)])  # stacked Transformer blocks
        self.ln_f = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_epsilon)  # final LayerNorm before the output

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs,
    ) -> Union[tuple, BaseModelOutputWithPastAndCrossAttentions]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, input_ids_length)`):
            `input_ids_length` = `sequence_length` if `past_key_values` is `None` else
            `past_key_values.get_seq_length()` (`sequence_length` of input past key value states).
            Indices of input sequence tokens in the vocabulary.

            If `past_key_values` is used, only `input_ids` that do not have their past calculated should be
            passed as `input_ids`.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        """
        # resolve options
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # validate inputs
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            # Flatten multi-dimensional inputs into the standard batched format:
            #   [batch, segments, seq_len]
            #   [batch, n_sentences, seq_len]
            #   [batch, n_variants, seq_len]
            # all become [N, seq_len], which is what Embedding expects
            input_shape = input_ids.size()
            input_ids = input_ids.view(-1, input_shape[-1])
            batch_size = input_ids.shape[0]
        elif inputs_embeds is not None:
            input_shape = inputs_embeds.size()[:-1]
            batch_size = inputs_embeds.shape[0]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        device = input_ids.device if input_ids is not None else inputs_embeds.device

        if token_type_ids is not None:
            token_type_ids = token_type_ids.view(-1, input_shape[-1])

        # gradient checkpointing conflicts with caching
        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        # based on pattern from src/transformers/models/whisper/modeling_whisper.py::WhisperDecoder
        # initialize the KV cache
        if use_cache:
            if past_key_values is None:
                past_key_values = DynamicCache(config=self.config)
            if self.config.add_cross_attention and not isinstance(past_key_values, EncoderDecoderCache):
                past_key_values = EncoderDecoderCache(past_key_values, DynamicCache(config=self.config))

        # token embeddings
        if inputs_embeds is None:
            inputs_embeds = self.wte(input_ids)  # (N, seq_len) -> (N, seq_len, hidden_size)

        # absolute position ids
        if cache_position is None:
            # number of tokens seen so far
            past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
            cache_position = torch.arange(
                past_seen_tokens,
                past_seen_tokens + inputs_embeds.shape[1],  # inputs_embeds.shape[1] is the sequence length
                device=inputs_embeds.device,
            )  # (seq_len), e.g. tensor([0, 1, 2, 3, 4, 5])
        if position_ids is None:
            # Why unsqueeze(0)?
            # The model expects (batch_size, seq_len), while cache_position is (seq_len)
            position_ids = cache_position.unsqueeze(0)  # add a dimension: (1, seq_len)

        # position embeddings
        position_embeds = self.wpe(position_ids)  # (1, seq_len, hidden_size)
        # (N, seq_len, hidden_size) + (1, seq_len, hidden_size), broadcast
        hidden_states = inputs_embeds + position_embeds.to(inputs_embeds.device)

        # Attention mask.
        # ._update_causal_mask() and ._prepare_4d_causal_attention_mask_with_cache_position() copied from LlamaModel
        if attention_mask is not None and attention_mask.ndim < 4:
            attention_mask = attention_mask.view(batch_size, -1)
        # build the final causal attention mask
        causal_mask = create_causal_mask(
            config=self.config,
            input_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            past_key_values=past_key_values,
            position_ids=position_ids,
        )

        # cross-attention handling
        # If a 2D or 3D attention mask is provided for the cross-attention
        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
        _use_sdpa = self._attn_implementation == "sdpa" and output_attentions is False and head_mask is None
        if self.config.add_cross_attention and encoder_hidden_states is not None:
            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
            if encoder_attention_mask is None:
                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
            if _use_sdpa:
                encoder_attention_mask = _prepare_4d_attention_mask_for_sdpa(
                    mask=encoder_attention_mask, dtype=inputs_embeds.dtype, tgt_len=input_shape[-1]
                )
            elif self._attn_implementation != "flash_attention_2":
                encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else:
            encoder_attention_mask = None

        # Normalize the per-head masking argument into a uniform shape, used inside the blocks
        # to silence specific attention heads.
        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # head_mask has shape n_layer x batch x n_heads x N x N
        head_mask = self.get_head_mask(head_mask, self.config.n_layer)

        # token type embeddings
        if token_type_ids is not None:
            token_type_embeds = self.wte(token_type_ids)  # (batch_size, seq_len) -> (batch_size, seq_len, hidden_size)
            hidden_states = hidden_states + token_type_embeds

        # dropout
        hidden_states = self.drop(hidden_states)

        # record the output shape, mainly used to restore the batch/seq shape with view at the end
        output_shape = (-1,) + input_shape[1:] + (hidden_states.size(-1),)  # (-1, sample_shape, hidden_size)

        # initialize to () if the corresponding output was requested, otherwise None
        all_self_attentions = () if output_attentions else None
        all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None
        all_hidden_states = () if output_hidden_states else None

        # model body: pass through each GPT2Block in turn
        for i, block in enumerate(self.h):
            # Model parallel
            if self.model_parallel:
                torch.cuda.set_device(hidden_states.device)
                if isinstance(head_mask, torch.Tensor):
                    head_mask = head_mask.to(hidden_states.device)
            if output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states,)  # collect intermediate hidden states

            # call the forward of a single block
            outputs = block(
                hidden_states,  # (N, seq_len, hidden_size)
                past_key_values if not (self.gradient_checkpointing and self.training) else None,
                cache_position,
                causal_mask,
                head_mask[i],
                encoder_hidden_states,  # as a positional argument for gradient checkpointing
                encoder_attention_mask=encoder_attention_mask,
                use_cache=use_cache,
                output_attentions=output_attentions,
                **kwargs,
            )

            hidden_states = outputs[0]  # take the hidden states

            # collect attention weights
            if output_attentions:
                all_self_attentions = all_self_attentions + (outputs[1],)
                if self.config.add_cross_attention:
                    all_cross_attentions = all_cross_attentions + (outputs[2],)

            # With model parallelism, use self.device_map to check whether this layer is the last one
            # on the current device; if so, move hidden_states to the next device.
            if self.model_parallel:
                for k, v in self.device_map.items():
                    if i == v[-1] and "cuda:" + str(k) != self.last_device:
                        hidden_states = hidden_states.to("cuda:" + str(k + 1))

        # final LayerNorm before the output
        hidden_states = self.ln_f(hidden_states)

        # restore the output shape (-1, sample_shape, hidden_size)
        hidden_states = hidden_states.view(output_shape)
        # collect the last hidden state
        if output_hidden_states:
            all_hidden_states = all_hidden_states + (hidden_states,)

        # cache output
        past_key_values = past_key_values if use_cache else None

        if not return_dict:
            # return the non-None entries of
            # [hidden_states, past_key_values, all_hidden_states, all_self_attentions, all_cross_attentions], in order
            return tuple(
                v
                for v in [hidden_states, past_key_values, all_hidden_states, all_self_attentions, all_cross_attentions]
                if v is not None
            )

        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
            cross_attentions=all_cross_attentions,
        )
```

GPT2Block
```text
GPT2Block(
  (ln_1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (attn): GPT2Attention(
    (c_attn): Conv1D(nf=192, nx=64)
    (c_proj): Conv1D(nf=64, nx=64)
    (attn_dropout): Dropout(p=0.1)
    (resid_dropout): Dropout(p=0.1)
  )
  (ln_2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (mlp): GPT2MLP(
    (c_fc): Conv1D(nf=128, nx=64)
    (c_proj): Conv1D(nf=64, nx=128)
    (act): GELUActivation()
    (dropout): Dropout(p=0.1)
  )
)
```

Pre-LayerNorm: ln_1
- GPT-2 uses a Pre-LN structure:
  - `x_norm = ln_1(x)`
  - the normalized result is then fed into the self-attention layer
Self-Attention Layer: attn: GPT2Attention

- `c_attn: Conv1D(nf=192, nx=64)`
  - This is GPT-2's customary notation for what is essentially a linear layer:
    - input dimension `nx = 64`
    - output dimension `nf = 192`
  - 192 = 3 × 64, corresponding to the Q, K, V projections concatenated together:
    - `hidden(64) → [Q(64), K(64), V(64)]`, merged into 192 dimensions and then split apart in the code (see the sketch after this list)
- Where do the multiple heads come from?
  - The head count does not show up in the printed structure, but in general `hidden_size = num_heads × head_dim`
  - e.g. 64 = 4 × 16 or 8 × 8, depending on the config (ours sets `n_head=4`, so 4 × 16)
  - The implementation reshapes Q, K, V into `[batch, num_heads, seq_len, head_dim]` and runs multi-head attention
- `c_proj: Conv1D(nf=64, nx=64)`
  - Maps the multi-head attention output back to 64 dimensions
  - Corresponds to `W_o` in the standard Transformer
- `attn_dropout` / `resid_dropout`
  - `attn_dropout`: dropout on the attention weights
  - `resid_dropout`: dropout on the attention output (before it is added back to the residual)
- Residual connection
  - The actual computation is roughly: `x = x + resid_dropout(attn(ln_1(x)))`
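To make the Q/K/V split and the head reshaping concrete, here is a shape-only sketch of what happens inside the attention layer. It is my own illustration rather than the exact Transformers implementation, and it reuses the first block of our mini model (`n_head=4`, `head_dim=16`):

```python
import torch

block = model.transformer.h[0]
batch, seq_len, hidden = 2, 16, 64
n_head, head_dim = 4, hidden // 4

x = torch.randn(batch, seq_len, hidden)
qkv = block.attn.c_attn(block.ln_1(x))       # (2, 16, 192): Q, K, V concatenated
q, k, v = qkv.split(hidden, dim=2)           # three tensors of shape (2, 16, 64)

# reshape each to (batch, n_head, seq_len, head_dim) for per-head attention
q = q.view(batch, seq_len, n_head, head_dim).transpose(1, 2)
k = k.view(batch, seq_len, n_head, head_dim).transpose(1, 2)
v = v.view(batch, seq_len, n_head, head_dim).transpose(1, 2)

scores = (q @ k.transpose(-2, -1)) / head_dim**0.5   # (2, 4, 16, 16): one seq_len x seq_len map per head
print(q.shape, scores.shape)
```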
Second LayerNorm: ln_2

- Apply another LayerNorm to the result of attention + residual, then feed it into the MLP:
  - `x_norm = ln_2(x)`
Feed-Forward Network: GPT2MLP

- `c_fc: Conv1D(nf=128, nx=64)`
  - Linear layer: `64 → 128`, i.e. the hidden dimension is expanded to 2× (our config sets `n_inner=128`; GPT-2's default is 4×)
- `act: GELU`
  - Activation function; GPT-2 uses GELU
- `c_proj: Conv1D(nf=64, nx=128)`
  - Maps back from `128 → 64`
- `dropout`
  - Dropout on the MLP output
- Residual connection
  - Same pattern as the attention part: `x = x + dropout(mlp(ln_2(x)))`
Summary: each GPT2Block = LN → Self-Attn → residual, then LN → MLP → residual. This is the standard GPT-2 block, just at a very small parameter scale; a distilled sketch follows, and the actual Transformers code comes after it.
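As a distilled view of that summary, here is a minimal pre-LN block written in plain PyTorch. It is a sketch for intuition only: it uses `nn.Linear`/`nn.MultiheadAttention` instead of the `Conv1D`/`GPT2Attention` modules that Transformers actually uses, so the shapes match but the weights and details do not.

```python
import torch
import torch.nn as nn

class MiniBlock(nn.Module):
    def __init__(self, d_model=64, n_head=4, d_inner=128, p=0.1):
        super().__init__()
        self.ln_1 = nn.LayerNorm(d_model)
        self.attn = nn.MultiheadAttention(d_model, n_head, dropout=p, batch_first=True)
        self.ln_2 = nn.LayerNorm(d_model)
        self.mlp = nn.Sequential(
            nn.Linear(d_model, d_inner), nn.GELU(), nn.Linear(d_inner, d_model), nn.Dropout(p)
        )

    def forward(self, x):
        T = x.size(1)
        # causal mask: True marks future positions that must not be attended to
        mask = torch.triu(torch.ones(T, T, dtype=torch.bool), diagonal=1)
        h = self.ln_1(x)                       # LN
        attn_out, _ = self.attn(h, h, h, attn_mask=mask)
        x = x + attn_out                       # Self-Attn + residual
        x = x + self.mlp(self.ln_2(x))         # LN -> MLP + residual
        return x

print(MiniBlock()(torch.randn(2, 16, 64)).shape)   # torch.Size([2, 16, 64])
```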
Code
```python
# GPT2Block = LN → Self-Attn → residual, then LN → MLP → residual
class GPT2Block(GradientCheckpointingLayer):
    def __init__(self, config, layer_idx=None):
        super().__init__()
        hidden_size = config.hidden_size  # dimension of each token representation
        inner_dim = config.n_inner if config.n_inner is not None else 4 * hidden_size  # MLP hidden dimension

        # LayerNorm before self-attention (pre-LN architecture)
        self.ln_1 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)
        # self-attention layer
        self.attn = GPT2Attention(config=config, layer_idx=layer_idx)
        # second LayerNorm, before the MLP
        self.ln_2 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        # optional cross-attention
        if config.add_cross_attention:
            self.crossattention = GPT2Attention(config=config, is_cross_attention=True, layer_idx=layer_idx)
            self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        # feed-forward MLP
        self.mlp = GPT2MLP(inner_dim, config)

    @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58")
    def forward(
        self,
        hidden_states: Optional[tuple[torch.FloatTensor]],  # (N, seq_len, hidden_size)
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = False,
        output_attentions: Optional[bool] = False,
        **kwargs,
    ) -> Union[tuple[torch.Tensor], Optional[tuple[torch.Tensor, tuple[torch.FloatTensor, ...]]]]:
        residual = hidden_states                    # save the residual
        hidden_states = self.ln_1(hidden_states)    # pre-LayerNorm

        # attn_output: (N, seq_len, hidden_size), each token's representation after the attention
        #              re-weighting/aggregation — the real output passed on to later layers.
        # self_attn_weights: (N, num_heads, seq_len, seq_len), only useful for debugging
        attn_output, self_attn_weights = self.attn(  # self-attention using the KV cache, masks, etc.
            hidden_states,
            past_key_values=past_key_values,
            cache_position=cache_position,
            attention_mask=attention_mask,
            head_mask=head_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
            **kwargs,
        )
        hidden_states = attn_output + residual  # residual connection

        # optional cross-attention
        if encoder_hidden_states is not None:
            # add one self-attention block for cross-attention
            if not hasattr(self, "crossattention"):
                raise ValueError(
                    f"If `encoder_hidden_states` are passed, {self} has to be instantiated with "
                    "cross-attention layers by setting `config.add_cross_attention=True`"
                )
            residual = hidden_states
            hidden_states = self.ln_cross_attn(hidden_states)
            cross_attn_output, cross_attn_weights = self.crossattention(
                hidden_states,
                past_key_values=past_key_values,
                attention_mask=attention_mask,
                head_mask=head_mask,
                encoder_hidden_states=encoder_hidden_states,
                encoder_attention_mask=encoder_attention_mask,
                output_attentions=output_attentions,
            )
            hidden_states = residual + cross_attn_output  # residual connection

        # feed-forward MLP + residual
        residual = hidden_states                                 # save the residual
        hidden_states = self.ln_2(hidden_states)                 # pre-LayerNorm
        feed_forward_hidden_states = self.mlp(hidden_states)     # two linear layers + activation (GELU)
        hidden_states = residual + feed_forward_hidden_states    # residual connection

        # assemble the outputs
        outputs = (hidden_states,)  # this layer's output
        if output_attentions:
            outputs += (self_attn_weights,)
            if encoder_hidden_states is not None:
                outputs += (cross_attn_weights,)

        return outputs
```

Final Output Layer: lm_head
```text
(lm_head): Linear(in_features=64, out_features=100, bias=False)
```

- Input: the output of `transformer`, `[batch, seq_len, 64]`
- Linear map: each position's 64-dimensional vector → 100-dimensional logits (one per vocabulary token)
- Output shape: `[batch, seq_len, 100]`
- The config usually enables weight tying: `lm_head.weight` is shared with `wte.weight`, which saves parameters (see the check after this list)
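The weight tying is easy to confirm in the debugger. A minimal check of my own (assuming the `model` from the preparation script, where `tie_word_embeddings` is left at its default of True):

```python
# lm_head.weight and transformer.wte.weight point to the same storage when tied
print(model.lm_head.weight.data_ptr() == model.transformer.wte.weight.data_ptr())  # expected: True
print(model.lm_head.weight.shape)   # torch.Size([100, 64]), i.e. (vocab_size, n_embd)
```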
Summary
The flow: run the GPT-2 Transformer to get the sequence hidden states → (optionally) keep only the last few time steps → pass them through lm_head to get logits over the vocabulary → (optionally) compute the language-modeling loss from labels → return a structure determined by return_dict.
```python
# GPT2LMHeadModel = GPT2Model → take hidden_states → Linear (lm_head) → logits (+ optional loss)
class GPT2LMHeadModel(GPT2PreTrainedModel, GenerationMixin):
    _tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        self.transformer = GPT2Model(config)  # the GPT-2 body
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)  # task head

        # Initialize weights and apply final processing
        self.post_init()

    def forward(self, ..., **kwargs) -> Union[tuple, CausalLMOutputWithCrossAttentions]:  # full argument list omitted
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, input_ids_length)`):
            `input_ids_length` = `sequence_length` if `past_key_values` is `None` else
            `past_key_values.get_seq_length()` (`sequence_length` of input past key value states).
            Indices of input sequence tokens in the vocabulary.

            If `past_key_values` is used, only `input_ids` that do not have their past calculated should be
            passed as `input_ids`.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        labels (`torch.LongTensor` of shape `(batch_size, input_ids_length)`, *optional*):
            Labels for language modeling. Note that the labels **are shifted** inside the model, i.e. you can set
            `labels = input_ids` Indices are selected in `[-100, 0, ..., config.vocab_size]` All labels set to `-100`
            are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        transformer_outputs = self.transformer(  # the GPT-2 body
            input_ids,  # (batch_size, input_ids_length)
            past_key_values=past_key_values,
            attention_mask=attention_mask,
            cache_position=cache_position,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # (batch_size, seq_len, hidden_size); indexed with [0] because the return value may not be a dict
        hidden_states = transformer_outputs[0]

        # slice the hidden states before feeding them into lm_head
        slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
        logits = self.lm_head(hidden_states[:, slice_indices, :])  # (batch_size, kept_seq_len, vocab_size)

        loss = None  # by default (no labels) no loss is computed
        if labels is not None:
            # flatten the token dimension
            loss = self.loss_function(
                logits,
                labels,
                vocab_size=self.config.vocab_size,
                **kwargs,
            )

        if not return_dict:
            output = (logits,) + transformer_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return CausalLMOutputWithCrossAttentions(
            loss=loss,
            logits=logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
            cross_attentions=transformer_outputs.cross_attentions,
        )
```
- `input_ids`

  ```text
  tensor([[51, 93, 69, 67, 67, 64, 14, 69, 28, 48, 95, 52,  0, 43, 75, 20],
          [38, 46, 94,  7, 13, 65, 12, 77,  1, 29, 93, 14, 71, 98, 64, 81]])
  ```

  Shape `(batch_size, seq_len)`, i.e. (nth sample in batch, nth token in sequence).
- `transformer_outputs`

  ```text
  BaseModelOutputWithPastAndCrossAttentions(
      last_hidden_state=tensor([[[ 0.1692,  1.6322,  0.2560,  ...,  0.3967,  0.6844, -0.0801],
           [-0.3589,  0.6584,  0.1836,  ...,  0.9718,  0.5690, -0.7071],
           [ 0.6927, -0.4106,  0.5513,  ..., -0.2038,  0.9638,  0.3804],
           ...,
           [-1.2075, -0.9752, -2.2019,  ..., -2.1254, -0.2321,  0.9713],
           [ 0.9386, -0.2055,  0.5741,  ..., -0.6105,  1.7194, -0.8643],
           [-1.1196,  1.3400, -0.4640,  ...,  1.0994,  0.7589, -1.0194]],

          [[-0.1582, -0.3705, -1.2539,  ...,  2.6838, -1.0564,  0.3948],
           [-0.7398,  0.4874, -0.1554,  ...,  0.3360, -0.9706, -1.1178],
           [ 0.8644, -1.5481,  0.7424,  ..., -0.0362,  0.1190, -0.2096],
           ...,
           [-2.2812, -1.0691, -1.9750,  ..., -1.6648,  0.2447, -0.9316],
           [ 1.3596, -0.1944,  0.4538,  ...,  1.5388,  1.9213, -1.1869],
           [-0.9911,  2.3999, -0.3334,  ..., -0.5131,  0.7391, -0.2217]]]),
      past_key_values=DynamicCache(layers=[DynamicLayer, DynamicLayer]),
      hidden_states=None,
      attentions=None,
      cross_attentions=None
  )
  ```

  - A `BaseModelOutputWithPastAndCrossAttentions` is essentially a tuple with named fields:
    - `last_hidden_state`: the last layer's hidden states, `(batch_size, seq_len, hidden_size)`, i.e. (nth sample in batch, nth token in sequence, token hidden state); the hidden dimension equals `config.n_embd`
    - `past_key_values`: the KV cache, holding the K/V tensors already computed for previous tokens. On the next call you only feed the new tokens together with `past_key_values`, so the model does not recompute attention over all earlier tokens, which is much faster.
    - `hidden_states`: per-layer hidden states (None by default); when requested it is a tuple of length `num_layers + 1`, containing the embedding output plus each layer's hidden states
    - `attentions`: per-layer self-attention weights (None here)
    - `cross_attentions`: with an encoder, per-layer cross-attention weights (None here)
- `hidden_states`
  - Taken from `transformer_outputs`, shape `(batch_size, seq_len, hidden_size)`
- `slice_indices`

  ```python
  slice(-logits_to_keep, None)  # equivalent to the [-logits_to_keep:] slice
  slice(0, None, None)          # slice(start, stop, step)
  ```

  Decides which positions along the time (sequence-length) dimension of `hidden_states` are kept for computing logits:
  - `logits_to_keep = 0` → `slice(0, None)` → keep all time steps
  - `logits_to_keep = 5` → `slice(-5, None)` → keep only the last 5 time steps
- `logits`

  ```text
  torch.Size([2, 16, 100])
  tensor([[[ 0.0545, -0.1060, -0.0190,  ...,  0.0495, -0.2154,  0.0839],
           [-0.0738, -0.1690,  0.1306,  ...,  0.1128,  0.1745, -0.0701],
           [-0.1737, -0.2598,  0.0013,  ...,  0.0105,  0.2533,  0.0665],
           ...,
           [ 0.0656,  0.0591, -0.0530,  ..., -0.3790,  0.0711,  0.1401],
           [ 0.0471,  0.0646,  0.1361,  ..., -0.0941,  0.0735,  0.0579],
           [ 0.2808,  0.3964, -0.1211,  ..., -0.0119,  0.2546,  0.2442]],

          [[ 0.0470, -0.0757, -0.0323,  ...,  0.2119, -0.1910,  0.0868],
           [ 0.2607,  0.1443, -0.1541,  ...,  0.2334,  0.1907,  0.2107],
           [ 0.0506,  0.0645,  0.0126,  ..., -0.0995,  0.3498,  0.3652],
           ...,
           [ 0.1784, -0.0449,  0.1409,  ..., -0.1468,  0.6051,  0.0009],
           [-0.1163,  0.1026,  0.0557,  ..., -0.0361,  0.0837,  0.2053],
           [-0.0091,  0.1724,  0.0056,  ..., -0.0435,  0.2960,  0.1318]]])
  ```

  The prediction score for every vocabulary token at each kept time step.
  Shape `(batch_size, kept_seq_len, vocab_size)`, i.e. (nth sample in batch, nth token in kept seq, logit for each vocabulary token).
- `loss`

  The default `loss_type` is `ForCausalLMLoss`:

  ```python
  def ForCausalLMLoss(
      logits,                    # (batch_size, seq_len, vocab_size)
      labels,                    # (batch_size, seq_len)
      vocab_size: int,           # vocabulary size
      num_items_in_batch: Optional[torch.Tensor] = None,
      ignore_index: int = -100,  # label value to ignore (no loss is computed for it)
      shift_labels: Optional[torch.Tensor] = None,  # pre-computed shifted labels
      **kwargs,
  ) -> torch.Tensor:
      # Cast logits to float32 to avoid precision problems when computing cross-entropy directly in float16.
      logits = logits.float()

      # Shift the labels so that position 0 predicts token 1, position 1 predicts token 2, ...
      if shift_labels is None:
          # Shift so that tokens < n predict n
          labels = nn.functional.pad(labels, (0, 1), value=ignore_index)  # pad one position on the right with ignore_index
          shift_labels = labels[..., 1:].contiguous()  # drop the first label; contiguous() keeps the memory layout contiguous

      # Flatten the token dimension
      logits = logits.view(-1, vocab_size)   # (batch * seq_len, vocab_size)
      shift_labels = shift_labels.view(-1)   # (batch * seq_len)
      shift_labels = shift_labels.to(logits.device)
      loss = fixed_cross_entropy(logits, shift_labels, num_items_in_batch, ignore_index, **kwargs)
      # source: [N, C]  target: [N]
      return loss  # tensor(float)
  ```
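To convince yourself that this really is just a shifted cross-entropy, you can recompute the loss from the same logits and compare it with `outputs.loss`. This is my own check, assuming `model` and `input_ids` from the preparation script and the call `model(input_ids, labels=input_ids)`:

```python
import torch
import torch.nn.functional as F

with torch.no_grad():
    outputs = model(input_ids, labels=input_ids)

# position i predicts token i+1: drop the last logit and the first label
shift_logits = outputs.logits[:, :-1, :].reshape(-1, outputs.logits.size(-1))
shift_labels = input_ids[:, 1:].reshape(-1)

manual_loss = F.cross_entropy(shift_logits, shift_labels)
print(manual_loss, outputs.loss)   # expected to match up to floating-point noise
```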