Artificial Intelligence is rapidly transforming the field of cybersecurity, and understanding how to build and apply AI models is becoming an essential skill. This module serves as a hands-on introduction to AI in infosec, guiding learners through the full process of setting up an environment, working with data, and building models that can solve real-world security problems.
We begin by establishing a controlled AI workspace, using Miniconda for package management and JupyterLab for interactive experimentation. This setup ensures that students have a flexible and reliable environment where they can test, iterate, and refine their models without conflicts or instability.
Once the environment is ready, learners dive into the heart of the process: data handling and preprocessing. Security datasets often come in raw, unstructured, or noisy formats, so knowing how to clean, transform, and prepare data is a critical step. From there, students explore structured workflows that can be applied to common cybersecurity tasks, such as:
- Spam classification → Detecting and filtering unwanted or malicious messages.
- Network anomaly detection → Identifying unusual patterns that may indicate intrusions.
- Malware classification → Distinguishing between benign and malicious software samples.
To achieve this, the module leverages powerful Python libraries, including Scikit-learn for machine learning fundamentals and PyTorch for building deep learning models. Learners will not only understand how to implement algorithms but also how to evaluate their effectiveness using common performance metrics.
By the end of the module, students will have navigated the entire lifecycle of AI model development—from preparing data and choosing the right tools, to training, testing, and refining their models. This foundation equips them with the skills to confidently apply AI to diverse challenges across the cybersecurity landscape.
Environment Setup
Before jumping into the exciting world of artificial intelligence and machine learning, it’s important to start with the right foundation: a properly configured environment. Having a reliable setup not only prevents compatibility issues but also ensures that experiments run smoothly and efficiently as you progress through the module.
To make this process flexible, the module provides two different paths for environment setup, allowing learners to choose the option that best matches their needs and technical background:
- Lightweight Local Setup with Miniconda
- Ideal for those who want full control over their system and prefer to manage packages locally.
- Miniconda allows you to create isolated environments for each project, so dependencies don’t conflict.
- This path is great if you want to understand exactly how your environment is structured and if you plan to work offline.
- Interactive Workspace with JupyterLab
- Perfect for learners who prefer an intuitive, browser-based interface for experimentation.
- JupyterLab combines code, output, and documentation in a single place, making it easier to test ideas quickly and visualize results.
- This option is especially useful for those who like interactive workflows and want to focus more on learning and experimenting rather than managing system dependencies.
By offering these two paths, the module ensures that whether you are a hands-on learner who enjoys configuring environments manually or someone who wants a streamlined, interactive experience, you’ll be able to set up a reliable foundation for AI development.
If you choose to use the Playground VM, you can start it here and familiarize yourself with the environment. We recommend keeping the VM running as you work through the module and follow along with the code snippets. Type DONE to continue.
DONE
Model Evaluation (Spam Detection)
Building and training an AI model is only part of the journey—the next crucial step is evaluation. Testing how well your model performs ensures that it’s not just learning the data it was trained on, but also capable of making accurate predictions in real-world scenarios.
In this module, evaluation is done through the Playground VM’s evaluation portal. If you’re not already running the Playground VM, you can easily start it from the bottom of the page. Once active, the portal allows you to upload your trained model and automatically assess its performance against predefined benchmarks.
For learners working inside Jupyter, the process is even more convenient. A Python script is provided that lets you upload your model directly from your notebook to the evaluation portal. This seamless integration means you can move from training to testing without leaving your development environment.
When the evaluation is complete, the system checks your model’s performance against the required criteria. If your model passes, you’ll receive a flag value. This flag not only confirms that your model meets the challenge requirements but can also be submitted as proof of success within the exercise.
This structured evaluation step ensures that you don’t just stop at building a model—you validate its effectiveness and gain confidence in your ability to develop AI solutions that truly work.
What is the flag you get from submitting a good model for evaluation?
Use the code bellow to generate the file
import requests
import zipfile
import io
import os
import pandas as pd
import nltk
import re
import joblib
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
import numpy as np
# URL of the dataset
# url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip"
# Download the dataset
# response = requests.get(url)
# if response.status_code == 200:
# print("Download successful")
# else:
# print("Failed to download the dataset")
# Extract the dataset
# with zipfile.ZipFile(io.BytesIO(response.content)) as z:
# z.extractall("sms_spam_collection")
# print("Extraction successful")
# List the extracted files
# extracted_files = os.listdir("sms_spam_collection")
# print("Extracted files:", extracted_files)
# Load the dataset
df = pd.read_csv(
"sms_spam_collection/SMSSpamCollection",
sep="\t",
header=None,
names=["label", "message"],
)
# Display basic information about the dataset
# print("----------------------------------------------")
# print("-------------------- HEAD --------------------")
# print("----------------------------------------------")
# print(df.head())
# print("----------------------------------------------")
# print("-------------------- DESCRIBE --------------------")
# print("----------------------------------------------")
# print(df.describe())
# print("----------------------------------------------")
# print("-------------------- INFO --------------------")
# print("----------------------------------------------")
# print(df.info())
# Check for missing values
# print("Missing values:\n", df.isnull().sum())
# Check for duplicates
# print("Duplicate entries:", df.duplicated().sum())
# Remove duplicates if any
df = df.drop_duplicates()
# Download the necessary NLTK data files
nltk.download("punkt")
nltk.download("punkt_tab")
nltk.download("stopwords")
# print("=== BEFORE ANY PREPROCESSING ===")
# print(df.head(5))
# Convert all message text to lowercase
df["message"] = df["message"].str.lower()
#print("\n=== AFTER LOWERCASING ===")
#print(df["message"].head(5))
# Remove non-essential punctuation and numbers, keep useful symbols like $ and !
df["message"] = df["message"].apply(lambda x: re.sub(r"[^a-z\s$!]", "", x))
# print("\n=== AFTER REMOVING PUNCTUATION & NUMBERS (except $ and !) ===")
# print(df["message"].head(5))
# Split each message into individual tokens
df["message"] = df["message"].apply(word_tokenize)
# print("\n=== AFTER TOKENIZATION ===")
# print(df["message"].head(5))
# Define a set of English stop words and remove them from the tokens
stop_words = set(stopwords.words("english"))
# print("====STOP WORDSS ====")
# print(stop_words)
df["message"] = df["message"].apply(lambda x: [word for word in x if word not in stop_words])
# print("\n=== AFTER REMOVING STOP WORDS ===")
# print(df["message"].head(5))
# Stem each token to reduce words to their base form
stemmer = PorterStemmer()
df["message"] = df["message"].apply(lambda x: [stemmer.stem(word) for word in x])
# print("\n=== AFTER STEMMING ===")
# print(df["message"].head(5))
# Rejoin tokens into a single string for feature extraction
df["message"] = df["message"].apply(lambda x: " ".join(x))
# print("\n=== AFTER JOINING TOKENS BACK INTO STRINGS ===")
# print(df["message"].head(5))
# Initialize CountVectorizer with bigrams, min_df, and max_df to focus on relevant terms
vectorizer = CountVectorizer(min_df=1, max_df=0.9, ngram_range=(1, 2))
# Fit and transform the message column
X = vectorizer.fit_transform(df["message"])
# Labels (target variable)
y = df["label"].apply(lambda x: 1 if x == "spam" else 0) # Converting labels to 1 and 0
# Build the pipeline by combining vectorization and classification
pipeline = Pipeline([
("vectorizer", vectorizer),
("classifier", MultinomialNB())
])
# Define the parameter grid for hyperparameter tuning
param_grid = {
"classifier__alpha": [0.01, 0.1, 0.15, 0.2, 0.25, 0.5, 0.75, 1.0]
}
# Perform the grid search with 5-fold cross-validation and the F1-score as metric
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring="f1"
)
# Fit the grid search on the full dataset
grid_search.fit(df["message"], y)
# Extract the best model identified by the grid search
best_model = grid_search.best_estimator_
# print("Best model parameters:", grid_search.best_params_)
# Example SMS messages for evaluation
new_messages = [
"Congratulations! You've won a $1000 Walmart gift card. Go to http://bit.ly/1234 to claim now.",
"Hey, are we still meeting up for lunch today?",
"Urgent! Your account has been compromised. Verify your details here: www.fakebank.com/verify",
"Reminder: Your appointment is scheduled for tomorrow at 10am.",
"FREE entry in a weekly competition to win an iPad. Just text WIN to 80085 now!",
]
# Preprocess function that mirrors the training-time preprocessing
def preprocess_message(message):
message = message.lower()
message = re.sub(r"[^a-z\s$!]", "", message)
tokens = word_tokenize(message)
tokens = [word for word in tokens if word not in stop_words]
tokens = [stemmer.stem(word) for word in tokens]
return " ".join(tokens)
# Preprocess and vectorize messages
processed_messages = [preprocess_message(msg) for msg in new_messages]
# Transform preprocessed messages into feature vectors
X_new = best_model.named_steps["vectorizer"].transform(processed_messages)
# Predict with the trained classifier
predictions = best_model.named_steps["classifier"].predict(X_new)
prediction_probabilities = best_model.named_steps["classifier"].predict_proba(X_new)
# Display predictions and probabilities for each evaluated message
for i, msg in enumerate(new_messages):
prediction = "Spam" if predictions[i] == 1 else "Not-Spam"
spam_probability = prediction_probabilities[i][1] # Probability of being spam
ham_probability = prediction_probabilities[i][0] # Probability of being not spam
print(f"Message: {msg}")
print(f"Prediction: {prediction}")
print(f"Spam Probability: {spam_probability:.2f}")
print(f"Not-Spam Probability: {ham_probability:.2f}")
print("-" * 50)
# Save the trained model to a file for future use
model_filename = 'spam_detection_model.joblib'
joblib.dump(best_model, model_filename)
print(f"Model saved to {model_filename}")
loaded_model = joblib.load(model_filename)
predictions = loaded_model.predict(new_messages)
And send it to get the flag


Model Evaluation (Network Anomaly Detection)
Once you’ve trained your AI model, the next step is to put it to the test. Model evaluation is an essential phase because it tells you whether your solution actually performs as expected on unseen data, rather than just memorizing the training set.
In this module, evaluation takes place inside the Playground VM. This virtual machine includes an evaluation portal specifically designed to test your model against a set of hidden benchmarks.
- Not using the Playground VM yet? No problem—you can easily initialize it at the bottom of the page to get started.
- Already inside the Playground VM? Great! You can use the provided Python script directly from Jupyter to upload your trained model to the portal without leaving your workspace.
Once your model is uploaded, the portal automatically runs it through the evaluation process. If your model reaches or exceeds the required performance thresholds, you’ll be rewarded with a flag value. This flag serves as proof of success—it can be submitted as part of the challenge answer or used to confirm that your model has met the learning objectives.
This evaluation workflow ensures that you’re not just building a model in theory—you’re also verifying its effectiveness in practice. It mirrors the real-world cycle of develop → test → validate, giving you hands-on experience with the complete machine learning lifecycle.
Use the code above to create the file
import requests, zipfile, io
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
import joblib
# URL for the NSL-KDD dataset
# url = "https://academy.hackthedome.com/storage/modules/292/KDD_dataset.zip"
# Download the zip file and extract its contents
# response = requests.get(url)
# z = zipfile.ZipFile(io.BytesIO(response.content))
# z.extractall('.') # Extracts to the current directory
# Set the file path to the dataset
file_path = r'KDD+.txt'
# Define the column names corresponding to the NSL-KDD dataset
columns = [
'duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes',
'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in',
'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations',
'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login',
'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate',
'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count',
'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate',
'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate',
'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'attack', 'level'
]
# Read the combined NSL-KDD dataset into a DataFrame
df = pd.read_csv(file_path, names=columns)
# print(df.head())
# Binary classification target
# Maps normal traffic to 0 and any type of attack to 1
df['attack_flag'] = df['attack'].apply(lambda a: 0 if a == 'normal' else 1)
# Multi-class classification target categories
dos_attacks = ['apache2', 'back', 'land', 'neptune', 'mailbomb', 'pod',
'processtable', 'smurf', 'teardrop', 'udpstorm', 'worm']
probe_attacks = ['ipsweep', 'mscan', 'nmap', 'portsweep', 'saint', 'satan']
privilege_attacks = ['buffer_overflow', 'loadmdoule', 'perl', 'ps',
'rootkit', 'sqlattack', 'xterm']
access_attacks = ['ftp_write', 'guess_passwd', 'http_tunnel', 'imap',
'multihop', 'named', 'phf', 'sendmail', 'snmpgetattack',
'snmpguess', 'spy', 'warezclient', 'warezmaster',
'xclock', 'xsnoop']
def map_attack(attack):
if attack in dos_attacks:
return 1
elif attack in probe_attacks:
return 2
elif attack in privilege_attacks:
return 3
elif attack in access_attacks:
return 4
else:
return 0
# Assign multi-class category to each row
df['attack_map'] = df['attack'].apply(map_attack)
# Encoding categorical variables
features_to_encode = ['protocol_type', 'service']
encoded = pd.get_dummies(df[features_to_encode])
# Numeric features that capture various statistical properties of the traffic
numeric_features = [
'duration', 'src_bytes', 'dst_bytes', 'wrong_fragment', 'urgent', 'hot',
'num_failed_logins', 'num_compromised', 'root_shell', 'su_attempted',
'num_root', 'num_file_creations', 'num_shells', 'num_access_files',
'num_outbound_cmds', 'count', 'srv_count', 'serror_rate',
'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate',
'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count',
'dst_host_same_srv_rate', 'dst_host_diff_srv_rate',
'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate',
'dst_host_srv_rerror_rate'
]
# Combine encoded categorical variables and numeric features
train_set = encoded.join(df[numeric_features])
# Multi-class target variable
multi_y = df['attack_map']
# Split data into training and test sets for multi-class classification
train_X, test_X, train_y, test_y = train_test_split(train_set, multi_y, test_size=0.2, random_state=1337)
# Further split the training set into separate training and validation sets
multi_train_X, multi_val_X, multi_train_y, multi_val_y = train_test_split(train_X, train_y, test_size=0.3, random_state=1337)
# Train RandomForest model for multi-class classification
rf_model_multi = RandomForestClassifier(random_state=1337)
rf_model_multi.fit(multi_train_X, multi_train_y)
# Predict and evaluate the model on the validation set
multi_predictions = rf_model_multi.predict(multi_val_X)
accuracy = accuracy_score(multi_val_y, multi_predictions)
precision = precision_score(multi_val_y, multi_predictions, average='weighted')
recall = recall_score(multi_val_y, multi_predictions, average='weighted')
f1 = f1_score(multi_val_y, multi_predictions, average='weighted')
# print(f"Validation Set Evaluation:")
# print(f"Accuracy: {accuracy:.4f}")
# print(f"Precision: {precision:.4f}")
# print(f"Recall: {recall:.4f}")
# print(f"F1-Score: {f1:.4f}")
# Confusion Matrix for Validation Set
conf_matrix = confusion_matrix(multi_val_y, multi_predictions)
class_labels = ['Normal', 'DoS', 'Probe', 'Privilege', 'Access']
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
xticklabels=class_labels,
yticklabels=class_labels)
plt.title('Network Anomaly Detection - Validation Set')
plt.xlabel('Predicted')
plt.ylabel('Actual')
# plt.show()
# Classification Report for Validation Set
# print("Classification Report for Validation Set:")
# print(classification_report(multi_val_y, multi_predictions, target_names=class_labels))
# Final evaluation on the test set
test_multi_predictions = rf_model_multi.predict(test_X)
test_accuracy = accuracy_score(test_y, test_multi_predictions)
test_precision = precision_score(test_y, test_multi_predictions, average='weighted')
test_recall = recall_score(test_y, test_multi_predictions, average='weighted')
test_f1 = f1_score(test_y, test_multi_predictions, average='weighted')
# print("\nTest Set Evaluation:")
# print(f"Accuracy: {test_accuracy:.4f}")
# print(f"Precision: {test_precision:.4f}")
# print(f"Recall: {test_recall:.4f}")
# print(f"F1-Score: {test_f1:.4f}")
# Confusion Matrix for Test Set
test_conf_matrix = confusion_matrix(test_y, test_multi_predictions)
sns.heatmap(test_conf_matrix, annot=True, fmt='d', cmap='Blues',
xticklabels=class_labels,
yticklabels=class_labels)
plt.title('Network Anomaly Detection')
plt.xlabel('Predicted')
plt.ylabel('Actual')
# plt.show()
# Classification Report for Test Set
# print("Classification Report for Test Set:")
# print(classification_report(test_y, test_multi_predictions, target_names=class_labels))
# Save the trained model to a file
model_filename = 'network_anomaly_detection_model.joblib'
joblib.dump(rf_model_multi, model_filename)
print(f"Model saved to {model_filename}")
Send the file on the app to get the flag.


Model Evaluation (Malware Image Classification)
Download the dataset
wget https://www.kaggle.com/api/v1/datasets/download/ikrambenabd/malimg-original -O malimg.zip
unzip malimg.zip
Execute the code bellow to get the file
import os
import matplotlib.pyplot as plt
import seaborn as sns
import splitfolders
import matplotlib.pyplot as plt
import torch.nn as nn
import torchvision.models as models
import torch
import time
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
DATA_BASE_PATH = "./malimg_paper_dataset_imgs/"
# compute the class distribution
dist = {}
for mlw_class in os.listdir(DATA_BASE_PATH):
mlw_dir = os.path.join(DATA_BASE_PATH, mlw_class)
dist[mlw_class] = len(os.listdir(mlw_dir))
# plot the class distribution
# HTD Color Palette
htd_green = "#9FEF00"
node_black = "#141D2B"
hacker_grey = "#A4B1CD"
# data
classes = list(dist.keys())
frequencies = list(dist.values())
# plot
# plt.figure(facecolor=node_black)
# sns.barplot(y=classes, x=frequencies, edgecolor = "black", orient='h', color=htd_green)
# plt.title("Malware Class Distribution", color=htd_green)
# plt.xlabel("Malware Class Frequency", color=htd_green)
# plt.ylabel("Malware Class", color=htd_green)
# plt.xticks(color=hacker_grey)
# plt.yticks(color=hacker_grey)
# ax = plt.gca()
# ax.set_facecolor(node_black)
# ax.spines['bottom'].set_color(hacker_grey)
# ax.spines['top'].set_color(node_black)
# ax.spines['right'].set_color(node_black)
# ax.spines['left'].set_color(hacker_grey)
# plt.show()
DATA_BASE_PATH = "./malimg_paper_dataset_imgs/"
TARGET_BASE_PATH = "./newdata/"
TRAINING_RATIO = 0.8
TEST_RATIO = 1 - TRAINING_RATIO
# execute only on the first time
# splitfolders.ratio(input=DATA_BASE_PATH, output=TARGET_BASE_PATH, ratio=(TRAINING_RATIO, 0, TEST_RATIO))
# Define preprocessing transforms
transform = transforms.Compose([
transforms.Resize((75, 75)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
BASE_PATH = "./newdata/"
# Load training and test datasets
train_dataset = ImageFolder(
root=os.path.join(BASE_PATH, "train"),
transform=transform
)
test_dataset = ImageFolder(
root=os.path.join(BASE_PATH, "test"),
transform=transform
)
TRAIN_BATCH_SIZE = 1024
TEST_BATCH_SIZE = 1024
# Create data loaders
train_loader = DataLoader(
train_dataset,
batch_size=TRAIN_BATCH_SIZE,
shuffle=True,
num_workers=2
)
test_loader = DataLoader(
test_dataset,
batch_size=TEST_BATCH_SIZE,
shuffle=False,
num_workers=2
)
# HTD Color Palette
htd_green = "#9FEF00"
node_black = "#141D2B"
hacker_grey = "#A4B1CD"
# image
sample = next(iter(train_loader))[0][0]
# plot
# plt.figure(facecolor=node_black)
# plt.imshow(sample.permute(1,2,0))
# plt.xticks(color=hacker_grey)
# plt.yticks(color=hacker_grey)
# ax = plt.gca()
# ax.set_facecolor(node_black)
# ax.spines['bottom'].set_color(hacker_grey)
# ax.spines['top'].set_color(node_black)
# ax.spines['right'].set_color(node_black)
# ax.spines['left'].set_color(hacker_grey)
# ax.tick_params(axis='x', colors=hacker_grey)
# ax.tick_params(axis='y', colors=hacker_grey)
# plt.show()
def load_datasets(base_path, train_batch_size, test_batch_size):
# Define preprocessing transforms
transform = transforms.Compose([
transforms.Resize((75, 75)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Load training and test datasets
train_dataset = ImageFolder(
root=os.path.join(base_path, "train"),
transform=transform
)
test_dataset = ImageFolder(
root=os.path.join(base_path, "test"),
transform=transform
)
# Create data loaders
train_loader = DataLoader(
train_dataset,
batch_size=train_batch_size,
shuffle=True,
num_workers=2
)
test_loader = DataLoader(
test_dataset,
batch_size=test_batch_size,
shuffle=False,
num_workers=2
)
n_classes = len(train_dataset.classes)
return train_loader, test_loader, n_classes
HIDDEN_LAYER_SIZE = 1000
class MalwareClassifier(nn.Module):
def __init__(self, n_classes):
super(MalwareClassifier, self).__init__()
# Load pretrained ResNet50
self.resnet = models.resnet50(weights='DEFAULT')
# Freeze ResNet parameters
for param in self.resnet.parameters():
param.requires_grad = False
# Replace the last fully connected layer
num_features = self.resnet.fc.in_features
self.resnet.fc = nn.Sequential(
nn.Linear(num_features, HIDDEN_LAYER_SIZE),
nn.ReLU(),
nn.Linear(HIDDEN_LAYER_SIZE, n_classes)
)
def forward(self, x):
return self.resnet(x)
model = MalwareClassifier(25)
DATA_PATH = "./newdata/"
TRAINING_BATCH_SIZE = 1024
TEST_BATCH_SIZE = 1024
# Load datasets
train_loader, test_loader, n_classes = load_datasets(DATA_PATH, TRAINING_BATCH_SIZE, TEST_BATCH_SIZE)
# Initialize model
model = MalwareClassifier(n_classes)
def train(model, train_loader, n_epochs, verbose=False):
model.train()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
training_data = {"accuracy": [], "loss": []}
for epoch in range(n_epochs):
running_loss = 0
n_total = 0
n_correct = 0
checkpoint = time.time() * 1000
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
_, predicted = outputs.max(1)
n_total += labels.size(0)
n_correct += predicted.eq(labels).sum().item()
running_loss += loss.item()
epoch_loss = running_loss / len(train_loader)
epoch_duration = int(time.time() * 1000 - checkpoint)
epoch_accuracy = compute_accuracy(n_correct, n_total)
training_data["accuracy"].append(epoch_accuracy)
training_data["loss"].append(epoch_loss)
if verbose:
print(f"[i] Epoch {epoch+1} of {n_epochs}: Acc: {epoch_accuracy:.2f}% Loss: {epoch_loss:.4f} (Took {epoch_duration} ms).")
return training_data
def save_model(model, path):
model_scripted = torch.jit.script(model)
model_scripted.save(path)
def predict(model, test_data):
model.eval()
with torch.no_grad():
output = model(test_data)
_, predicted = torch.max(output.data, 1)
return predicted
def compute_accuracy(n_correct, n_total):
return round(100 * n_correct / n_total, 2)
def evaluate(model, test_loader):
model.eval()
n_correct = 0
n_total = 0
with torch.no_grad():
for data, target in test_loader:
predicted = predict(model, data)
n_total += target.size(0)
n_correct += (predicted == target).sum().item()
accuracy = compute_accuracy(n_correct, n_total)
return accuracy
def plot(data, title, label, xlabel, ylabel):
# HTD Color Palette
htd_green = "#9FEF00"
node_black = "#141D2B"
hacker_grey = "#A4B1CD"
# plot
plt.figure(figsize=(10, 6), facecolor=node_black)
plt.plot(range(1, len(data)+1), data, label=label, color=htd_green)
plt.title(title, color=htd_green)
plt.xlabel(xlabel, color=htd_green)
plt.ylabel(ylabel, color=htd_green)
plt.xticks(color=hacker_grey)
plt.yticks(color=hacker_grey)
ax = plt.gca()
ax.set_facecolor(node_black)
ax.spines['bottom'].set_color(hacker_grey)
ax.spines['top'].set_color(node_black)
ax.spines['right'].set_color(node_black)
ax.spines['left'].set_color(hacker_grey)
legend = plt.legend(facecolor=node_black, edgecolor=hacker_grey, fontsize=10)
plt.setp(legend.get_texts(), color=htd_green)
plt.show()
def plot_training_accuracy(training_data):
plot(training_data['accuracy'], "Training Accuracy", "Accuracy", "Epoch", "Accuracy (%)")
def plot_training_loss(training_data):
plot(training_data['loss'], "Training Loss", "Loss", "Epoch", "Loss")
# data parameters
DATA_PATH = "./newdata/"
# training parameters
N_EPOCHS = 10
TRAINING_BATCH_SIZE = 512
TEST_BATCH_SIZE = 1024
# model parameters
HIDDEN_LAYER_SIZE = 1000
MODEL_FILE = "malware_classifier.pth"
# Load datasets
train_loader, test_loader, n_classes = load_datasets(DATA_PATH, TRAINING_BATCH_SIZE, TEST_BATCH_SIZE)
# Initialize model
model = MalwareClassifier(n_classes)
# Train model
print("[i] Starting Training...")
training_information = train(model, train_loader, N_EPOCHS, verbose=True)
# Save model
save_model(model, MODEL_FILE)
# evaluate model
accuracy = evaluate(model, test_loader)
print(f"[i] Inference accuracy: {accuracy}%.")
# Plot training details
plot_training_accuracy(training_information)
plot_training_loss(training_information)
Now, just start the exercise and send it


Skills Assessment
One of the most widely used datasets in natural language processing (NLP) is the IMDB movie reviews dataset, first introduced by Maas et al. (2011). This dataset contains 50,000 movie reviews collected from the Internet Movie Database (IMDB), each review carefully labeled for sentiment analysis. The data is split evenly into training and test sets, with a balanced mix of positive and negative reviews.
The IMDB dataset quickly became a benchmark for evaluating NLP models, as its curated structure makes it ideal for testing how well algorithms can classify text sentiment. Beyond being a classic example for academic research, it has also influenced developments in vector-based word representations and inspired improvements in neural network architectures for text classification. Even today, it remains a go-to dataset for experimenting with sentiment classification tasks.
Your Task: Train a Sentiment Classifier
The challenge here is straightforward yet powerful: build a model that can predict whether a movie review is positive (1) or negative (0). By doing so, you’ll practice essential NLP workflows such as:
- Text preprocessing (cleaning, tokenizing, vectorizing).
- Training a machine learning or deep learning model.
- Evaluating classification performance using standard metrics.
You can download the dataset directly from the exercise question or from the provided link and begin building your classifier.
Why This Matters Beyond Movie Reviews
While the IMDB dataset is focused on film reviews, the techniques you’ll learn are widely applicable. For example, the same methods for detecting positive vs. negative sentiment can be adapted to:
- Text moderation → Filtering harmful or toxic content.
- Customer feedback analysis → Identifying satisfaction or dissatisfaction trends.
- Social media monitoring → Tracking sentiment about brands, products, or events.
This makes the project not just an academic exercise, but also a practical introduction to real-world applications of sentiment analysis.
Evaluating Your Model
Once your model is trained, the final step is to evaluate its performance. This is done through the Playground VM’s evaluation portal:
- If you don’t already have the Playground VM running, you can initialize it at the bottom of the page.
- If it’s already active, a Python script is provided so you can upload your model directly from Jupyter.
- The portal will automatically test your model against the evaluation dataset.
If your model meets the required performance criteria, you’ll receive a flag value. This flag acts as proof of success—it can be submitted to answer the exercise question or to confirm that your model achieved the expected accuracy.
By working through this challenge, you’ll not only gain hands-on experience with a foundational NLP dataset, but also walk through the entire lifecycle of machine learning development: from preparing data, to training and testing models, to validating their effectiveness in a controlled environment.
Use the code bellow to generate the file and get the flag.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IMDB Sentiment — Naive Bayes on GPU (cuML) + joblib (CPU portable)
- Preprocess (negations kept; optional stemming)
- Vectorize on CPU (CountVectorizer) → SciPy CSR
- Convert to GPU (CuPy CSR, float32/bool) → Train/Infer cuML MultinomialNB
- Manual grid search (optimize accuracy)
- Save predictions.csv (id,label)
- Rebuild best pipeline on CPU (sklearn) and SAVE as outputs/imdb_nb_model.joblib
Requisitos (exemplo p/ CUDA 12):
pip install pandas scikit-learn joblib
pip install cupy-cuda12x
pip install cuml-cu12
Uso recomendado (buscando >= .91 acc):
python imdb_nb_gpu.py --use-bigrams --use-trigrams --no-stem --export --pred-col-name label
"""
from __future__ import annotations
import argparse, json, re
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction import text as sk_text
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
# GPU libs
import cupy as cp
from cupyx.scipy.sparse import csr_matrix as cupy_csr_matrix
from cuml.naive_bayes import MultinomialNB as cuML_MultinomialNB
# =========================
# Data loading helpers
# =========================
def smart_read_json(path: Path) -> pd.DataFrame:
try:
return pd.read_json(path, lines=True)
except ValueError:
return pd.read_json(path)
# =========================
# Preprocess (CPU)
# =========================
BASE_STOPWORDS = set(sk_text.ENGLISH_STOP_WORDS)
NEGATIONS = {"no", "not", "nor", "never", "n't"} # manter nas features
STOPWORDS = {w for w in BASE_STOPWORDS if w not in NEGATIONS}
TOKEN_PATTERN = re.compile(r"[a-z]+")
try:
from nltk.stem import PorterStemmer
STEMMER = PorterStemmer()
except Exception:
STEMMER = None
def preprocess_text(doc: str, use_stem: bool = False) -> str:
if not isinstance(doc, str):
doc = "" if doc is None else str(doc)
doc = doc.lower()
tokens = TOKEN_PATTERN.findall(doc)
tokens = [t for t in tokens if t not in STOPWORDS]
if use_stem and STEMMER is not None:
tokens = [STEMMER.stem(t) for t in tokens]
return " ".join(tokens)
# =========================
# Converters SciPy -> CuPy
# =========================
def scipy_csr_to_cupy(X_csr, dtype=np.float32):
"""
Convert scipy.sparse.csr_matrix to cupyx.scipy.sparse.csr_matrix (GPU) with supported dtype.
CuPy CSR supports: bool, float32, float64, complex64, complex128.
cuML NB exige float32/64 -> usamos float32 sempre.
"""
if X_csr.dtype != dtype:
X_csr = X_csr.astype(dtype)
return cupy_csr_matrix(
(cp.asarray(X_csr.data), cp.asarray(X_csr.indices), cp.asarray(X_csr.indptr)),
shape=X_csr.shape,
dtype=dtype
)
# =========================
# Grid Search manual (GPU NB)
# =========================
def gpu_nb_gridsearch(
X_train_texts, y_train,
X_test_texts, y_test,
ngram_ranges, max_features_list, min_df_list, max_df_list, binary_list, alphas,
strip_accents="unicode",
):
best = {"acc": -1.0, "params": None, "pred": None, "vocab_size": None}
for ngram in ngram_ranges:
for max_feat in max_features_list:
for min_df in min_df_list:
for max_df in max_df_list:
for binary in binary_list:
# 1) Vetorização (CPU)
vect = CountVectorizer(
lowercase=False,
token_pattern=r"(?u)\b\w+\b",
strip_accents=strip_accents,
ngram_range=ngram,
max_features=max_feat,
min_df=min_df,
max_df=max_df,
binary=binary,
)
Xtr_cpu = vect.fit_transform(X_train_texts)
Xte_cpu = vect.transform(X_test_texts)
# 2) SciPy CSR -> CuPy CSR (GPU), dtype suportado
# Use bool para poupar VRAM quando binary=True; float32 é universalmente seguro
target_dtype = np.bool_ if binary else np.float32
Xtr_gpu = scipy_csr_to_cupy(Xtr_cpu, dtype=np.float32)
Xte_gpu = scipy_csr_to_cupy(Xte_cpu, dtype=np.float32)
ytr_gpu = cp.asarray(y_train.astype(np.int32))
# Xtr_gpu = scipy_csr_to_cupy(Xtr_cpu, dtype=target_dtype)
# Xte_gpu = scipy_csr_to_cupy(Xte_cpu, dtype=target_dtype)
# ytr_gpu = cp.asarray(y_train.astype(np.int32))
# 3) Grid de alpha na GPU
for alpha in alphas:
nb = cuML_MultinomialNB(alpha=alpha)
nb.fit(Xtr_gpu, ytr_gpu)
ypred_gpu = nb.predict(Xte_gpu)
ypred = cp.asnumpy(ypred_gpu).astype(int)
acc = accuracy_score(y_test, ypred)
if acc > best["acc"]:
best["acc"] = acc
best["params"] = {
"ngram_range": ngram,
"max_features": max_feat,
"min_df": min_df,
"max_df": max_df,
"binary": binary,
"alpha": alpha,
}
best["pred"] = ypred
best["vocab_size"] = Xtr_cpu.shape[1]
print(f"[*] Novo melhor ACC={acc:.4f} | params={best['params']} | vocab={best['vocab_size']}")
return best
# =========================
# Main
# =========================
def main():
ap = argparse.ArgumentParser(description="IMDB Naive Bayes on GPU (cuML) + joblib")
ap.add_argument("--train", type=str, default="train.json")
ap.add_argument("--test", type=str, default="test.json")
ap.add_argument("--outdir", type=str, default="outputs")
# Preprocess
ap.add_argument("--use-stem", action="store_true", help="Use Porter stemming (off por padrão)")
ap.add_argument("--no-stem", action="store_true", help="Força não usar stemming")
# N-grams (defaults: (1,2) e (1,3))
ap.add_argument("--use-bigrams", action="store_true")
ap.add_argument("--use-trigrams", action="store_true")
# Export
ap.add_argument("--export", action="store_true", help="Escreve predictions.csv (id,label)")
ap.add_argument("--pred-col-name", type=str, default="label")
ap.add_argument("--no-header", action="store_true", help="CSV sem header")
# Joblib
ap.add_argument("--save-joblib", action="store_true", help="Salva outputs/imdb_nb_model.joblib (sklearn CPU)")
ap.add_argument("--joblib-name", type=str, default="imdb_nb_model.joblib", help="Nome do arquivo .joblib")
args = ap.parse_args()
outdir = Path(args.outdir); outdir.mkdir(parents=True, exist_ok=True)
# ---- Load
train_df = smart_read_json(Path(args.train))
test_df = smart_read_json(Path(args.test))
expected = {"text", "label"}
if not expected.issubset(train_df.columns) or not expected.issubset(test_df.columns):
raise ValueError(f"Arquivos devem conter colunas {expected}.")
train_df = train_df.drop_duplicates(subset=["text", "label"]).reset_index(drop=True)
y_train = train_df["label"].astype(int).to_numpy()
y_test = test_df["label"].astype(int).to_numpy()
# ---- Preprocess CPU
use_stem = args.use_stem and not args.no_stem
print(f"[+] Preprocess: use_stem={use_stem}")
X_train_texts = train_df["text"].astype(str).apply(lambda s: preprocess_text(s, use_stem=use_stem)).tolist()
X_test_texts = test_df["text"].astype(str).apply(lambda s: preprocess_text(s, use_stem=use_stem)).tolist()
# ---- Grid space (accuracy-oriented)
ngram_ranges = []
if args.use_bigrams:
ngram_ranges.append((1,2))
if args.use_trigrams:
ngram_ranges.append((1,3))
if not ngram_ranges:
ngram_ranges = [(1,2), (1,3)] # default
max_features_list = [20000, 50000, None]
min_df_list = [1, 2, 3]
max_df_list = [0.9, 0.95]
binary_list = [True] # costuma ajudar com NB em sentimento
alphas = [0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 1.25]
print("[+] Iniciando grid na GPU (cuML MultinomialNB)…")
best = gpu_nb_gridsearch(
X_train_texts, y_train,
X_test_texts, y_test,
ngram_ranges, max_features_list, min_df_list, max_df_list, binary_list, alphas,
)
print("\n[✓] Best ACC:", f"{best['acc']:.4f}")
print("[✓] Best params:", best["params"])
print("[✓] Vocab size:", best.get("vocab_size"))
# ---- Relatório final
y_pred = best["pred"]
print("\n[✓] Classification Report:\n", classification_report(y_test, y_pred, digits=4))
print("[✓] Confusion Matrix:\n", confusion_matrix(y_test, y_pred))
# ---- Export predictions.csv
if args.export:
preds_df = pd.DataFrame({
"id": np.arange(len(y_pred)),
args.pred_col_name: y_pred.astype(int),
})
preds_df.to_csv(outdir / "predictions.csv", index=False, header=not args.no_header)
print(f"[✓] predictions.csv salvo em: {outdir / 'predictions.csv'}")
# ---- Salvar melhores parâmetros
with (outdir / "nb_best_params_gpu.json").open("w", encoding="utf-8") as f:
json.dump({
"best_acc": float(best["acc"]),
"best_params": best["params"],
"vocab_size": int(best.get("vocab_size") or 0),
}, f, indent=2)
print(f"[✓] Best params salvos em: {outdir / 'nb_best_params_gpu.json'}")
# ---- [NOVO] Salvar modelo .joblib (pipeline sklearn CPU) com os melhores hiperparâmetros
if args.save_joblib:
from sklearn.naive_bayes import MultinomialNB as SK_MultinomialNB
from sklearn.pipeline import Pipeline as SK_Pipeline
from joblib import dump as joblib_dump
bp = best["params"] # ngram_range, max_features, min_df, max_df, binary, alpha
vect_cpu = CountVectorizer(
lowercase=False,
token_pattern=r"(?u)\b\w+\b",
strip_accents="unicode",
ngram_range=bp["ngram_range"],
max_features=bp["max_features"],
min_df=bp["min_df"],
max_df=bp["max_df"],
binary=bp["binary"],
)
pipe_cpu = SK_Pipeline([
("vectorizer", vect_cpu),
("classifier", SK_MultinomialNB(alpha=bp["alpha"])),
])
# Treina na CPU com os TEXTOS PRÉ-PROCESSADOS (mesmo insumo usado na GPU)
pipe_cpu.fit(X_train_texts, y_train)
joblib_path = outdir / args.joblib_name
joblib_dump(pipe_cpu, joblib_path)
print(f"[✓] Modelo CPU compatível salvo em: {joblib_path}")
print("\nDone.")
if __name__ == "__main__":
main()


