加密流量分类-实践1: 1D-CNN模型训练与测试
本文于 597 天之前发表,文中内容可能已经过时。
模型:model.py
论文参数:
import torch.nn as nn
import torch
import torch.nn.functional as F
class OneCNNC(nn.Module):
def __init__(self,label_num):
super(OneCNNC,self).__init__()
self.layer_1 = nn.Sequential(
# 输入784*1
nn.Conv2d(1,32,(1,25),1,padding='same'),
nn.ReLU(),
# 输出262*32
nn.MaxPool2d((1, 3), 3, padding=(0,1)),
)
self.layer_2 = nn.Sequential(
# 输入262*32
nn.Conv2d(32,64,(1,25),1,padding='same'),
nn.ReLU(),
# 输入262*64
nn.MaxPool2d((1, 3), 3, padding=(0,1))
)
self.fc1=nn.Sequential(
# 输入88*64
nn.Flatten(),
nn.Linear(88*64,1024),
# 自主加了两个dropout层
nn.Dropout(p=0.5),
nn.Linear(1024,label_num),
nn.Dropout(p=0.3)
)
def forward(self,x):
# print("x.shape:",x.shape)
x=self.layer_1(x)
# print("x.shape:",x.shape)
x=self.layer_2(x)
# print("x.shape:",x.shape)
x=self.fc1(x)
# print("x.shape:",x.shape)
return x
# x=torch.tensor([[1, 1, 0, 1, 2, 3],
# [1, 1, 4, 5, 6, 7],
# [1, 10, 8, 9, 10, 11]],dtype=torch.float32)
# x=x.reshape(1,3,-1)
# out_tensor=F.max_pool2d(x,(3,1),stride=3,padding=0)
# print(out_tensor)
数据部分:
数据读取:data.py
import os
from torch.utils.data import Dataset
import gzip
import numpy as np
class DealDataset(Dataset):
"""
读取数据、初始化数据
"""
def __init__(self, folder, data_name, label_name, transform=None):
(train_set, train_labels) = load_data(folder, data_name,label_name)
self.train_set = train_set
self.train_labels = train_labels
self.transform = transform
def __getitem__(self, index):
img, target = self.train_set[index], int(self.train_labels[index])
# 这里要copy一下不然会报错
img=img.copy()
# 28*28 -> 764
img=img.reshape(1,1,-1)
# target=target.copy()
if self.transform is not None:
img = self.transform(img)
return img, target
def __len__(self):
return len(self.train_set)
def load_data(data_folder, data_name, label_name):
with gzip.open(os.path.join(data_folder, label_name), 'rb') as lbpath:
y_train = np.frombuffer(lbpath.read(), np.uint8, offset=8)
with gzip.open(os.path.join(data_folder, data_name), 'rb') as imgpath:
x_train = np.frombuffer(
imgpath.read(), np.uint8, offset=16).reshape(len(y_train), 28, 28)
return (x_train, y_train)
模型训练主模块:
from random import shuffle
import time
import sys
import torch.nn as nn
import numpy as np
import os
import torchvision
from model import OneCNN,CNNImage,OneCNNC
from torchvision import datasets,transforms
import gzip
import torch
from data import DealDataset
def main():
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 设置超参数
batch_size = 50
lr = 1.0e-4
num_epochs = 40
label_num = 12
# 导入数据
folder_path_list=[
r"2.encrypted_traffic_classification/3.PerprocessResults/12class/FlowAllLayers",
r"2.encrypted_traffic_classification/3.PerprocessResults/12class/FlowL7",
r"2.encrypted_traffic_classification/3.PerprocessResults/12class/SessionAllLayers",
r"2.encrypted_traffic_classification/3.PerprocessResults/12class/SessionL7"
]
# 选择哪个数据集
task_index = 0
folder_path = folder_path_list[task_index]
train_data_path = "train-images-idx3-ubyte.gz"
train_label_path = "train-labels-idx1-ubyte.gz"
test_data_path = "t10k-images-idx3-ubyte.gz"
test_label_path = "t10k-labels-idx1-ubyte.gz"
trainDataset = DealDataset(folder_path,train_data_path,train_label_path)
testDataset = DealDataset(folder_path,test_data_path,test_label_path
train_loader = torch.utils.data.DataLoader(
dataset=trainDataset,
batch_size=batch_size,
shuffle=True
)
test_loader = torch.utils.data.DataLoader(
dataset=testDataset,
batch_size=batch_size,
shuffle=False
)
# 定义模型
model = OneCNNC(label_num)
model = model.to(device)
# model = CNNImage()
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
for i, (images, labels) in enumerate(train_loader):
# images=images.reshape(-1,1,28,28)
images = images.to(device)
labels = labels.to(device)
# Forward pass
outputs = model(images.to(torch.float32))
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i+1) % 100 == 0:
print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
.format(epoch+1, num_epochs, i+1, total_step, loss.item()))
# Test the model
model.eval()
with torch.no_grad():
correct = 0
total = 0
test_length = len(testDataset)
for images, labels in test_loader:
images = images.to(device)
labels = labels.to(device)
outputs = model(images.to(torch.float32))
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print('Test Accuracy of the model on the {} test images: {} %'.format(test_length,100 * correct / total))
# Save the model checkpoint
torch.save(model.state_dict(), 'model.ckpt')
if __name__ == '__main__':
main()
运行结果:
项目地址:https://github.com/lulu-cloud/Pytorch-Encrypted-Traffic-Classification-with-1D_CNN