Data Preprocessing

Let me just show you the code. The full code is at https://github.com/zong4/Kaggle; the helpers in `functions` do pretty much what their names say (drop some useless columns, normalize, then one-hot encode the string columns).

# Load data
import sys
import pandas as pd

basic_path = 'house-prices-advanced-regression-techniques'
train_data = pd.read_csv(basic_path + "/train.csv")
test_data = pd.read_csv(basic_path + "/test.csv")


# Prepare environment
import functions_pytorch

functions_pytorch.set_seed(42)


# Extract target variable
train_labels = train_data['SalePrice']
train_data.drop(['SalePrice'], axis=1, inplace=True)


# Data preprocessing
import functions

train_data = functions.drop_useless_cols(train_data, test_data)

train_data.drop(['Id'], axis=1, inplace=True)
train_data = functions.drop_cols_with_same_data(train_data, 0.9)
train_data = functions.drop_cols_with_na(train_data, 0.8)
train_data = functions.fill_na_with_mean(train_data)
train_data = functions.normalize(train_data)
train_data = functions.one_hot_encoding(train_data)
print(train_data.info())
print()
print(train_data.head())
print()

test_data.drop(['Id'], axis=1, inplace=True)
test_data = functions.fill_na_with_mean(test_data)
test_data = functions.normalize(test_data)
test_data = functions.one_hot_encoding(test_data)

test_data = functions.drop_useless_cols(test_data, train_data)
test_data = functions.add_missing_dummy_columns(test_data, train_data)
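
If you don't want to open the repo, here is a minimal sketch of what fill_na_with_mean, normalize and one_hot_encoding might look like. This is my own guess based on their names, so the actual implementations in the repo may differ.

# Rough sketch of some functions.* helpers (my assumption, not the repo code)
import pandas as pd

def fill_na_with_mean(df):
    # Fill missing values in numeric columns with the column mean
    for col in df.select_dtypes(include='number').columns:
        df[col] = df[col].fillna(df[col].mean())
    return df

def normalize(df):
    # Scale numeric columns to zero mean and unit variance
    for col in df.select_dtypes(include='number').columns:
        df[col] = (df[col] - df[col].mean()) / df[col].std()
    return df

def one_hot_encoding(df):
    # One-hot encode the remaining string (object) columns
    return pd.get_dummies(df, columns=df.select_dtypes(include='object').columns)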

Model Setup

This part is mainly the custom model and Dataset; if you want to tweak anything, the model is really the only thing you need to change.

# Set up model
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary

# Wraps the preprocessed DataFrame and label Series as a PyTorch Dataset
class HousePricesDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        try:
            data = torch.tensor(self.data.iloc[idx].values.astype('float32'))
            label = torch.tensor(self.labels.iloc[idx].astype('float32'))
            return data, label
        except KeyError as e:
            print(f"KeyError: {e} at index {idx}")
            raise
        except Exception as e:
            print(f"Unexpected error: {e} at index {idx}")
            raise

# A simple three-layer MLP with dropout
class Net(nn.Module):
    def __init__(self, input_dim):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout1(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)
        return x

model = Net(train_data.shape[1])
summary(model, input_size=(train_data.shape[1],))
model.to(functions_pytorch.device)
print()
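
functions_pytorch.set_seed() and functions_pytorch.device are not shown in this post; a plausible minimal version would be something like the sketch below (my assumption, check the repo for the real one).

# Minimal sketch of functions_pytorch.device / set_seed (my assumption, not the repo code)
import random
import numpy as np
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def set_seed(seed):
    # Seed random, numpy and torch so runs are reproducible
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)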

Model Training

I use K-fold cross validation here, but you can also just split the dataset once and call functions_pytorch.train_model() directly (a sketch of that follows the K-fold code below).

# Train model with KFold cross validation
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt

k = 5
num_epochs = 100
batch_size = 32
learning_rate = 0.01

criterion = functions_pytorch.RMSLELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

def train_kfold(train_data, train_labels, model, criterion, optimizer, k):
    kf = KFold(n_splits=k, shuffle=True)

    cnt = 1
    for train, val in kf.split(train_data):
        print(f"Fold {cnt}")

        train_data_fold = train_data.iloc[train]
        train_labels_fold = train_labels.iloc[train]
        val_data_fold = train_data.iloc[val]
        val_labels_fold = train_labels.iloc[val]

        train_dataset = HousePricesDataset(train_data_fold, train_labels_fold)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataset = HousePricesDataset(val_data_fold, val_labels_fold)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

        train_losses, val_losses = functions_pytorch.train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs)

        # Plot losses
        plt.clf()
        plt.plot(train_losses, label='Train Loss')
        plt.plot(val_losses, label='Val Loss')
        plt.legend()
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training and Validation Loss')
        plt.savefig(basic_path + f'/loss_fold_{cnt}.png')

        cnt += 1
        print()

train_kfold(train_data, train_labels, model, criterion, optimizer, k)
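
If you would rather skip K-fold, the simpler route mentioned above could look roughly like this; the train_test_split call and the 80/20 ratio are my own sketch, adjust as you like.

# Sketch of the non-KFold alternative: one train/val split, one train_model call
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(train_data, train_labels, test_size=0.2, random_state=42)

train_loader = DataLoader(HousePricesDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(HousePricesDataset(X_val, y_val), batch_size=batch_size, shuffle=False)

train_losses, val_losses = functions_pytorch.train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs)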

RMSLELoss is a loss function I defined myself; take a look at how it is implemented.

# Root mean squared logarithmic error (RMSLE)
class RMSLELoss(torch.nn.Module):
    def __init__(self):
        super(RMSLELoss, self).__init__()

    def forward(self, pred, actual):
        # Clamp to non-negative values so log(x + 1) stays defined
        pred = torch.clamp(pred, min=0)
        actual = torch.clamp(actual, min=0)
        return torch.sqrt(torch.mean((torch.log(pred + 1) - torch.log(actual + 1)) ** 2))
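
A quick sanity check I added (not from the repo): predicting 100000 for a house that actually sold for 120000 gives roughly |log(100001) - log(120001)| ≈ 0.18.

# Sanity check for RMSLELoss (my own example)
loss_fn = RMSLELoss()
pred = torch.tensor([[100000.0]])
actual = torch.tensor([[120000.0]])
print(loss_fn(pred, actual).item())  # about 0.182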

The training code is shown below. I print the loss once every 10 epochs; since I already save a loss plot per fold, the output doesn't need to be very frequent, it just needs to tell me the training is still running.

# Train model
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs):
    train_losses = []
    val_losses = []
    for epoch in range(num_epochs):
        for i, (data, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            outputs = model(data.to(device))
            loss = criterion(outputs, labels.to(device).view(-1, 1))
            loss.backward()
            optimizer.step()

        # Record the last batch's loss as this epoch's training loss
        train_losses.append(loss.item())

        if val_loader:
            with torch.no_grad():
                model.eval()
                total_loss = 0
                for data, labels in val_loader:
                    outputs = model(data.to(device))
                    total_loss += criterion(outputs, labels.to(device).view(-1, 1)).item()
                val_losses.append(total_loss / len(val_loader))
                model.train()

        if (epoch + 1) % 10 == 0:
            if val_loader:
                print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_losses[-1]}, Val Loss: {val_losses[-1]}')
            else:
                print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_losses[-1]}')
    return train_losses, val_losses

Model Prediction

How exactly you write this part depends on the required output format; what I have here is just the Kaggle submission format.

# Predict test data
def predict_test_data(model, test_data):
    # Retrain the model on all of the training data before predicting
    print("Train model with all data")
    train_dataset = HousePricesDataset(train_data, train_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    functions_pytorch.train_model(model, criterion, optimizer, train_loader, None, num_epochs)

    # Dummy zero labels just to satisfy the Dataset interface
    test_dataset = HousePricesDataset(test_data, pd.Series([0] * len(test_data)))
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    predictions = []
    with torch.no_grad():
        model.eval()
        for data, _ in test_loader:
            outputs = model(data.to(functions_pytorch.device))
            predictions.append(outputs.item())
        model.train()

    print()
    return predictions

# best_model_path = 'model_fold_5.pth'
# model.load_state_dict(torch.load(best_model_path))
predictions = predict_test_data(model, test_data)

submission = pd.DataFrame({'Id': test_data.index + 1461, 'SalePrice': predictions})
submission.to_csv(basic_path + '/submission.csv', index=False)
print(submission.head())

Model Saving and Loading

I originally wrote this part, but later found I didn't need it and deleted it, so I'll leave it for you to work out yourselves.
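
For reference, the usual PyTorch way is just a state_dict round-trip; a minimal sketch is below. The fold file name only mirrors the commented-out best_model_path earlier, it's not code from the repo.

# Save the weights, e.g. at the end of each fold inside train_kfold
torch.save(model.state_dict(), f'model_fold_{cnt}.pth')

# Load them back before predicting
model.load_state_dict(torch.load('model_fold_5.pth'))
model.eval()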