(1) Construction of a plant intron RNA dataset and study of sequence encoding strategies
Intron RNAs are non-coding sequence fragments spliced out of precursor messenger RNA during transcription, and were long regarded as useless genomic sequence. Recent studies, however, have shown that intron RNAs play important roles in plant growth and development, stress response, and the regulation of gene expression. Efficient and accurate deep-learning-based prediction of plant intron RNAs is therefore valuable for uncovering their functional mechanisms and for mining regulatory elements with agricultural application value. This study first constructed a plant intron RNA dataset, laying the data foundation for subsequent model training and validation.
For species selection, two dimensions were considered: model plants and major crops. Arabidopsis thaliana, the most widely used model plant in plant molecular biology, has complete and accurate genome annotation with clearly delimited intron boundaries, making it the first choice for building a reference dataset. Rice, maize, cotton, and rapeseed are major agricultural crops with significant economic value and large planting areas, so intron RNA prediction for these species has direct agricultural application prospects. Gene annotations for these five species were extracted from public genome databases, intron start and end positions were identified from transcript structures, and these sequences formed the positive sample set. Negative samples were constructed by random sampling: sequence fragments whose length distribution resembles that of introns were drawn at random from intergenic and exonic regions, keeping the positive and negative samples balanced in sequence length.
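The length-matched negative sampling described above can be sketched as follows. This is a minimal illustration, not the study's actual pipeline; `genome`, `regions`, and `intron_lengths` are hypothetical toy inputs.

```python
import random

def sample_negatives(genome, regions, intron_lengths, n, seed=42):
    """Draw n fragments from non-intronic regions (intergenic/exonic),
    with lengths resampled from the observed intron length distribution."""
    rng = random.Random(seed)
    negatives = []
    while len(negatives) < n:
        length = rng.choice(intron_lengths)       # mimic intron length distribution
        chrom, start, end = rng.choice(regions)   # pick a non-intronic interval
        if end - start < length:                  # interval too short, retry
            continue
        pos = rng.randint(start, end - length)
        negatives.append(genome[chrom][pos:pos + length])
    return negatives

genome = {'chr1': 'ACGT' * 500}   # toy 2 kb chromosome
regions = [('chr1', 0, 2000)]     # one non-intronic interval
negs = sample_negatives(genome, regions, intron_lengths=[60, 90, 120], n=5)
print(sorted(set(len(s) for s in negs)))
```

Matching the length distribution matters because otherwise the classifier could separate positives from negatives on length alone.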
For sequence encoding, the effect of several encoding strategies on deep-learning prediction performance was systematically compared. A nucleotide sequence is a string over the four bases adenine, guanine, cytosine, and thymine, and must be converted into numeric vectors before it can serve as neural network input. One-hot encoding is the most common scheme: each base is represented as a four-dimensional vector with a one at the position of the corresponding base and zeros elsewhere. This preserves base identity but ignores similarity relationships between bases. Virtual encoding extends one-hot encoding with chemical-similarity information, such as the distinction between purine and pyrimidine bases. Hash encoding maps the base sequence into a fixed-dimension vector space through a hash function and has the advantage of computational efficiency.
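The exact virtual-encoding scheme is not spelled out in the text; as one plausible sketch (an assumption, not the study's definition), one-hot encoding can be augmented with a fifth channel marking the purine/pyrimidine distinction:

```python
PURINES = {'A', 'G'}                      # purine bases; T and C are pyrimidines
BASES = {'A': 0, 'T': 1, 'G': 2, 'C': 3}

def virtual_encode(seq):
    """One-hot plus a purine/pyrimidine channel: 5 values per base.
    Ambiguous bases (e.g. 'N') stay all-zero."""
    rows = []
    for b in seq.upper():
        row = [0.0] * 5
        if b in BASES:
            row[BASES[b]] = 1.0
            row[4] = 1.0 if b in PURINES else 0.0
        rows.append(row)
    return rows

print(virtual_encode('AT'))
```

With this layout, A and G share the fifth channel, giving the model an explicit signal about chemical similarity that plain one-hot encoding lacks.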
This study also proposes a new complementary one-hot encoding strategy that uses the encoding information of both the forward strand and the complementary strand. Concretely, the original sequence and its complementary sequence are each one-hot encoded, and the two encoding matrices are concatenated along the channel dimension to form an eight-channel input representation. The biological rationale is that intron sequences exist in the genome in double-stranded form, and the sequence features of both strands jointly determine intron recognition and splicing. Experimental results show that complementary one-hot encoding achieved the best prediction accuracy under most model-dataset combinations, improving on conventional one-hot encoding by 0.1 to 1.66 percentage points and validating the effectiveness of exploiting double-strand sequence information.
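The eight-channel construction can be sketched in a few lines. This toy version pairs each position with its complement base (whether the complementary strand is additionally reversed is an implementation choice):

```python
COMP = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
IDX = {'A': 0, 'T': 1, 'G': 2, 'C': 3}

def onehot(seq):
    # 4 channels per base; unknown bases stay all-zero
    return [[1.0 if IDX.get(b) == j else 0.0 for j in range(4)] for b in seq]

def complement_onehot(seq):
    """Concatenate forward-strand and complement-strand one-hot: 8 channels/base."""
    seq = seq.upper()
    comp = ''.join(COMP.get(b, 'N') for b in seq)
    return [f + c for f, c in zip(onehot(seq), onehot(comp))]

enc = complement_onehot('AG')
print(len(enc), len(enc[0]))   # 2 positions, 8 channels each
```

The per-position vector is simply the forward one-hot followed by the complement one-hot, so downstream models only need their input channel count raised from 4 to 8.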
(2) Construction and optimization of recurrent-network-based intron RNA prediction models
To predict plant intron RNAs accurately, this study systematically built and compared several deep-learning architectures. The core challenge of the sequence prediction task is to effectively extract both local pattern features and long-range dependencies. Intron sequences typically contain conserved motifs such as splice site signals, the branch point sequence, and the polypyrimidine tract, and recognizing these motifs is critical for accurately predicting intron boundaries. Intron recognition also depends on cooperative information from the flanking exon sequences, so the model must be able to capture long-range dependencies.
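As a concrete illustration of the splice-site signals mentioned above: the vast majority of spliceosomal introns follow the canonical GT-AG rule, beginning with the donor dinucleotide GT and ending with the acceptor AG. A minimal check (an illustrative helper, not part of the study's pipeline) looks like this:

```python
def has_canonical_splice_sites(intron_seq):
    """Check the canonical GT-AG rule: the intron starts with the
    donor dinucleotide GT and ends with the acceptor dinucleotide AG."""
    s = intron_seq.upper()
    return len(s) >= 4 and s.startswith('GT') and s.endswith('AG')

print(has_canonical_splice_sites('GTAAGT' + 'ACGT' * 10 + 'TTTCAG'))
```

A learned model must go beyond this two-dinucleotide signal, since GT and AG occur frequently by chance; the branch point, polypyrimidine tract, and exonic context are what disambiguate true splice sites.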
For architecture design, the study focused on recurrent neural network models for the intron prediction task. Long short-term memory (LSTM) networks introduce three gating units — an input gate, a forget gate, and an output gate — which effectively mitigate the vanishing-gradient problem that plain recurrent networks suffer on long sequences, allowing the model to learn dependencies between distant positions. The gated recurrent unit (GRU) is a simplified LSTM variant that merges the input and forget gates into a single update gate, using fewer parameters while retaining comparable modeling capacity. Experimental results show that the LSTM achieved the best overall performance on the Arabidopsis dataset, while the GRU performed best on the cotton, maize, rice, and rapeseed datasets.
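The parameter saving from merging gates can be made concrete. The sketch below counts per-layer parameters for a single unidirectional layer, assuming PyTorch's parameter layout (separate input-hidden and hidden-hidden weights, each with its own bias, per gate); the figures are illustrative, not the study's actual model sizes.

```python
def lstm_params(i, h):
    """One unidirectional LSTM layer: 4 gates, each with
    weight_ih (h*i), weight_hh (h*h), bias_ih (h), bias_hh (h)."""
    return 4 * (h * i + h * h + 2 * h)

def gru_params(i, h):
    # GRU has 3 gates instead of 4 (update, reset, candidate)
    return 3 * (h * i + h * h + 2 * h)

i, h = 4, 128   # one-hot input, hidden size as in the models below
print(lstm_params(i, h), gru_params(i, h))   # 68608 51456
```

The GRU thus carries exactly 3/4 of the LSTM's recurrent parameters at the same hidden size, which is consistent with its edge in parameter efficiency on the larger crop datasets.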
The inconsistency of the best-performing model across species datasets was analyzed in depth. Arabidopsis, a dicot model plant, differs somewhat from monocot crops in intron sequence characteristics: its genome is small and its introns are short on average, and the LSTM's stronger long-range dependency modeling showed an advantage on this kind of data. Maize, rice, and the other monocot crops have larger genomes and a wider intron length distribution, where the GRU's advantages in parameter efficiency and generalization came through. This finding suggests that, in practice, the model architecture should be chosen according to the characteristics of the target species.
```python
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import pandas as pd


class SequenceEncoder:
    """Converts nucleotide strings into numeric matrices."""

    def __init__(self, encoding_type='onehot'):
        self.encoding_type = encoding_type
        self.base_map = {'A': 0, 'T': 1, 'G': 2, 'C': 3, 'N': 4}
        self.complement = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G', 'N': 'N'}

    def onehot_encode(self, sequence):
        # (L, 4) matrix; ambiguous 'N' bases stay all-zero
        encoding = np.zeros((len(sequence), 4), dtype=np.float32)
        for i, base in enumerate(sequence.upper()):
            if base in self.base_map and self.base_map[base] < 4:
                encoding[i, self.base_map[base]] = 1.0
        return encoding

    def complement_onehot_encode(self, sequence):
        # Eight-channel encoding: forward strand plus its reverse complement
        forward = self.onehot_encode(sequence)
        reverse_seq = ''.join(self.complement.get(b, 'N')
                              for b in sequence.upper()[::-1])
        reverse = self.onehot_encode(reverse_seq)
        return np.concatenate([forward, reverse], axis=1)

    def hash_encode(self, sequence, dim=64):
        # Deterministic random projection keyed on (base, position);
        # a local generator avoids clobbering NumPy's global RNG state
        encoding = np.zeros((len(sequence), dim), dtype=np.float32)
        for i, base in enumerate(sequence.upper()):
            if base in self.base_map:
                rng = np.random.default_rng(self.base_map[base] + i)
                encoding[i] = rng.standard_normal(dim)
        return encoding

    def encode(self, sequence):
        if self.encoding_type == 'complement_onehot':
            return self.complement_onehot_encode(sequence)
        if self.encoding_type == 'hash':
            return self.hash_encode(sequence)
        return self.onehot_encode(sequence)


class IntronDataset(Dataset):
    """Fixed-length encoded sequences with binary intron/non-intron labels."""

    def __init__(self, sequences, labels, encoder, max_len=500):
        self.sequences = sequences
        self.labels = labels
        self.encoder = encoder
        self.max_len = max_len

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx][:self.max_len]          # truncate long sequences
        encoded = self.encoder.encode(seq)
        if len(encoded) < self.max_len:                   # zero-pad short ones
            padding = np.zeros((self.max_len - len(encoded), encoded.shape[1]),
                               dtype=np.float32)
            encoded = np.concatenate([encoded, padding], axis=0)
        return torch.FloatTensor(encoded), torch.LongTensor([self.labels[idx]])


class LSTMPredictor(nn.Module):
    """Bidirectional LSTM with additive attention pooling."""

    def __init__(self, input_size=4, hidden_size=128, num_layers=2, num_classes=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, bidirectional=True, dropout=0.3)
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1))
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, 128), nn.ReLU(),
            nn.Dropout(0.5), nn.Linear(128, num_classes))

    def forward(self, x):
        lstm_out, _ = self.lstm(x)                             # (B, L, 2H)
        attn_weights = F.softmax(self.attention(lstm_out), dim=1)
        context = torch.sum(attn_weights * lstm_out, dim=1)    # attention-weighted sum
        return self.fc(context)


class GRUPredictor(nn.Module):
    """Bidirectional GRU followed by multi-head self-attention."""

    def __init__(self, input_size=4, hidden_size=128, num_layers=2, num_classes=2):
        super().__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers,
                          batch_first=True, bidirectional=True, dropout=0.3)
        self.attention = nn.MultiheadAttention(hidden_size * 2, num_heads=4,
                                               batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, 128), nn.ReLU(),
            nn.Dropout(0.5), nn.Linear(128, num_classes))

    def forward(self, x):
        gru_out, _ = self.gru(x)
        attn_out, _ = self.attention(gru_out, gru_out, gru_out)  # self-attention
        pooled = torch.mean(attn_out, dim=1)                     # mean pool over time
        return self.fc(pooled)


class CNNPredictor(nn.Module):
    """Three-layer 1D CNN with global adaptive max pooling."""

    def __init__(self, input_size=4, num_classes=2):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=7, padding=3)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(256)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(256, 128), nn.ReLU(),
            nn.Dropout(0.5), nn.Linear(128, num_classes))

    def forward(self, x):
        x = x.transpose(1, 2)                    # (B, L, C) -> (B, C, L) for Conv1d
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.max_pool1d(x, 2)
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.max_pool1d(x, 2)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool(x).squeeze(-1)             # global max pooling
        return self.fc(x)


class HybridPredictor(nn.Module):
    """CNN front end for local motifs, LSTM back end for long-range context."""

    def __init__(self, input_size=4, hidden_size=128, num_classes=2):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(input_size, 64, 7, padding=3), nn.BatchNorm1d(64),
            nn.ReLU(), nn.MaxPool1d(2),
            nn.Conv1d(64, 128, 5, padding=2), nn.BatchNorm1d(128),
            nn.ReLU(), nn.MaxPool1d(2))
        self.lstm = nn.LSTM(128, hidden_size, 2, batch_first=True,
                            bidirectional=True)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, 128), nn.ReLU(),
            nn.Dropout(0.5), nn.Linear(128, num_classes))

    def forward(self, x):
        x = self.conv(x.transpose(1, 2)).transpose(1, 2)
        _, (hidden, _) = self.lstm(x)
        # concatenate the final forward and backward hidden states
        hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
        return self.fc(hidden)


class IntronRNADatabase:
    """Loads per-species genome sequences and intron annotations."""

    def __init__(self, db_path):
        self.db_path = db_path
        self.species_data = {}

    def load_species_data(self, species_name, fasta_file, annotation_file):
        self.species_data[species_name] = {
            'sequences': self.parse_fasta(fasta_file),
            'annotations': self.parse_annotation(annotation_file)}

    def parse_fasta(self, fasta_file):
        sequences, current_id, current_seq = {}, None, []
        with open(fasta_file, 'r') as f:
            for line in f:
                if line.startswith('>'):
                    if current_id:
                        sequences[current_id] = ''.join(current_seq)
                    current_id = line[1:].strip().split()[0]
                    current_seq = []
                else:
                    current_seq.append(line.strip())
        if current_id:
            sequences[current_id] = ''.join(current_seq)
        return sequences

    def parse_annotation(self, annotation_file):
        # Expects GFF-style tab-separated lines with feature type 'intron'
        annotations = []
        with open(annotation_file, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                fields = line.strip().split('\t')
                if len(fields) >= 9 and fields[2] == 'intron':
                    annotations.append({
                        'chrom': fields[0],
                        'start': int(fields[3]),
                        'end': int(fields[4]),
                        'strand': fields[6],
                        'attributes': fields[8]})
        return annotations

    def search_introns(self, species, gene_name=None, min_length=None, max_length=None):
        results = []
        if species not in self.species_data:
            return results
        for intron in self.species_data[species]['annotations']:
            length = intron['end'] - intron['start']
            # 'is not None' so a threshold of 0 is not silently ignored
            if min_length is not None and length < min_length:
                continue
            if max_length is not None and length > max_length:
                continue
            if gene_name and gene_name not in intron['attributes']:
                continue
            results.append(intron)
        return results


def train_model(model, train_loader, val_loader, epochs=50, lr=1e-3, device=None):
    device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
    criterion = nn.CrossEntropyLoss()
    best_f1 = 0
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.view(-1).to(device)   # (B, 1) -> (B)
            optimizer.zero_grad()
            loss = criterion(model(batch_x), batch_y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()
        model.eval()
        val_preds, val_labels = [], []
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                preds = model(batch_x.to(device)).argmax(dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_labels.extend(batch_y.view(-1).numpy())
        val_f1 = f1_score(val_labels, val_preds)
        scheduler.step(1 - val_f1)                  # scheduler minimizes (1 - F1)
        if val_f1 > best_f1:
            best_f1 = val_f1
            torch.save(model.state_dict(), 'best_model.pt')
    return model


def evaluate_model(model, test_loader, device=None):
    device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            preds = model(batch_x.to(device)).argmax(dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(batch_y.view(-1).numpy())
    return {'accuracy': accuracy_score(all_labels, all_preds),
            'precision': precision_score(all_labels, all_preds),
            'recall': recall_score(all_labels, all_preds),
            'f1': f1_score(all_labels, all_preds)}


def cross_species_evaluation(models, datasets, device=None):
    # Train-on-source / test-on-target matrix across species
    results = {}
    for source, source_model in models.items():
        results[source] = {}
        source_model.eval()
        for target in models:
            if source == target:
                continue
            results[source][target] = evaluate_model(
                source_model, datasets[target]['test'], device)
    return results
```