# algorithm.py
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedShuffleSplit
from sklearn.ensemble import ExtraTreesRegressor, RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from transformers import RobertaModel, AutoTokenizer
from lazypredict.Supervised import LazyRegressor
class SmilesDataset(Dataset):
    """Wraps SMILES strings and regression targets for a transformer tokenizer."""

    def __init__(self, smiles_list, targets, tokenizer, max_length=512):
        self.smiles_list = smiles_list
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles_list)

    def __getitem__(self, idx):
        smiles = self.smiles_list[idx]
        target = self.targets[idx]
        # Pad/truncate every sample to max_length so batches stack cleanly.
        inputs = self.tokenizer(smiles, return_tensors='pt', padding='max_length',
                                truncation=True, max_length=self.max_length)
        # squeeze(0) drops only the batch dimension added by return_tensors='pt'.
        input_ids = inputs['input_ids'].squeeze(0)
        attention_mask = inputs['attention_mask'].squeeze(0)
        return {'input_ids': input_ids,
                'attention_mask': attention_mask,
                'target': torch.tensor(target, dtype=torch.float)}
class ChemBERTaRegressor(nn.Module):
    """ChemBERTa backbone with a single linear head for scalar regression."""

    def __init__(self, chemberta_model):
        super().__init__()
        self.chemberta_model = chemberta_model
        self.regressor = nn.Linear(chemberta_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.chemberta_model(input_ids=input_ids, attention_mask=attention_mask)
        # Use the [CLS] (first-token) embedding as the molecule representation.
        cls_output = outputs.last_hidden_state[:, 0, :]
        prediction = self.regressor(cls_output)
        # squeeze(-1) keeps the batch dimension intact even when batch_size == 1.
        return prediction.squeeze(-1)
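# A minimal instantiation sketch. The checkpoint name below is an assumption
# (ChemBERTa checkpoints on the Hugging Face Hub are RoBERTa-based), not part
# of the original file:
#
#     tokenizer = AutoTokenizer.from_pretrained('seyonec/ChemBERTa-zinc-base-v1')
#     backbone = RobertaModel.from_pretrained('seyonec/ChemBERTa-zinc-base-v1')
#     model = ChemBERTaRegressor(backbone)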
def train_model(model, dataloader, optimizer, device):
    """Runs one training epoch and returns the mean MSE loss per batch."""
    model.train()
    total_loss = 0
    for batch in dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        targets = batch['target'].to(device)
        optimizer.zero_grad()
        predictions = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = torch.nn.functional.mse_loss(predictions, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)
def extract_features(smiles_list, model, tokenizer, device):
    """Encodes each SMILES string and returns the stacked [CLS] embeddings as a
    NumPy array of shape (n_samples, hidden_size).

    Note: `model` here is the bare transformer backbone, not ChemBERTaRegressor,
    since it must expose `last_hidden_state`.
    """
    model.eval()
    features = []
    for smiles in smiles_list:  # one molecule at a time; batching would be faster
        inputs = tokenizer(smiles, return_tensors='pt', padding=True,
                           truncation=True, max_length=512)
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            cls_output = outputs.last_hidden_state[:, 0, :]
            features.append(cls_output.cpu().numpy())
    return np.concatenate(features, axis=0)
def evaluate_with_lazy_regressor(X, y):
    """Benchmarks a suite of off-the-shelf regressors on an 80/20 split."""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    reg = LazyRegressor(verbose=0, ignore_warnings=True, custom_metric=None)
    models, predictions = reg.fit(X_train, X_test, y_train, y_test)
    print(models)
def feature_selection_and_optimization(X, y):
    """Selects the top features by random-forest importance, augments them with
    cluster/PCA structure, and tunes an ExtraTreesRegressor by grid search."""
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Rank features with a random forest and keep the n most important ones.
    rf = RandomForestRegressor(n_estimators=100, random_state=42)
    rf.fit(X_scaled, y)
    importances = rf.feature_importances_
    n = 10
    indices = np.argsort(importances)[-n:]
    selected_features = X[:, indices]
    X_selected_scaled = scaler.fit_transform(selected_features)

    # Refit on the selected features (predictions kept only for inspection;
    # they are not used downstream).
    rf.fit(X_selected_scaled, y)
    rf_predictions = rf.predict(X_selected_scaled).reshape(-1, 1)

    # Cluster the selected features, then refit KMeans from its own converged
    # centroids (n_init=1) to obtain the final labels.
    kmeans = KMeans(n_clusters=5, random_state=42)
    kmeans.fit(X_selected_scaled)
    initial_centroids = kmeans.cluster_centers_
    kmeans = KMeans(n_clusters=5, init=initial_centroids, n_init=1, random_state=42)
    kmeans_labels = kmeans.fit_predict(X_selected_scaled)

    # Take a stratified 10% subset over the cluster labels; note the assignment
    # below writes the same labels back, so kmeans_labels is unchanged.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
    for train_index, partial_index in sss.split(X, kmeans_labels):
        partial_labels = kmeans_labels[partial_index]
        partial_indices = partial_index
        kmeans_labels[partial_indices] = partial_labels

    # Append the first two principal components as extra features.
    pca = PCA(n_components=2)
    pca_features = pca.fit_transform(X_selected_scaled)
    X_selected_scaled = np.hstack([X_selected_scaled, pca_features])

    # Tune an ExtraTreesRegressor and report held-out error.
    X_train, X_test, y_train, y_test = train_test_split(X_selected_scaled, y,
                                                        test_size=0.2, random_state=42)
    et_model = ExtraTreesRegressor(random_state=42)
    param_grid = {
        'n_estimators': [100, 200, 300],
        'max_features': ['sqrt', 'log2', 0.5, 0.7],
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10]
    }
    grid_search = GridSearchCV(estimator=et_model, param_grid=param_grid,
                               cv=5, n_jobs=-1, scoring='neg_mean_squared_error')
    grid_search.fit(X_train, y_train)
    best_et_model = grid_search.best_estimator_
    y_pred = best_et_model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    print(f'Mean Squared Error: {mse}')
    print(f'R^2 Score: {r2}')
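# A minimal end-to-end sketch of how the pieces above fit together. Everything
# here is an illustrative assumption, not part of the original file: the
# checkpoint name, the toy SMILES/target lists (duplicated so the clustering
# and cross-validation steps have enough samples), and the hyperparameters.
if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokenizer = AutoTokenizer.from_pretrained('seyonec/ChemBERTa-zinc-base-v1')  # assumed checkpoint
    backbone = RobertaModel.from_pretrained('seyonec/ChemBERTa-zinc-base-v1').to(device)

    # Placeholder data; replace with a real SMILES dataset.
    smiles = ['CCO', 'c1ccccc1', 'CC(=O)O', 'CCN', 'CCC'] * 20
    targets = [0.5, 1.2, 0.3, 0.8, 1.0] * 20

    # Fine-tune the regression head (and backbone) for a few epochs.
    model = ChemBERTaRegressor(backbone).to(device)
    dataset = SmilesDataset(smiles, targets, tokenizer)
    dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    for epoch in range(3):
        epoch_loss = train_model(model, dataloader, optimizer, device)
        print(f'epoch {epoch}: loss {epoch_loss:.4f}')

    # Extract [CLS] features from the backbone and hand them to the classical
    # pipeline (the grid search runs 720 forest fits; expect a wait on CPU).
    X = extract_features(smiles, backbone, tokenizer, device)
    y = np.array(targets)
    evaluate_with_lazy_regressor(X, y)
    feature_selection_and_optimization(X, y)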