from typing import List, Optional, Tuple, Union

import numpy as np
import torch
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers import (
    MistralConfig,
    MistralForCausalLM,
    MistralModel,
    MistralPreTrainedModel,
)
from transformers.cache_utils import Cache, DynamicCache
from transformers.modeling_attn_mask_utils import AttentionMaskConverter
from transformers.modeling_outputs import (
    BaseModelOutputWithPast,
    SequenceClassifierOutputWithPast,
)
from transformers.models.mistral.modeling_mistral import (
    MistralAttention,
    MistralDecoderLayer,
    MistralFlashAttention2,
    MistralMLP,
    MistralRMSNorm,
    MistralSdpaAttention,
)
from transformers.utils import logging

logger = logging.get_logger(__name__)


def _prepare_4d_causal_attention_mask(
    attention_mask: Optional[torch.Tensor],
    input_shape: Union[torch.Size, Tuple, List],
    inputs_embeds: torch.Tensor,
    past_key_values_length: int,
    sliding_window: Optional[int] = None,
):
    """
    Creates a 4D attention mask of shape `(batch_size, 1, query_length, key_value_length)` from a 2D mask of shape
    `(batch_size, key_value_length)`. The converter is built with `is_causal=False`, so, given a 2D padding mask,
    the returned 4D mask only blocks padded positions rather than future positions.

    Args:
        attention_mask (`torch.Tensor` or `None`):
            A 2D attention mask of shape `(batch_size, key_value_length)`.
        input_shape (`tuple(int)` or `list(int)` or `torch.Size`):
            The input shape should be a tuple that defines `(batch_size, query_length)`.
        inputs_embeds (`torch.Tensor`):
            The embedded inputs as a torch Tensor.
        past_key_values_length (`int`):
            The length of the key value cache.
        sliding_window (`int`, *optional*):
            If the model uses windowed attention, a sliding window should be passed.
    """
    attn_mask_converter = AttentionMaskConverter(
        is_causal=False, sliding_window=sliding_window
    )

    key_value_length = input_shape[-1] + past_key_values_length

    if attention_mask is not None and len(attention_mask.shape) == 2:
        attention_mask = attn_mask_converter.to_4d(
            attention_mask,
            input_shape[-1],
            key_value_length=key_value_length,
            dtype=inputs_embeds.dtype,
        )
    elif attention_mask is not None and len(attention_mask.shape) == 4:
        expected_shape = (input_shape[0], 1, input_shape[1], key_value_length)
        if tuple(attention_mask.shape) != expected_shape:
            raise ValueError(
                f"Incorrect 4D attention_mask shape: {tuple(attention_mask.shape)}; expected: {expected_shape}."
            )
        else:
            # The 4D mask already has the correct shape: invert it and fill the masked
            # positions with the most negative representable value.
            inverted_mask = 1.0 - attention_mask
            attention_mask = inverted_mask.masked_fill(
                inverted_mask.to(torch.bool), torch.finfo(inputs_embeds.dtype).min
            )
    else:
        # Reached only when no attention_mask is provided at all.
        attention_mask = attn_mask_converter.to_causal_4d(
            input_shape[0],
            input_shape[-1],
            key_value_length,
            dtype=inputs_embeds.dtype,
            device=inputs_embeds.device,
        )

    return attention_mask
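
# Illustrative sketch (not part of the original module): a quick way to see what the helper
# above produces. The tensor sizes and the helper name `_example_padding_only_mask` are
# arbitrary choices for this example.
def _example_padding_only_mask():
    # Two sequences of length 4; the first one has its last position padded out.
    dummy_embeds = torch.zeros(2, 4, 8, dtype=torch.float32)
    padding_mask = torch.tensor([[1, 1, 1, 0], [1, 1, 1, 1]])
    mask_4d = _prepare_4d_causal_attention_mask(
        padding_mask, (2, 4), dummy_embeds, past_key_values_length=0
    )
    # Because the converter is built with is_causal=False, the result has no lower-triangular
    # structure: the only large negative entries sit in the padded column of the first sequence.
    return mask_4d  # shape: (2, 1, 4, 4)

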
def _prepare_4d_causal_attention_mask_for_sdpa(
    attention_mask: Optional[torch.Tensor],
    input_shape: Union[torch.Size, Tuple, List],
    inputs_embeds: torch.Tensor,
    past_key_values_length: int,
    sliding_window: Optional[int] = None,
):
    """
    Prepares the correct `attn_mask` argument to be used by `torch.nn.functional.scaled_dot_product_attention`.

    In case no token is masked in the `attention_mask` argument, we simply set it to `None` for the cases
    `query_length == 1` and `key_value_length == query_length`, and rely instead on SDPA's `is_causal` argument to
    use causal/non-causal masks, allowing to dispatch to the flash attention kernel (that can otherwise not be used
    if a custom `attn_mask` is passed).
    """
    attn_mask_converter = AttentionMaskConverter(
        is_causal=False, sliding_window=sliding_window
    )

    key_value_length = input_shape[-1] + past_key_values_length
    batch_size, query_length = input_shape

    # torch.jit.trace, symbolic tracing and torch.compile cannot handle data-dependent control
    # flow on the mask contents, so the `torch.all(...)` shortcut below is skipped while tracing.
    is_tracing = (
        torch.jit.is_tracing()
        or isinstance(inputs_embeds, torch.fx.Proxy)
        or (hasattr(torch, "_dynamo") and torch._dynamo.is_compiling())
    )

    if attention_mask is not None:
        if len(attention_mask.shape) == 4:
            expected_shape = (input_shape[0], 1, input_shape[1], key_value_length)
            if tuple(attention_mask.shape) != expected_shape:
                raise ValueError(
                    f"Incorrect 4D attention_mask shape: {tuple(attention_mask.shape)}; expected: {expected_shape}."
                )
            else:
                # If the 4D mask has the correct shape, invert it and fill the masked
                # positions with the most negative representable value.
                inverted_mask = 1.0 - attention_mask.to(inputs_embeds.dtype)
                attention_mask = inverted_mask.masked_fill(
                    inverted_mask.to(torch.bool), torch.finfo(inputs_embeds.dtype).min
                )
                return attention_mask

        elif not is_tracing and torch.all(attention_mask == 1):
            if query_length == 1:
                # A single query token attends to everything either way.
                attention_mask = None
            elif key_value_length == query_length:
                attention_mask = None
            else:
                # query_length > 1 with a longer key/value length (cache in use): keep the
                # mask so the expansion below handles it explicitly.
                pass
    elif query_length > 1 and key_value_length != query_length:
        # No 2D mask was given but a 4D mask is still needed for the cache case; `True`
        # signals that `to_causal_4d` should build it below.
        attention_mask = True
    elif is_tracing:
        raise ValueError(
            "Attention using SDPA can not be traced with torch.jit.trace when no attention_mask is provided. "
            'To solve this issue, please either load your model with the argument `attn_implementation="eager"` '
            "or pass an attention_mask input when tracing the model."
        )

    if attention_mask is None:
        expanded_4d_mask = None
    elif attention_mask is True:
        expanded_4d_mask = attn_mask_converter.to_causal_4d(
            input_shape[0],
            input_shape[-1],
            key_value_length,
            dtype=inputs_embeds.dtype,
            device=inputs_embeds.device,
        )
    else:
        expanded_4d_mask = attn_mask_converter.to_4d(
            attention_mask,
            input_shape[-1],
            dtype=inputs_embeds.dtype,
            key_value_length=key_value_length,
        )

        # Attend to all tokens in fully masked rows (e.g. the leading rows under left padding);
        # this is required by the memory-efficient SDPA path.
        if not is_tracing and expanded_4d_mask.device.type == "cuda":
            expanded_4d_mask = AttentionMaskConverter._unmask_unattended(
                expanded_4d_mask, min_dtype=torch.finfo(inputs_embeds.dtype).min
            )

    return expanded_4d_mask
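
# Illustrative sketch (not part of the original module): with nothing padded and no cache, the
# SDPA helper above returns None, so scaled_dot_product_attention can run without an explicit
# mask; combined with is_causal=False on the attention classes below, that gives fully
# bidirectional attention. The helper name is an arbitrary choice for this example.
def _example_sdpa_mask_is_dropped():
    dummy_embeds = torch.zeros(1, 5, 8, dtype=torch.float32)
    all_ones_mask = torch.ones(1, 5, dtype=torch.long)
    mask = _prepare_4d_causal_attention_mask_for_sdpa(
        all_ones_mask, (1, 5), dummy_embeds, past_key_values_length=0
    )
    return mask  # expected: None

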
# Each attention variant is subclassed only to flip `is_causal` to False, so the decoder
# attends bidirectionally instead of left-to-right.
class ModifiedMistralAttention(MistralAttention):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


class ModifiedMistralFlashAttention2(MistralFlashAttention2):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


class ModifiedMistralSdpaAttention(MistralSdpaAttention):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


MISTRAL_ATTENTION_CLASSES = {
    "eager": ModifiedMistralAttention,
    "flash_attention_2": ModifiedMistralFlashAttention2,
    "sdpa": ModifiedMistralSdpaAttention,
}
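
# Illustrative sketch (not part of the original module): the only behavioural change in the
# Modified* classes is the `is_causal` flag, which the SDPA/flash paths consult when deciding
# whether to apply a causal mask. The tiny config values are arbitrary choices for the example.
def _example_noncausal_attention_flag():
    tiny_config = MistralConfig(
        hidden_size=64,
        intermediate_size=128,
        num_hidden_layers=1,
        num_attention_heads=4,
        num_key_value_heads=2,
        max_position_embeddings=128,
    )
    attn = MISTRAL_ATTENTION_CLASSES["eager"](tiny_config, layer_idx=0)
    return attn.is_causal  # expected: False

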
class ModifiedMistralDecoderLayer(MistralDecoderLayer):
    def __init__(self, config: MistralConfig, layer_idx: int):
        nn.Module.__init__(self)
        self.hidden_size = config.hidden_size

        self.self_attn = MISTRAL_ATTENTION_CLASSES[config._attn_implementation](
            config, layer_idx
        )

        self.mlp = MistralMLP(config)
        self.input_layernorm = MistralRMSNorm(
            config.hidden_size, eps=config.rms_norm_eps
        )
        self.post_attention_layernorm = MistralRMSNorm(
            config.hidden_size, eps=config.rms_norm_eps
        )


class MistralBiModel(MistralModel):
    def __init__(self, config: MistralConfig):
        MistralPreTrainedModel.__init__(self, config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(
            config.vocab_size, config.hidden_size, self.padding_idx
        )
        self.layers = nn.ModuleList(
            [
                ModifiedMistralDecoderLayer(config, layer_idx)
                for layer_idx in range(config.num_hidden_layers)
            ]
        )
        self._attn_implementation = config._attn_implementation
        self.norm = MistralRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

        self.gradient_checkpointing = False

        self.post_init()

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPast]:
        output_attentions = (
            output_attentions
            if output_attentions is not None
            else self.config.output_attentions
        )
        output_hidden_states = (
            output_hidden_states
            if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache

        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        if input_ids is not None and inputs_embeds is not None:
            raise ValueError(
                "You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time"
            )
        elif input_ids is not None:
            batch_size, seq_length = input_ids.shape
        elif inputs_embeds is not None:
            batch_size, seq_length, _ = inputs_embeds.shape
        else:
            raise ValueError(
                "You have to specify either decoder_input_ids or decoder_inputs_embeds"
            )

        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        past_key_values_length = 0

        if use_cache:
            use_legacy_cache = not isinstance(past_key_values, Cache)
            if use_legacy_cache:
                past_key_values = DynamicCache.from_legacy_cache(past_key_values)
            past_key_values_length = past_key_values.get_usable_length(seq_length)

        if position_ids is None:
            device = input_ids.device if input_ids is not None else inputs_embeds.device
            position_ids = torch.arange(
                past_key_values_length,
                seq_length + past_key_values_length,
                dtype=torch.long,
                device=device,
            )
            position_ids = position_ids.unsqueeze(0).view(-1, seq_length)
        else:
            position_ids = position_ids.view(-1, seq_length).long()

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if (
            attention_mask is not None
            and self._attn_implementation == "flash_attention_2"
            and use_cache
        ):
            is_padding_right = attention_mask[:, -1].sum().item() != batch_size
            if is_padding_right:
                raise ValueError(
                    "You are attempting to perform batched generation with padding_side='right', which may lead to"
                    " unexpected behaviour with the Flash Attention version of Mistral. Make sure to call"
                    " `tokenizer.padding_side = 'left'` before tokenizing the input."
                )

        if self._attn_implementation == "flash_attention_2":
            # The 2D mask is passed through the layers; it is only kept if it actually masks something.
            attention_mask = (
                attention_mask
                if (attention_mask is not None and 0 in attention_mask)
                else None
            )
        elif self._attn_implementation == "sdpa" and not output_attentions:
            # Non-causal SDPA mask built by the helper defined above.
            attention_mask = _prepare_4d_causal_attention_mask_for_sdpa(
                attention_mask,
                (batch_size, seq_length),
                inputs_embeds,
                past_key_values_length,
            )
        else:
            # 4D non-causal mask for the eager implementation (or when attentions are returned).
            attention_mask = _prepare_4d_causal_attention_mask(
                attention_mask,
                (batch_size, seq_length),
                inputs_embeds,
                past_key_values_length,
                sliding_window=self.config.sliding_window,
            )

        hidden_states = inputs_embeds

        # Decoder layers
        all_hidden_states = () if output_hidden_states else None
        all_self_attns = () if output_attentions else None
        next_decoder_cache = None

        for decoder_layer in self.layers:
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            if self.gradient_checkpointing and self.training:
                layer_outputs = self._gradient_checkpointing_func(
                    decoder_layer.__call__,
                    hidden_states,
                    attention_mask,
                    position_ids,
                    past_key_values,
                    output_attentions,
                    use_cache,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    attention_mask=attention_mask,
                    position_ids=position_ids,
                    past_key_value=past_key_values,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                )

            hidden_states = layer_outputs[0]

            if use_cache:
                next_decoder_cache = layer_outputs[2 if output_attentions else 1]

            if output_attentions:
                all_self_attns += (layer_outputs[1],)

        hidden_states = self.norm(hidden_states)

        # Add hidden states from the last decoder layer.
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        next_cache = None
        if use_cache:
            next_cache = (
                next_decoder_cache.to_legacy_cache()
                if use_legacy_cache
                else next_decoder_cache
            )

        if not return_dict:
            return tuple(
                v
                for v in [hidden_states, next_cache, all_hidden_states, all_self_attns]
                if v is not None
            )
        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=next_cache,
            hidden_states=all_hidden_states,
            attentions=all_self_attns,
        )
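
# Usage sketch (not part of the original module): the bidirectional encoder can be called
# like a regular MistralModel. The tiny randomly initialised config and the mean pooling
# below are arbitrary choices for the example; real use would load pretrained weights.
def _example_bidirectional_encoding():
    tiny_config = MistralConfig(
        vocab_size=128,
        hidden_size=64,
        intermediate_size=128,
        num_hidden_layers=2,
        num_attention_heads=4,
        num_key_value_heads=2,
        max_position_embeddings=128,
        sliding_window=None,
        pad_token_id=0,
        attn_implementation="eager",
    )
    encoder = MistralBiModel(tiny_config)
    input_ids = torch.randint(1, tiny_config.vocab_size, (2, 6))
    attention_mask = torch.ones_like(input_ids)
    hidden = encoder(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
    # One vector per sequence; with real weights these would serve as text embeddings.
    return hidden.mean(dim=1)  # shape: (2, 64)

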
class MistralBiForMNTP(MistralForCausalLM):
    # Same interface as MistralForCausalLM (lm_head and the inherited shifted cross-entropy
    # loss), but the backbone is the bidirectional MistralBiModel defined above.
    def __init__(self, config):
        MistralPreTrainedModel.__init__(self, config)
        self.model = MistralBiModel(config)
        self.vocab_size = config.vocab_size
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        self.post_init()
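
# Sketch (not part of the original module; assumes the transformers version this module is
# written against and a tiny randomly initialised config). Because the causal-LM forward is
# inherited, labels are consumed exactly as in standard LM training, while the encoder
# underneath attends bidirectionally.
def _example_mntp_forward():
    tiny_config = MistralConfig(
        vocab_size=128,
        hidden_size=64,
        intermediate_size=128,
        num_hidden_layers=2,
        num_attention_heads=4,
        num_key_value_heads=2,
        max_position_embeddings=128,
        sliding_window=None,
        pad_token_id=0,
        attn_implementation="eager",
    )
    model = MistralBiForMNTP(tiny_config)
    input_ids = torch.randint(1, tiny_config.vocab_size, (1, 8))
    attention_mask = torch.ones_like(input_ids)
    outputs = model(
        input_ids=input_ids, attention_mask=attention_mask, labels=input_ids.clone()
    )
    return outputs.loss

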
class MistralForSequenceClassification(MistralPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.model = MistralBiModel(config)
        self.score = nn.Linear(config.hidden_size, self.num_labels, bias=False)

        self.post_init()

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in
            `[0, ..., config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed
            (Mean-Square loss); if `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        transformer_outputs = self.model(
            input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # The hidden state of the first token (rather than the last non-padding token) is
        # taken as the pooled representation, since the backbone attends bidirectionally.
        pooled_output = transformer_outputs[0][:, 0]
        logits = self.score(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (
                    labels.dtype == torch.long or labels.dtype == torch.int
                ):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + transformer_outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )
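

# Usage sketch (not part of the original module): a smoke test with a tiny randomly
# initialised config. Real use would load pretrained Mistral weights via `from_pretrained`
# and set `num_labels` appropriately; every value below is an arbitrary assumption.
if __name__ == "__main__":
    tiny_config = MistralConfig(
        vocab_size=128,
        hidden_size=64,
        intermediate_size=128,
        num_hidden_layers=2,
        num_attention_heads=4,
        num_key_value_heads=2,
        max_position_embeddings=128,
        sliding_window=None,
        pad_token_id=0,
        num_labels=3,
        attn_implementation="eager",
    )
    classifier = MistralForSequenceClassification(tiny_config)
    input_ids = torch.randint(1, tiny_config.vocab_size, (2, 10))
    attention_mask = torch.ones_like(input_ids)
    labels = torch.tensor([0, 2])
    outputs = classifier(
        input_ids=input_ids, attention_mask=attention_mask, labels=labels
    )
    print("logits:", tuple(outputs.logits.shape), "loss:", float(outputs.loss))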