Firewall Dashboard
Module Code:
import sys
import subprocess
import os
from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QPushButton,
QLabel, QHBoxLayout, QFrame, QMessageBox)
from PyQt5.QtCore import Qt, QPropertyAnimation, QEasingCurve
from PyQt5.QtGui import QColor, QFont, QPalette, QPixmap, QPainter
class PythonControlApp(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
self.processes = {}
def init_ui(self):
# Fixed resolution
self.setWindowTitle("Protectify:Advance Firewall with DPI")
self.setFixedSize(1100, 720)
# Clean background
self.setStyleSheet("""
QWidget {
background-color: #FAFBFC;
font-family: 'Inter';
color: #1E2A44;
}
""")
# Main layout
main_layout = QVBoxLayout()
main_layout.setSpacing(15)
main_layout.setContentsMargins(40, 20, 40, 20)
# Title
title = QLabel("Protectify: Advance Firewall with DPI")
title.setFont(QFont("Inter", 22, QFont.Bold))
title.setStyleSheet("""
color: #FFFFFF;
background-color: #3B82F6;
padding: 10px;
border-radius: 6px;
""")
title.setAlignment(Qt.AlignCenter)
title.setFixedHeight(45)
main_layout.addWidget(title)
# Content split layout
content_frame = QFrame()
content_layout = QHBoxLayout(content_frame)
content_layout.setSpacing(20)
# Left: Team info
team_frame = QFrame()
team_frame.setStyleSheet("""
background-color: #FFFFFF;
border-radius: 5px;
border: 1px solid #E5E7EB;
padding: 8px;
""")
team_frame.setFixedWidth(600)
team_layout = QVBoxLayout(team_frame)
team_layout.setSpacing(10)
# Supervisor: Zain
zain_frame = QFrame()
zain_layout = QHBoxLayout(zain_frame)
zain_layout.setSpacing(15)
zain_pic = self.load_profile_picture("zain.jpg", 70, circular=True)
zain_layout.addWidget(zain_pic)
zain_info = QLabel("Supervised by: Engr. Zain Ul Abideen Akhter \n(Assistant Professor The Islamia University\nof Bahawalpur)")
zain_info.setFont(QFont("Inter", 12))
zain_info.setStyleSheet("color: #374151;")
zain_info.setFixedWidth(400)
zain_info.setWordWrap(True)
zain_layout.addWidget(zain_info)
team_layout.addWidget(zain_frame)
# Developers section (minimized spacing, smaller text)
developers_frame = QFrame()
developers_layout = QHBoxLayout(developers_frame)
developers_layout.setSpacing(7)
# Asad
asad_frame = QFrame()
asad_layout = QVBoxLayout(asad_frame)
asad_layout.setSpacing(1)
asad_pic = self.load_profile_picture("asad.jpg", 60, circular=False)
asad_layout.addWidget(asad_pic)
asad_info = QLabel("Asad Ali Akbar Shirazi\nF21BINCE1M03031")
asad_info.setFont(QFont("Inter", 8)) # Smaller text
asad_info.setStyleSheet("color: #374151;")
asad_info.setAlignment(Qt.AlignCenter)
asad_info.setFixedWidth(140)
asad_info.setWordWrap(True)
asad_layout.addWidget(asad_info)
developers_layout.addWidget(asad_frame)
# Mubashir
mubashir_frame = QFrame()
mubashir_layout = QVBoxLayout(mubashir_frame)
mubashir_layout.setSpacing(1)
mubashir_pic = self.load_profile_picture("mubashir.jpg", 60, circular=False)
mubashir_layout.addWidget(mubashir_pic)
mubashir_info = QLabel("Mubashir Shaheen\nF21BINCE1M03029")
mubashir_info.setFont(QFont("Inter", 8)) # Smaller text
mubashir_info.setStyleSheet("color: #374151;")
mubashir_info.setAlignment(Qt.AlignCenter)
mubashir_info.setFixedWidth(140)
mubashir_info.setWordWrap(True)
mubashir_layout.addWidget(mubashir_info)
developers_layout.addWidget(mubashir_frame)
# Hamza
hamza_frame = QFrame()
hamza_layout = QVBoxLayout(hamza_frame)
hamza_layout.setSpacing(1)
hamza_pic = self.load_profile_picture("hamza.jpg", 60, circular=False)
hamza_layout.addWidget(hamza_pic)
hamza_info = QLabel("Hafiz Muhammad Hamza Sohail\nF21BINCE1M03004")
hamza_info.setFont(QFont("Inter", 8)) # Smaller text
hamza_info.setStyleSheet("color: #374151;")
hamza_info.setAlignment(Qt.AlignCenter)
hamza_info.setFixedWidth(140)
hamza_info.setWordWrap(True)
hamza_layout.addWidget(hamza_info)
developers_layout.addWidget(hamza_frame)
team_layout.addWidget(developers_frame)
# Project info
project_info = QLabel(
"Next-Level Network Protection\n"
"Features: Deep Packet Inspection | AI Detection (Autoencoder + LSTM) \n |OSINT Blacklist | DNS Security"
)
project_info.setFont(QFont("Inter", 11))
project_info.setStyleSheet("color: #374151; line-height: 1.4;")
project_info.setAlignment(Qt.AlignCenter)
project_info.setWordWrap(True)
project_info.setFixedHeight(80)
team_layout.addWidget(project_info)
content_layout.addWidget(team_frame)
# Right: Scripts and action buttons
right_frame = QFrame()
right_frame.setStyleSheet("""
background-color: #FFFFFF;
border-radius: 8px;
border: 1px solid #E5E7EB;
padding: 15px;
""")
right_frame.setFixedWidth(450)
right_layout = QVBoxLayout(right_frame)
right_layout.setSpacing(10)
# Scripts section
self.scripts = {
1: {"script_name": "capture.py", "button_label": "DPI Detection"},
2: {"script_name": "ML.py", "button_label": "AI Detection"},
3: {"script_name": "iac.py", "button_label": "Internet Access Control"},
4: {"script_name": "track.py", "button_label": "IP Tracking on Maps"},
5: {"script_name": "block.py", "button_label": "Auto IP Blocking"},
6: {"script_name": "dns.py", "button_label": "DNS Protection"}
}
self.buttons = {}
for i in range(1, 7):
btn = QPushButton(f"{self.scripts[i]['button_label']} - OFF")
btn.setFixedHeight(45)
btn.setFont(QFont("Inter", 12, QFont.Bold))
btn.setStyleSheet("""
QPushButton {
background-color: #EF4444;
color: #FFFFFF;
border-radius: 6px;
padding: 8px;
border: 2px solid #DC2626;
}
QPushButton:hover {
background-color: #F87171;
}
""")
btn.clicked.connect(lambda checked, idx=i: self.toggle_script(idx))
self.buttons[i] = btn
right_layout.addWidget(btn)
# Action buttons
action_frame = QFrame()
action_layout = QHBoxLayout(action_frame)
action_layout.setSpacing(15)
for text, func, color in [
("Traffic Log", self.open_traffic_log, "#10B981"),
("Docs", self.open_documentation, "#3B82F6"),
]:
btn = QPushButton(text)
btn.setFixedSize(130, 40)
btn.setFont(QFont("Inter", 12, QFont.Bold))
btn.setStyleSheet(f"""
QPushButton {{
background-color: {color};
color: #FFFFFF;
border-radius: 6px;
padding: 8px;
border: 1px solid rgba(255, 255, 255, 0.4);
}}
QPushButton:hover {{
background-color: {self.lighten_color(color)};
}}
""")
btn.clicked.connect(func)
action_layout.addWidget(btn)
right_layout.addWidget(action_frame, alignment=Qt.AlignCenter)
content_layout.addWidget(right_frame)
main_layout.addWidget(content_frame)
# Footer
footer = QLabel("BS Cyber Security | Final Year Project | Fall 2021-25")
footer.setFont(QFont("Inter", 9))
footer.setStyleSheet("""
color: #6B7280;
padding: 8px;
background-color: #FFFFFF;
border-radius: 6px;
border: 1px solid #E5E7EB;
""")
footer.setAlignment(Qt.AlignCenter)
footer.setFixedHeight(30)
main_layout.addWidget(footer)
self.setLayout(main_layout)
def load_profile_picture(self, filename, size, circular=False):
pic = QLabel()
if os.path.exists(filename):
pixmap = QPixmap(filename).scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
if not pixmap.isNull():
if circular:
# Create circular mask
mask = QPixmap(size, size)
mask.fill(Qt.transparent)
painter = QPainter(mask)
painter.setRenderHint(QPainter.Antialiasing)
painter.setBrush(QColor(255, 255, 255))
painter.drawEllipse(0, 0, size, size)
painter.end()
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatioByExpanding, Qt.SmoothTransformation)
pixmap.setMask(mask.mask())
pic.setPixmap(pixmap)
else:
pic.setText("Invalid Image")
pic.setFont(QFont("Inter", 10))
pic.setStyleSheet("color: #EF4444; background-color: #F3F4F6; padding: 8px;")
pic.setFixedSize(size, size)
else:
pic.setText("Image Not Found")
pic.setFont(QFont("Inter", 10))
pic.setStyleSheet("color: #EF4444; background-color: #F3F4F6; padding: 8px;")
pic.setFixedSize(size, size)
pic.setAlignment(Qt.AlignCenter)
return pic
def toggle_script(self, script_id):
button = self.buttons[script_id]
script_name = self.scripts[script_id]["script_name"]
try:
if button.text().endswith("OFF"):
process = subprocess.Popen(
["python", script_name],
creationflags=subprocess.CREATE_NEW_CONSOLE,
shell=True
)
self.processes[script_id] = process
button.setText(f"{self.scripts[script_id]['button_label']} - ON")
self.animate_button(button, "#EF4444", "#10B981")
else:
if script_id in self.processes:
self.processes[script_id].terminate()
del self.processes[script_id]
button.setText(f"{self.scripts[script_id]['button_label']} - OFF")
self.animate_button(button, "#10B981", "#EF4444")
except FileNotFoundError:
QMessageBox.critical(self, "Error", f"Script '{script_name}' not found!")
except PermissionError:
QMessageBox.critical(self, "Error", f"Permission denied to run '{script_name}'. Run as administrator.")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to toggle '{script_name}': {str(e)}")
def animate_button(self, button, start_color, end_color):
color_anim = QPropertyAnimation(button, b"styleSheet")
color_anim.setDuration(500)
color_anim.setStartValue(f"""
background-color: {start_color};
color: #FFFFFF; border-radius: 6px; padding: 8px;
border: 2px solid {self.darken_color(start_color)};
""")
color_anim.setEndValue(f"""
background-color: {end_color};
color: #FFFFFF; border-radius: 6px; padding: 8px;
border: 2px solid {self.darken_color(end_color)};
""")
color_anim.setEasingCurve(QEasingCurve.InOutCubic)
color_anim.start()
def open_traffic_log(self):
try:
subprocess.Popen(["start", "traffic.csv"], shell=True)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to open traffic.csv: {str(e)}")
def open_documentation(self):
try:
subprocess.Popen(["start", "https://www.AdvfirwallDPI.com"], shell=True)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to open documentation: {str(e)}")
def lighten_color(self, color):
c = QColor(color)
return QColor(min(c.red() + 40, 255), min(c.green() + 40, 255), min(c.blue() + 40, 255)).name()
def darken_color(self, color):
c = QColor(color)
return QColor(max(c.red() - 40, 0), max(c.green() - 40, 0), max(c.blue() - 40, 0)).name()
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle('Fusion')
palette = QPalette()
palette.setColor(QPalette.Window, QColor(250, 251, 252))
app.setPalette(palette)
window = PythonControlApp()
window.show()
sys.exit(app.exec_())
Code Output
Code Summary
- Defines PythonControlApp class using PyQt5 for GUI
- Creates a fixed-size window (1100x720) for Protectify firewall
- Implements clean UI with custom styling and Inter font
- Organizes layout with title, team info, and script controls
- Displays supervisor and developer information with profile pictures
- Lists project features including DPI and AI detection
- Provides toggle buttons for 6 security scripts with ON/OFF states
- Includes error handling for script execution and file operations
- Features button animations and action buttons for logs/docs
- Sets up application with Fusion style and custom palette
DPI Detection
Module Code:
import scapy.all as scapy
import pandas as pd
import time
import tkinter as tk
from tkinter import ttk
import threading
from collections import deque
import os
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.animation import FuncAnimation
# Initialize data structures
columns = ["packet_id", "timestamp", "ip_src", "ip_dst", "protocol", "length", "src_port", "dst_port", "data"]
packet_data = deque(maxlen=1000) # Efficient deque for last 1000 packets
unique_ips = set() # Set for unique IPs
packet_id_counter = 1
last_save_time = time.time()
packet_counts = deque(maxlen=60) # Last 60 seconds of packet counts
# Function to safely decode payloads
def safe_decode(payload):
try:
return payload.decode(errors='replace')[:50] # Limit data length for efficiency
except Exception:
return "N/A"
# Detect active network interface
def get_active_interface():
for iface in scapy.get_if_list():
if scapy.get_if_addr(iface) != "0.0.0.0":
print(f"Using interface: {iface}")
return iface
raise Exception("No active interface found!")
# Packet processing function
def packet_callback(packet):
global packet_id_counter, packet_data, unique_ips, last_save_time, packet_counts
try:
# Extract essential packet details
ip_src = packet[scapy.IP].src if packet.haslayer(scapy.IP) else "N/A"
ip_dst = packet[scapy.IP].dst if packet.haslayer(scapy.IP) else "N/A"
length = len(packet) if packet.haslayer(scapy.IP) else 0
protocol = "TCP" if packet.haslayer(scapy.TCP) else "UDP" if packet.haslayer(scapy.UDP) else "Other"
src_port = packet[scapy.TCP].sport if packet.haslayer(scapy.TCP) else packet[scapy.UDP].sport if packet.haslayer(scapy.UDP) else "N/A"
dst_port = packet[scapy.TCP].dport if packet.haslayer(scapy.TCP) else packet[scapy.UDP].dport if packet.haslayer(scapy.UDP) else "N/A"
data = safe_decode(packet[scapy.Raw].load) if packet.haslayer(scapy.Raw) else "N/A"
# Create packet dictionary
packet_info = {
"packet_id": packet_id_counter,
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
"ip_src": ip_src,
"ip_dst": ip_dst,
"protocol": protocol,
"length": length,
"src_port": src_port,
"dst_port": dst_port,
"data": data
}
# Add to deque and update unique IPs
packet_data.append(packet_info)
if ip_src != "N/A": unique_ips.add(ip_src)
if ip_dst != "N/A": unique_ips.add(ip_dst)
# Update packet counts per second
current_second = int(time.time())
if not packet_counts or packet_counts[-1]['time'] != current_second:
packet_counts.append({'time': current_second, 'count': 1})
else:
packet_counts[-1]['count'] += 1
# Save to CSV and IPs
save_to_csv(packet_info)
save_unique_ips()
# Overwrite traffic.csv every 5 minutes
current_time = time.time()
if current_time - last_save_time >= 300: # 300 seconds = 5 minutes
overwrite_traffic_csv()
last_save_time = current_time
# Update GUI
update_gui(packet_info)
packet_id_counter += 1
except Exception as e:
print(f"Error processing packet: {e}")
# Save single packet to CSV (append mode)
def save_to_csv(packet_info):
df = pd.DataFrame([packet_info])
df.to_csv('traffic.csv', mode='a', header=not os.path.exists('traffic.csv'), index=False)
# Overwrite traffic.csv with last 1000 packets
def overwrite_traffic_csv():
df = pd.DataFrame(packet_data)
df.to_csv('traffic.csv', mode='w', header=True, index=False)
print("Overwritten traffic.csv with last 1000 packets.")
# Save unique IPs to ip.txt
def save_unique_ips():
with open('ip.txt', 'w') as f:
f.write('\n'.join(sorted(unique_ips)))
# Update GUI with new packet
def update_gui(packet_info):
treeview.insert("", 0, values=[packet_info[col] for col in columns], tags=('even' if packet_id_counter % 2 == 0 else 'odd',))
if len(treeview.get_children()) > 1000:
treeview.delete(treeview.get_children()[-1])
# Update packets-per-second plot
def update_plot(frame):
ax.clear()
times = [entry['time'] for entry in packet_counts]
counts = [entry['count'] for entry in packet_counts]
# Colorful scheme based on count
colors = ['#FF6B6B' if c > 20 else '#4ECDC4' if c > 10 else '#45B7D1' for c in counts]
ax.bar(range(len(counts)), counts, color=colors)
ax.set_title("Packets per Second", fontsize=12, fontweight='bold', color='#333333')
ax.set_ylabel("Count", fontsize=10, color='#333333')
ax.set_xticks([]) # Hide x-axis labels for simplicity
ax.set_facecolor('#F5F5F5')
fig.patch.set_facecolor('#F5F5F5')
# Start sniffing in a separate thread
def start_sniffing(interface):
print(f"Sniffing on {interface}")
scapy.sniff(iface=interface, prn=packet_callback, store=0)
def sniff_thread(interface):
thread = threading.Thread(target=start_sniffing, args=(interface,), daemon=True)
thread.start()
# GUI setup
def setup_gui():
global root, treeview, fig, ax
root = tk.Tk()
root.title("Protectify: Advance Firewall with DPI")
root.geometry("1200x800")
root.configure(bg="#FFFFFF") # Bright theme
# Style configuration
style = ttk.Style()
style.theme_use("clam")
style.configure("Treeview", background="#FFFFFF", foreground="#333333", fieldbackground="#FFFFFF", rowheight=25)
style.configure("Treeview.Heading", background="#E0E0E0", foreground="#333333", font=("Arial", 10, "bold"))
style.map("Treeview", background=[('selected', '#B0E0E6')])
# Top frame for Treeview
top_frame = ttk.Frame(root)
top_frame.pack(padx=10, pady=10, fill="both", expand=True)
# Treeview setup
treeview = ttk.Treeview(top_frame, columns=columns, show="headings", height=20)
for col in columns:
treeview.heading(col, text=col.capitalize(), anchor="w")
treeview.column(col, width=130, anchor="w")
treeview.pack(side="left", fill="both", expand=True)
# Scrollbar
scrollbar = ttk.Scrollbar(top_frame, orient="vertical", command=treeview.yview)
scrollbar.pack(side="right", fill="y")
treeview.configure(yscrollcommand=scrollbar.set)
# Color tags for alternating rows
treeview.tag_configure('odd', background='#F0F0F0')
treeview.tag_configure('even', background='#FFFFFF')
# Bottom frame for plot
bottom_frame = ttk.Frame(root)
bottom_frame.pack(padx=10, pady=10, fill="x")
# Matplotlib plot setup
fig, ax = plt.subplots(figsize=(10, 3))
canvas = FigureCanvasTkAgg(fig, master=bottom_frame)
canvas.get_tk_widget().pack(fill="x")
ani = FuncAnimation(fig, update_plot, interval=1000, cache_frame_data=False) # Update every second
# Start sniffing
interface = get_active_interface()
sniff_thread(interface)
# Schedule CSV overwrite
root.after(300000, overwrite_traffic_csv)
root.mainloop()
if __name__ == "__main__":
setup_gui()
Code Output
Code Summary
- Defines packet sniffing system using Scapy and Tkinter GUI
- Initializes data structures for packet info and metrics
- Implements safe payload decoding with length limitation
- Detects active network interface automatically
- Processes packets with IP, port, and protocol details
- Saves packet data to CSV and unique IPs to text file
- Updates GUI with real-time packet display in Treeview
- Visualizes packets-per-second with animated Matplotlib plot
- Runs sniffing in separate thread to prevent GUI freeze
- Configures bright-themed GUI with alternating row colors
AI Detection
import threading
import time
import numpy as np
from scapy.all import sniff, IP, TCP, UDP, Raw
import torch
import torch.nn as nn
import pytorch_lightning as pl
import subprocess
import os
import ctypes
from collections import deque
import customtkinter as ctk
import ipaddress
# Extract features and protocol info
def extract_packet_features(packet):
if IP in packet:
ip_layer = packet[IP]
features = [
len(packet),
ip_layer.proto,
int(ip_layer.src.split('.')[-1]),
ip_layer.len,
ip_layer.ttl
]
proto = 'TCP' if TCP in packet else 'UDP' if UDP in packet else 'RAW'
return torch.tensor(features, dtype=torch.float32), packet[IP].src, proto, packet.summary()
return None, None, None, None
# Check if IP is private
def is_private_ip(ip):
try:
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private
except ValueError:
return False
# LSTM Autoencoder with dynamic threshold
class LSTMAutoencoder(pl.LightningModule):
def __init__(self, n_features=5, timesteps=4):
super().__init__()
self.timesteps = timesteps
self.n_features = n_features
self.encoder = nn.LSTM(n_features, 16, batch_first=True, num_layers=1)
self.decoder = nn.LSTM(16, 16, batch_first=True, num_layers=1)
self.fc = nn.Linear(16, n_features)
self.criterion = nn.MSELoss()
self.recon_errors = deque(maxlen=1000) # Track recent reconstruction errors
def forward(self, x):
encoded, _ = self.encoder(x)
decoded, _ = self.decoder(encoded)
return self.fc(decoded)
def training_step(self, batch, batch_idx):
x = batch
x_hat = self(x)
loss = self.criterion(x_hat, x)
with torch.no_grad():
error = torch.mean((x_hat - x) ** 2).item()
self.recon_errors.append(error)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=0.001)
def get_dynamic_threshold(self):
if not self.recon_errors:
return 0.25 # Default threshold
mean_error = np.mean(list(self.recon_errors))
std_error = np.std(list(self.recon_errors))
return mean_error + 2 * std_error # 2 standard deviations above mean
# Background training
def train_model_in_background(data_queue, gui):
def train():
model = LSTMAutoencoder(n_features=5, timesteps=4)
trainer = pl.Trainer(
max_epochs=3,
accelerator="auto",
devices=1,
logger=False,
enable_checkpointing=False,
enable_progress_bar=False
)
while True:
if len(data_queue) >= 50:
data = list(data_queue)[-100:]
data_tensor = torch.stack([torch.stack(data[i:i+4])
for i in range(len(data) - 3)])
trainer.fit(model, torch.utils.data.DataLoader(data_tensor, batch_size=16))
model.eval()
gui.model = model
gui.monitoring_queue.append(("Model updated (Threshold: {:.4f})".format(model.get_dynamic_threshold()), "blue"))
time.sleep(5)
threading.Thread(target=train, daemon=True).start()
# Anomaly detection with explanation
def detect_anomaly(model, packet_data):
with torch.no_grad():
input_tensor = torch.stack(packet_data).unsqueeze(0)
recon = model(input_tensor)
error = torch.mean((recon - input_tensor) ** 2).item()
threshold = model.get_dynamic_threshold()
is_anomaly = error > threshold
return is_anomaly, error, threshold
# Block IP with private IP check
def block_ip(ip, malicious_ips, gui):
if is_private_ip(ip):
gui.monitoring_queue.append((f"Skipped blocking {ip} (private IP)", "gray"))
return False
if ip not in malicious_ips:
try:
subprocess.run(
f"netsh advfirewall firewall add rule name=Block_{ip} dir=in action=block remoteip={ip}",
shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
with open("malicious.txt", "a") as file:
file.write(f"{ip}\n")
malicious_ips.add(ip)
return True
except:
gui.monitoring_queue.append((f"Failed to block {ip}", "red"))
return False
return False
# Colorful GUI with persistent IP monitoring
class FirewallGUI:
def __init__(self, root):
self.root = root
self.model = None
self.root.title("AI Firewall")
self.root.geometry("900x600")
ctk.set_appearance_mode("dark")
ctk.set_default_color_theme("dark-blue")
self.main_frame = ctk.CTkFrame(root, corner_radius=10, fg_color="#1a1a1a")
self.main_frame.pack(fill="both", expand=True, padx=5, pady=5)
self.main_frame.grid_columnconfigure((0, 1, 2), weight=1)
self.main_frame.grid_rowconfigure(0, weight=1)
self.main_frame.grid_rowconfigure(1, weight=0)
# Training Frame (Purple)
self.training_frame = ctk.CTkFrame(self.main_frame, fg_color="#4a235a")
self.training_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
ctk.CTkLabel(self.training_frame, text="Training Data", font=("Arial", 14), text_color="#d7bde2").pack(pady=2)
self.training_log = ctk.CTkTextbox(self.training_frame, height=400, width=280, font=("Consolas", 10), fg_color="#2d1b36", text_color="#f2e6f7")
self.training_log.pack(fill="both", expand=True)
# Monitoring Frame (Red)
self.monitoring_frame = ctk.CTkFrame(self.main_frame, fg_color="#641e16")
self.monitoring_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
ctk.CTkLabel(self.monitoring_frame, text="IP Monitoring", font=("Arial", 14), text_color="#f5b7b1").pack(pady=2)
self.monitoring_log = ctk.CTkTextbox(self.monitoring_frame, height=400, width=280, font=("Consolas", 10), fg_color="#3b120d", text_color="#fadbd8")
self.monitoring_log.pack(fill="both", expand=True)
# Traffic Frame (Green)
self.traffic_frame = ctk.CTkFrame(self.main_frame, fg_color="#1d4d29")
self.traffic_frame.grid(row=0, column=2, padx=5, pady=5, sticky="nsew")
ctk.CTkLabel(self.traffic_frame, text="Traffic", font=("Arial", 14), text_color="#a9dfbf").pack(pady=2)
self.traffic_log = ctk.CTkTextbox(self.traffic_frame, height=400, width=280, font=("Consolas", 10), fg_color="#12361e", text_color="#d5f5e3")
self.traffic_log.pack(fill="both", expand=True)
# Stats Frame (Blue)
self.stats_frame = ctk.CTkFrame(self.main_frame, fg_color="#154360")
self.stats_frame.grid(row=1, column=0, columnspan=3, pady=5, sticky="ew")
self.tcp_label = ctk.CTkLabel(self.stats_frame, text="TCP: 0%", text_color="#a9cce3")
self.tcp_label.pack(side="left", padx=10)
self.udp_label = ctk.CTkLabel(self.stats_frame, text="UDP: 0%", text_color="#a9cce3")
self.udp_label.pack(side="left", padx=10)
self.raw_label = ctk.CTkLabel(self.stats_frame, text="RAW: 0%", text_color="#a9cce3")
self.raw_label.pack(side="left", padx=10)
# Data structures
self.training_queue = deque(maxlen=100)
self.monitoring_queue = deque(maxlen=1000)
self.traffic_queue = deque(maxlen=100)
self.packet_buffer = deque(maxlen=1000)
self.data_queue = deque(maxlen=1000)
self.unique_ips = {}
self.malicious_ips = set()
self.running = True
self.proto_counts = {'TCP': 0, 'UDP': 0, 'RAW': 0}
self.total_packets = 0
self.load_existing_ips()
self.start_operations()
self.root.after(100, self.update_logs)
self.root.after(10000, self.update_stats)
def load_existing_ips(self):
for file_name in ["ip.txt", "malicious.txt"]:
if os.path.exists(file_name):
with open(file_name, "r") as file:
ips = set(line.strip() for line in file)
if file_name == "ip.txt":
self.unique_ips = {ip: "Unknown" for ip in ips}
else:
self.malicious_ips = ips
for ip in ips:
if ip in self.unique_ips:
self.unique_ips[ip] = "UNSAFE"
def save_unique_ip(self, ip):
if ip not in self.unique_ips:
self.unique_ips[ip] = "Unknown"
with open("ip.txt", "a") as file:
file.write(f"{ip}\n")
def start_operations(self):
def packet_callback(packet):
features, src_ip, proto, summary = extract_packet_features(packet)
if features is not None:
self.data_queue.append(features)
self.packet_buffer.append(features)
self.save_unique_ip(src_ip)
self.proto_counts[proto] += 1
self.total_packets += 1
self.traffic_queue.append((f"{proto} from {src_ip} ({len(packet)}B)", "white"))
self.training_queue.append((f"Features: {features.tolist()[:2]}... ({len(self.data_queue)} samples)", "white"))
if self.model and len(self.packet_buffer) >= 4:
batch = list(self.packet_buffer)[-4:]
is_malicious, error, threshold = detect_anomaly(self.model, batch)
status = "UNSAFE" if is_malicious else "SAFE"
self.unique_ips[src_ip] = status
if is_malicious:
self.monitoring_queue.append((f"{src_ip} flagged (Error: {error:.4f} > {threshold:.4f})", "orange"))
if block_ip(src_ip, self.malicious_ips, self):
self.monitoring_queue.append((f"Blocked {src_ip}", "yellow"))
train_model_in_background(self.data_queue, self)
threading.Thread(target=lambda: sniff(prn=packet_callback, store=0, filter="ip"), daemon=True).start()
def update_logs(self):
self.training_log.delete("1.0", "end")
for msg, color in list(self.training_queue):
self.training_log.insert("end", f"{time.strftime('%H:%M:%S')} - {msg}\n", color)
self.training_log.tag_config(color, foreground=color)
self.monitoring_log.delete("1.0", "end")
self.monitoring_log.insert("end", "Unique IPs Status:\n", "white")
self.monitoring_log.tag_config("white", foreground="white")
for ip, status in sorted(self.unique_ips.items()):
color = "green" if status == "SAFE" else "red" if status == "UNSAFE" else "gray"
self.monitoring_log.insert("end", f"{ip}: {status}\n", color)
self.monitoring_log.tag_config(color, foreground=color)
for msg, color in list(self.monitoring_queue):
self.monitoring_log.insert("end", f"{time.strftime('%H:%M:%S')} - {msg}\n", color)
self.monitoring_log.tag_config(color, foreground=color)
self.traffic_log.delete("1.0", "end")
for msg, color in list(self.traffic_queue):
self.traffic_log.insert("end", f"{time.strftime('%H:%M:%S')} - {msg}\n", color)
self.traffic_log.tag_config(color, foreground=color)
if self.running:
self.root.after(100, self.update_logs)
def update_stats(self):
if self.total_packets > 0:
tcp_pct = (self.proto_counts['TCP'] / self.total_packets) * 100
udp_pct = (self.proto_counts['UDP'] / self.total_packets) * 100
raw_pct = (self.proto_counts['RAW'] / self.total_packets) * 100
self.tcp_label.configure(text=f"TCP: {tcp_pct:.1f}%")
self.udp_label.configure(text=f"UDP: {udp_pct:.1f}%")
self.raw_label.configure(text=f"RAW: {raw_pct:.1f}%")
self.root.after(10000, self.update_stats)
def on_closing(self):
self.running = False
self.root.destroy()
if __name__ == "__main__":
if os.name == 'nt' and not ctypes.windll.shell32.IsUserAnAdmin():
print("Please run as administrator!")
exit()
root = ctk.CTk()
app = FirewallGUI(root)
root.protocol("WM_DELETE_WINDOW", app.on_closing)
root.mainloop()
Code Output
Code Summary
- Defines AI-based firewall with packet sniffing and anomaly detection
- Extracts packet features (length, protocol, IP, TTL) using Scapy
- Implements LSTM Autoencoder with dynamic threshold for anomaly detection
- Trains model in background thread using PyTorch Lightning
- Detects anomalies and blocks malicious IPs via Windows firewall
- Creates colorful CustomTkinter GUI with training, monitoring, and traffic logs
- Tracks unique IPs and protocol statistics (TCP/UDP/RAW)
- Saves IP data to files and skips blocking private IPs
- Updates GUI in real-time with packet info and IP status
- Requires admin privileges on Windows for firewall rules
Internet Access Control
import customtkinter as ctk
from tkinter import messagebox
import psutil
import os
import subprocess
import threading
import time
from PIL import Image
import winreg
import sys
import ctypes
import logging
# Set up logging for debugging
logging.basicConfig(filename="firewall.log", level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
class ProtectifyFirewall:
def __init__(self, root):
self.root = root
self.root.title("Protectify: Firewall Internet Access Control")
self.root.geometry("900x600")
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
# Check if Firewall service is running
self.check_firewall_service()
# Header with improved color scheme
header_frame = ctk.CTkFrame(root, fg_color="#cce0ff", corner_radius=10)
header_frame.pack(fill="x", pady=10, padx=10)
ctk.CTkLabel(header_frame, text="Protectify: Firewall Internet Access Control", font=("Segoe UI", 24, "bold"), text_color="#004080").pack(pady=5)
# Search bar with button
search_frame = ctk.CTkFrame(root, fg_color="#ffffff", corner_radius=10)
search_frame.pack(fill="x", padx=10, pady=5)
self.search_var = ctk.StringVar()
search_entry = ctk.CTkEntry(search_frame, textvariable=self.search_var, placeholder_text="Search apps...",
corner_radius=8, border_width=0, fg_color="#f0f5ff", text_color="#333333")
search_entry.pack(side="left", fill="x", expand=True, padx=5, pady=5)
search_button = ctk.CTkButton(search_frame, text="Search", fg_color="#0066cc", hover_color="#004080",
width=80, font=("Segoe UI", 10, "bold"), command=self.filter_apps_manual)
search_button.pack(side="right", padx=5, pady=5)
search_entry.bind("", self.filter_apps)
# Main content
self.canvas = ctk.CTkCanvas(root, bg="#f0f5ff", highlightthickness=0)
self.scrollbar = ctk.CTkScrollbar(root, command=self.canvas.yview, fg_color="#d0e0ff")
self.app_frame = ctk.CTkFrame(self.canvas, fg_color="#f0f5ff")
self.canvas.configure(yscrollcommand=self.scrollbar.set)
self.scrollbar.pack(side="right", fill="y")
self.canvas.pack(side="left", fill="both", expand=True, padx=10)
self.canvas.create_window((0, 0), window=self.app_frame, anchor="nw")
self.app_frame.bind("", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all")))
# App data
self.app_data = {} # {name: {"path": path, "type": str, "status": bool, "icon": CTkImage}}
self.app_widgets = {}
# Start app detection
self.running = True
self.detect_thread = threading.Thread(target=self.detect_apps, daemon=True)
self.detect_thread.start()
# Update GUI
self.root.after(1000, self.update_gui)
def check_firewall_service(self):
"""Ensure Windows Firewall service is running"""
try:
result = subprocess.run("sc query mpssvc", capture_output=True, text=True, shell=True)
if "RUNNING" not in result.stdout:
logging.warning("Windows Firewall service is not running. Attempting to start...")
subprocess.run("net start mpssvc", shell=True, check=True)
logging.info("Windows Firewall service started successfully")
else:
logging.info("Windows Firewall service is running")
except subprocess.CalledProcessError as e:
logging.error(f"Failed to start Firewall service: {str(e)}")
messagebox.showerror("Error", "Windows Firewall service failed to start. Check system settings.")
def get_app_icon(self, exe_path):
"""Load app icon, default to icon.png if not obtained"""
try:
import win32gui
import win32ui
import win32con
large, _ = win32gui.ExtractIconEx(exe_path, 0)
if large:
hdc = win32ui.CreateDCFromHandle(win32gui.GetDC(0))
hbmp = win32ui.CreateBitmap()
hbmp.CreateCompatibleBitmap(hdc, 32, 32)
hdc = hdc.CreateCompatibleDC()
hdc.SelectObject(hbmp)
hdc.DrawIcon((0, 0), large[0])
win32gui.DestroyIcon(large[0])
bmp_info = hbmp.GetInfo()
bmp_str = hbmp.GetBitmapBits(True)
img = Image.frombuffer('RGBA', (bmp_info['bmWidth'], bmp_info['bmHeight']), bmp_str, 'raw', 'BGRA', 0, 1)
return ctk.CTkImage(light_image=img.resize((32, 32), Image.Resampling.LANCZOS), size=(32, 32))
except:
pass
# Fallback to icon.png
try:
if os.path.exists("icon.png"):
img = Image.open("icon.png").resize((32, 32), Image.Resampling.LANCZOS)
return ctk.CTkImage(light_image=img, size=(32, 32))
except:
return ctk.CTkImage(light_image=Image.new('RGBA', (32, 32)), size=(32, 32))
def get_installed_apps(self):
"""Detect installed and system apps"""
apps = {}
system_paths = {os.environ['SystemRoot'].lower(), os.environ['ProgramFiles'].lower()}
for proc in psutil.process_iter(['pid', 'name', 'exe']):
try:
if proc.info['exe'] and '.exe' in proc.info['exe'].lower():
path = proc.info['exe']
name = proc.info['name'].replace('.exe', '')
app_type = "System" if any(path.lower().startswith(p) for p in system_paths) else "Installed"
apps[name] = {"path": path, "type": app_type}
except:
continue
reg_paths = [
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall",
r"SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall"
]
for reg_path in reg_paths:
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path)
for i in range(winreg.QueryInfoKey(key)[0]):
subkey_name = winreg.EnumKey(key, i)
subkey = winreg.OpenKey(key, subkey_name)
try:
name, _ = winreg.QueryValueEx(subkey, "DisplayName")
path, _ = winreg.QueryValueEx(subkey, "InstallLocation")
if name and path and os.path.exists(path):
exe_path = next((f for f in os.listdir(path) if f.endswith('.exe')), None)
if exe_path:
full_path = os.path.join(path, exe_path)
app_type = "System" if any(full_path.lower().startswith(p) for p in system_paths) else "Installed"
apps[name] = {"path": full_path, "type": app_type}
except:
continue
except:
continue
return apps
def set_firewall_rule(self, app_name, app_path, enable):
"""Set firewall rule using netsh commands"""
try:
rule_name = f"Protectify_Block_{app_name}"
# Always delete the existing rule first to ensure a clean state
subprocess.run(f'netsh advfirewall firewall delete rule name="{rule_name}"', shell=True, capture_output=True, text=True)
logging.info(f"Attempted to remove existing rule: {rule_name}")
if not enable: # Block traffic (OFF)
cmd = f'netsh advfirewall firewall add rule name="{rule_name}" dir=out action=block program="{app_path}" enable=yes'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logging.info(f"Successfully added block rule for {app_name} at {app_path}")
else:
logging.error(f"Failed to add block rule for {app_name}: {result.stderr}")
messagebox.showerror("Error", f"Failed to block {app_name}: {result.stderr}")
return
else: # Allow traffic (ON)
logging.info(f"Enabled internet for {app_name} by removing block rule")
self.app_data[app_name]["status"] = enable
self.update_app_status(app_name)
except Exception as e:
logging.error(f"Exception in set_firewall_rule for {app_name}: {str(e)}")
messagebox.showerror("Error", f"Failed to update {app_name}: {str(e)}")
def update_app_status(self, app_name):
"""Update GUI status and buttons"""
widgets = self.app_widgets[app_name]
status = self.app_data[app_name]["status"]
widgets["status_label"].configure(text="ON" if status else "OFF", text_color="#00cc00" if status else "#ff5555")
widgets["type_label"].configure(text=self.app_data[app_name]["type"])
widgets["on_btn"].configure(fg_color="#00cc00" if status else "#e0e0e0", hover_color="#00aa00" if status else "#d0d0d0")
widgets["off_btn"].configure(fg_color="#ff5555" if not status else "#e0e0e0", hover_color="#ff3333" if not status else "#d0d0d0")
def detect_apps(self):
"""Real-time app detection with lower frequency"""
while self.running:
new_apps = self.get_installed_apps()
self.root.after(0, self.update_app_list, new_apps)
time.sleep(5)
def filter_apps(self, event):
"""Filter apps based on search (KeyRelease)"""
self.filter_apps_manual()
def filter_apps_manual(self):
"""Filter apps based on search (Button click)"""
search_term = self.search_var.get().lower()
for widget in self.app_frame.winfo_children():
widget.pack_forget()
for app_name, widgets in self.app_widgets.items():
if search_term in app_name.lower():
widgets["frame"].pack(fill="x", padx=10, pady=5)
def update_app_list(self, apps):
"""Update GUI with app list"""
existing_apps = set(self.app_data.keys())
new_apps = set(apps.keys())
# Remove old apps
for app_name in existing_apps - new_apps:
self.app_widgets[app_name]["frame"].destroy()
del self.app_widgets[app_name]
del self.app_data[app_name]
# Add/update apps
for app_name in new_apps:
if app_name not in self.app_data:
self.app_data[app_name] = apps[app_name]
self.app_data[app_name]["status"] = True
self.app_data[app_name]["icon"] = self.get_app_icon(apps[app_name]["path"])
frame = ctk.CTkFrame(self.app_frame, fg_color="#ffffff", corner_radius=8, border_width=1, border_color="#d0e0ff")
frame.pack(fill="x", padx=10, pady=5)
ctk.CTkLabel(frame, image=self.app_data[app_name]["icon"], text="").pack(side="left", padx=10)
ctk.CTkLabel(frame, text=app_name, font=("Segoe UI", 12, "bold"), text_color="#333333").pack(side="left", padx=5)
type_label = ctk.CTkLabel(frame, text=apps[app_name]["type"], font=("Segoe UI", 10), text_color="#666666")
type_label.pack(side="left", padx=10)
status_label = ctk.CTkLabel(frame, text="ON", font=("Segoe UI", 10, "bold"), text_color="#00cc00")
status_label.pack(side="left", padx=10)
on_btn = ctk.CTkButton(frame, text="ON", fg_color="#00cc00", width=60, font=("Segoe UI", 10, "bold"),
hover_color="#00aa00", command=lambda n=app_name, p=apps[app_name]["path"]: self.set_firewall_rule(n, p, True))
on_btn.pack(side="right", padx=5)
off_btn = ctk.CTkButton(frame, text="OFF", fg_color="#ff5555", width=60, font=("Segoe UI", 10, "bold"),
hover_color="#ff3333", command=lambda n=app_name, p=apps[app_name]["path"]: self.set_firewall_rule(n, p, False))
off_btn.pack(side="right", padx=5)
self.app_widgets[app_name] = {
"frame": frame,
"status_label": status_label,
"type_label": type_label,
"on_btn": on_btn,
"off_btn": off_btn,
"icon": self.app_data[app_name]["icon"]
}
else:
self.update_app_status(app_name)
def update_gui(self):
"""Periodic GUI update"""
if self.running:
self.root.after(1000, self.update_gui)
def __del__(self):
self.running = False
def is_admin():
"""Check if running with admin privileges on Windows"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
if __name__ == "__main__":
if not is_admin():
messagebox.showerror("Error", "Please run as administrator")
sys.exit(1)
root = ctk.CTk()
app = ProtectifyFirewall(root)
root.mainloop()
Code Output
Code Summary
- Defines ProtectifyFirewall class with CustomTkinter GUI
- Checks and starts Windows Firewall service if not running
- Creates light-themed interface with search bar and scrollable app list
- Detects running and installed apps using psutil and Windows Registry
- Loads app icons with fallback to default icon.png
- Controls internet access via netsh firewall rules (ON/OFF)
- Filters apps dynamically based on search input
- Updates GUI in real-time with app status and type (System/Installed)
- Logs actions to firewall.log for debugging
- Requires admin privileges and runs detection in a background thread
IP Tracking on Maps
import sys
import requests
from collections import deque
import webbrowser
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton, QScrollArea, QFrame
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QFont
# IP data cache
ip_cache = {}
def fetch_ip_data(ip_address):
if ip_address in ip_cache:
return ip_cache[ip_address]
try:
url = f"https://freeipapi.com/api/json/{ip_address}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
ip_cache[ip_address] = data
return data
except requests.RequestException:
return {"error": f"Failed to retrieve data for {ip_address}"}
return {"error": "Request failed"}
# Read last 20 unique IPs from ip.txt, skipping 192.x.x.x
def read_ips():
unique_ips = deque(maxlen=20)
try:
with open("ip.txt", "r", encoding='utf-8') as file:
lines = file.readlines()
for line in lines[-20:]: # Last 20 lines
ip = line.strip()
if (ip and
not ip.startswith("192") and
ip not in unique_ips):
unique_ips.append(ip)
except FileNotFoundError:
print("ip.txt not found")
return list(unique_ips)
# Load blacklist
def load_blacklist():
try:
with open("blacklist.txt", "r", encoding='utf-8') as file:
return set(line.strip() for line in file if line.strip())
except FileNotFoundError:
print("blacklist.txt not found")
return set()
# Save malicious IP
def save_malicious_ip(ip):
try:
with open("malicious.txt", "a", encoding='utf-8') as file:
file.write(f"{ip}\n")
except IOError:
print("Error writing to malicious.txt")
def open_google_maps(lat, lon):
url = f"https://www.google.com/maps?q={lat},{lon}"
webbrowser.open(url)
class IPDetailsApp(QWidget):
def __init__(self):
super().__init__()
self.blacklist = load_blacklist()
self.init_ui()
self.timer = QTimer(self)
self.timer.timeout.connect(self.update_ip_details)
self.timer.start(15000) # 15 seconds
def init_ui(self):
self.setWindowTitle("IP Monitor")
self.setGeometry(100, 100, 900, 700)
# Bright theme with Qt-compatible styling
self.setStyleSheet("""
QWidget {
background-color: #F5F6F5;
color: #2D3436;
font-family: Arial;
}
QPushButton {
background-color: #e3df09;
color: red;
border-radius: 8px;
padding: 8px;
font-weight: bold;
}
QPushButton:hover {
background-color: #e35509;
}
QFrame {
background-color: white;
border-radius: 10px;
border: 1px solid #DFE6E9;
margin: 5px;
}
""")
main_layout = QVBoxLayout()
self.scroll_area = QScrollArea()
self.scroll_area.setWidgetResizable(True)
self.scroll_content = QWidget()
self.scroll_layout = QVBoxLayout(self.scroll_content)
self.scroll_area.setWidget(self.scroll_content)
main_layout.addWidget(self.scroll_area)
self.setLayout(main_layout)
self.update_ip_details()
def update_ip_details(self):
unique_ips = read_ips()
# Clear old widgets
while self.scroll_layout.count():
item = self.scroll_layout.takeAt(0)
if item.widget():
item.widget().deleteLater()
for ip in unique_ips:
data = fetch_ip_data(ip)
is_malicious = ip in self.blacklist
if is_malicious:
save_malicious_ip(ip)
# Use QFrame with raised effect instead of box-shadow
ip_frame = QFrame()
ip_frame.setFrameShape(QFrame.Box)
ip_frame.setFrameShadow(QFrame.Raised)
ip_frame.setLineWidth(1)
ip_frame.setStyleSheet("""
border: 1px solid #DFE6E9;
background-color: white;
border-radius: 10px;
""")
ip_layout = QVBoxLayout(ip_frame)
# IP Header
status_color = "#E84393" if is_malicious else "#00B894"
ip_label = QLabel(f"IP: {ip} [{'Malicious' if is_malicious else 'Safe'}]")
ip_label.setFont(QFont('Arial', 12, QFont.Bold))
ip_label.setStyleSheet(f"color: {status_color}; padding: 5px;")
ip_layout.addWidget(ip_label)
# Details
details_text = ""
if "error" not in data:
details_text = "\n".join([
f"Country: {data.get('countryName', 'N/A')}",
f"City: {data.get('cityName', 'N/A')}",
f"ISP: {data.get('isp', 'N/A')}",
f"Lat: {data.get('latitude', 'N/A')}",
f"Lon: {data.get('longitude', 'N/A')}"
])
else:
details_text = data["error"]
details_label = QLabel(details_text)
details_label.setFont(QFont('Arial', 10))
details_label.setStyleSheet("color: #636E72; padding: 5px;")
ip_layout.addWidget(details_label)
# Google Maps button
if "latitude" in data and "longitude" in data:
maps_btn = QPushButton("View Location on Google Maps")
maps_btn.clicked.connect(
lambda ch, lat=data["latitude"], lon=data["longitude"]:
open_google_maps(lat, lon)
)
ip_layout.addWidget(maps_btn)
self.scroll_layout.addWidget(ip_frame)
self.scroll_layout.addStretch()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = IPDetailsApp()
window.show()
sys.exit(app.exec_())
Code Output
Code Summary
- Defines IPDetailsApp class using PyQt5 for IP monitoring
- Fetches IP geolocation data from freeipapi.com with caching
- Reads last 20 unique IPs from ip.txt, excluding 192.x.x.x
- Loads blacklist from blacklist.txt to identify malicious IPs
- Saves malicious IPs to malicious.txt
- Creates bright-themed GUI with scrollable IP details
- Displays IP status (Safe/Malicious) with country, city, ISP, and coordinates
- Provides Google Maps button for location visualization
- Updates IP details every 15 seconds using QTimer
- Styles widgets with custom colors and raised frame effects
Auto IP Blocking
import subprocess
# Path to the malicious IPs file
malicious_file_path = ".\malicious.txt"
def block_ip(ip):
"""Block a single IP address using netsh command."""
try:
# Block the IP address using the Windows netsh firewall command
command = f'netsh advfirewall firewall add rule name="Block {ip}" dir=in action=block protocol=any remoteip={ip}'
subprocess.run(command, shell=True, check=True)
print(f"Successfully blocked IP: {ip}")
except subprocess.CalledProcessError as e:
print(f"Failed to block IP {ip}: {e}")
def read_ips(file_path):
"""Read IPs from the given file."""
try:
with open(file_path, 'r') as file:
ips = file.readlines()
return [ip.strip() for ip in ips] # Remove extra spaces and newlines
except FileNotFoundError:
print(f"File not found: {file_path}")
return []
except Exception as e:
print(f"Error reading file: {e}")
return []
def main():
# Read the malicious IPs from the file
malicious_ips = read_ips(malicious_file_path)
# Check and block each IP
if malicious_ips:
for ip in malicious_ips:
if validate_ip(ip):
block_ip(ip)
else:
print(f"Invalid IP address format: {ip}")
else:
print("No malicious IPs found to block.")
def validate_ip(ip):
"""Validate the IP address format."""
import re
ip_pattern = r'^(\d{1,3}\.){3}\d{1,3}$' # Basic IPv4 regex
return bool(re.match(ip_pattern, ip))
if __name__ == "__main__":
main()
Code Output
Code Summary
- Defines script to block malicious IPs using Windows firewall
- Sets path to malicious.txt for IP list storage
- Implements block_ip function with netsh command
- Reads IPs from file with read_ips function
- Validates IP format using regex in validate_ip
- Runs main function to process and block IPs
- Handles file not found and general exceptions
- Strips whitespace from IPs during file reading
- Prints success or failure messages for each IP
- Skips invalid IP addresses with validation check
DNS Protection
import subprocess
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QPushButton, QLabel, QLineEdit, QMessageBox
from PyQt5.QtCore import QPropertyAnimation, QRect
# Function to change DNS for the specified interface
def change_dns(primary_dns, secondary_dns):
interface_name = interface_entry.text()
if not interface_name:
QMessageBox.warning(window, "Input Error", "Please select a valid network interface.")
return
try:
print(f"Changing DNS for interface: {interface_name}")
# Change primary DNS
subprocess.run(
["netsh", "interface", "ip", "set", "dns", interface_name, "static", primary_dns],
check=True
)
# Add secondary DNS
subprocess.run(
["netsh", "interface", "ip", "add", "dns", interface_name, secondary_dns, "index=2"],
check=True
)
QMessageBox.information(window, "Success", f"Protection activated successfully!")
except subprocess.CalledProcessError as e:
print(f"An error occurred: {e}")
QMessageBox.warning(window, "Error", f"An error occurred: {e}")
# Functions for different DNS options
def apply_quad9_dns():
change_dns("9.9.9.9", "149.112.112.112")
def apply_cleanbrowsing_dns():
change_dns("185.228.168.168", "185.228.169.169")
def apply_google_dns():
change_dns("8.8.8.8", "8.8.4.4")
# Function to revert to ISP's default DNS
def revert_to_isp_dns():
interface_name = interface_entry.text()
if not interface_name:
QMessageBox.warning(window, "Input Error", "Please select a valid network interface.")
return
try:
# Revert DNS to DHCP
subprocess.run(
["netsh", "interface", "ip", "set", "dns", interface_name, "dhcp"],
check=True
)
QMessageBox.information(window, "Success", "Reverted to ISP's DNS.")
except subprocess.CalledProcessError as e:
print(f"An error occurred: {e}")
QMessageBox.warning(window, "Error", f"An error occurred: {e}")
# Create the application window
app = QApplication(sys.argv)
window = QWidget()
window.setWindowTitle("Protectify: Advance Firewall with DPI")
# Set Window Style
window.setStyleSheet("""
QWidget {
background-color: #2E2E2E;
color: #FFFFFF;
font-family: Arial, sans-serif;
}
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 10px;
font-size: 14px;
margin-bottom: 10px;
border-radius: 5px;
}
QPushButton:hover {
background-color: #45a049;
}
QLabel {
font-size: 16px;
font-weight: bold;
margin-bottom: 20px;
}
QLineEdit {
background-color: #555555;
color: white;
padding: 10px;
border-radius: 5px;
font-size: 14px;
}
QLineEdit:focus {
border: 2px solid #4CAF50;
}
""")
# Create the layout
layout = QVBoxLayout()
# Set the default active interface to "WI-FI"
active_interface = "WI-FI" # Set to "WI-FI" by default
# Create and place the interface name label
interface_name_label = f"DNS Protection on: {active_interface}"
label = QLabel(interface_name_label)
layout.addWidget(label)
# Create and place the interface input field
interface_entry = QLineEdit()
interface_entry.setText(active_interface) # Pre-fill with the "WI-FI" interface
layout.addWidget(interface_entry)
# Create the DNS buttons
quad9_button = QPushButton("Malware+Tracker Protection")
quad9_button.clicked.connect(apply_quad9_dns)
layout.addWidget(quad9_button)
cleanbrowsing_button = QPushButton("Malware + Adult Content Protection")
cleanbrowsing_button.clicked.connect(apply_cleanbrowsing_dns)
layout.addWidget(cleanbrowsing_button)
google_button = QPushButton("General DNS Encryption")
google_button.clicked.connect(apply_google_dns)
layout.addWidget(google_button)
# Button to revert to ISP DNS
revert_button = QPushButton("Revert back to ISP DNS")
revert_button.clicked.connect(revert_to_isp_dns)
layout.addWidget(revert_button)
# Set the layout and display the window
window.setLayout(layout)
# Apply an animation to the window
animation = QPropertyAnimation(window, b"geometry")
animation.setDuration(1000)
animation.setStartValue(QRect(200, 200, 300, 300))
animation.setEndValue(QRect(500, 200, 400, 400))
animation.start()
# Set the window size and display the window
window.resize(400, 300)
window.show()
# Run the application
sys.exit(app.exec_())
Code Output
Code Summary
- Defines PyQt5 GUI for DNS protection configuration
- Changes DNS settings using netsh commands
- Offers Malware protection, Adult content protections.
- Reverts to ISP DNS with DHCP setting
- Sets default interface to "WI-FI" in input field
- Implements dark theme with custom styling
- Includes error handling with message boxes
- Features animated window entrance effect
- Provides buttons for different DNS protection types
- Validates interface input before DNS changes