comments added

This commit is contained in:
2025-04-06 16:18:20 -04:00
parent 2e511ca428
commit f027d9105d
9 changed files with 237 additions and 42 deletions

View File

@@ -1,4 +1,4 @@
from flask import current_app
from flask import current_app, request, redirect, url_for, flash
import mysql.connector
from datetime import datetime, timedelta, timezone
import requests
@@ -7,8 +7,8 @@ import os
import pytz
from db_connection import get_connection
def get_all_users():
"""Retrieve all users with associated group and vendor information."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
@@ -26,10 +26,8 @@ def get_all_users():
conn.close()
return users
def get_all_groups():
"""Retrieve all groups along with user count for each group."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
@@ -44,19 +42,8 @@ def get_all_groups():
conn.close()
return available_groups
def get_group_by_name(name):
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM groups WHERE name = %s", (name,))
group = cursor.fetchone()
cursor.close()
conn.close()
return group
def add_group(vlan_id, description):
"""Insert a new group with a specified VLAN ID and description."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("INSERT INTO groups (vlan_id, description) VALUES (%s, %s)", (vlan_id, description))
@@ -64,8 +51,8 @@ def add_group(vlan_id, description):
cursor.close()
conn.close()
def update_group_description(vlan_id, description):
"""Update the description for a given MAC address in the users table."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE groups SET description = %s WHERE vlan_id = %s", (description, vlan_id))
@@ -73,17 +60,24 @@ def update_group_description(vlan_id, description):
cursor.close()
conn.close()
def delete_group(vlan_id):
def delete_group(vlan_id, force_delete=False):
"""Delete a group, and optionally its associated users if force_delete=True."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM groups WHERE vlan_id = %s", (vlan_id,))
conn.commit()
cursor.close()
conn.close()
try:
if force_delete:
cursor.execute("DELETE FROM users WHERE vlan_id = %s", (vlan_id,))
cursor.execute("DELETE FROM groups WHERE vlan_id = %s", (vlan_id,))
conn.commit()
except mysql.connector.IntegrityError as e:
print(f"❌ Cannot delete group '{vlan_id}': it is still in use. Error: {e}")
raise
finally:
cursor.close()
conn.close()
def duplicate_group(vlan_id):
"""Create a duplicate of a group with an incremented VLAN ID."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT vlan_id, description FROM groups WHERE vlan_id = %s", (vlan_id,))
@@ -98,8 +92,8 @@ def duplicate_group(vlan_id):
cursor.close()
conn.close()
def add_user(mac_address, description, vlan_id):
"""Insert a new user with MAC address, description, and VLAN assignment."""
print(f"→ Adding to DB: mac={mac_address}, desc={description}, vlan={vlan_id}")
conn = get_connection()
cursor = conn.cursor()
@@ -111,8 +105,8 @@ def add_user(mac_address, description, vlan_id):
cursor.close()
conn.close()
def update_user_description(mac_address, description):
"""Update the description field of a user identified by MAC address."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE users SET description = %s WHERE mac_address = %s", (description, mac_address.lower()))
@@ -120,8 +114,8 @@ def update_user_description(mac_address, description):
cursor.close()
conn.close()
def update_user_vlan(mac_address, vlan_id):
"""Update the VLAN ID for a given MAC address in the users table."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE users SET vlan_id = %s WHERE mac_address = %s", (vlan_id, mac_address.lower()))
@@ -129,8 +123,8 @@ def update_user_vlan(mac_address, vlan_id):
cursor.close()
conn.close()
def delete_user(mac_address):
"""Remove a user from the database by their MAC address."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM users WHERE mac_address = %s", (mac_address.lower(),))
@@ -138,8 +132,8 @@ def delete_user(mac_address):
cursor.close()
conn.close()
def get_latest_auth_logs(reply_type=None, limit=5, time_range=None, offset=0):
"""Retrieve recent authentication logs filtered by reply type and time range."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
@@ -181,6 +175,7 @@ def get_latest_auth_logs(reply_type=None, limit=5, time_range=None, offset=0):
return logs
def count_auth_logs(reply_type=None, time_range=None):
"""Count the number of authentication logs matching a reply type and time."""
conn = get_connection()
cursor = conn.cursor()
@@ -218,6 +213,7 @@ def count_auth_logs(reply_type=None, time_range=None):
return count
def get_summary_counts():
"""Return total counts of users and groups from the database."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
@@ -232,6 +228,7 @@ def get_summary_counts():
return total_users, total_groups
def update_description(mac_address, description):
"""Update the description for a given MAC address in the users table."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
@@ -243,6 +240,7 @@ def update_description(mac_address, description):
conn.close()
def update_vlan(mac_address, vlan_id):
"""Update the VLAN ID for a given MAC address in the users table."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
@@ -254,6 +252,7 @@ def update_vlan(mac_address, vlan_id):
conn.close()
def refresh_vendors():
"""Fetch and cache vendor info for unknown MAC prefixes using the API."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
@@ -328,6 +327,7 @@ def refresh_vendors():
conn.close()
def lookup_mac_verbose(mac):
"""Look up vendor info for a MAC with verbose output, querying API if needed."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
output = []
@@ -382,6 +382,7 @@ def lookup_mac_verbose(mac):
return "\n".join(output)
def get_user_by_mac(mac_address):
"""Retrieve a user record from the database by MAC address."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
@@ -394,6 +395,7 @@ def get_user_by_mac(mac_address):
return user
def get_known_mac_vendors():
"""Fetch all known MAC prefixes and their vendor info from the local database."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT mac_prefix, vendor_name, status FROM mac_vendors")
@@ -410,6 +412,7 @@ def get_known_mac_vendors():
}
def get_vendor_info(mac, insert_if_found=True):
"""Get vendor info for a MAC address, optionally inserting into the database."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
prefix = mac.lower().replace(":", "").replace("-", "")[:6]
@@ -522,3 +525,45 @@ def get_vendor_info(mac, insert_if_found=True):
cursor.close()
conn.close()
def delete_group_route():
"""Handle deletion of a group and optionally its users via form POST."""
vlan_id = request.form.get("group_id")
force = request.form.get("force_delete") == "true"
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM users WHERE vlan_id = %s", (vlan_id,))
user_count = cursor.fetchone()[0]
if user_count > 0 and not force:
conn.close()
flash("Group has users. Please confirm deletion or reassign users.", "error")
return redirect(url_for("group.group_list"))
try:
if force:
cursor.execute("DELETE FROM users WHERE vlan_id = %s", (vlan_id,))
cursor.execute("DELETE FROM groups WHERE vlan_id = %s", (vlan_id,))
conn.commit()
flash(f"Group {vlan_id} and associated users deleted." if force else f"Group {vlan_id} deleted.", "success")
except mysql.connector.IntegrityError as e:
flash(f"Cannot delete group {vlan_id}: it is still in use. Error: {e}", "error")
except Exception as e:
flash(f"Error deleting group: {e}", "error")
finally:
cursor.close()
conn.close()
return redirect(url_for("group.group_list"))
def get_users_by_vlan_id(vlan_id):
"""Fetch users assigned to a specific VLAN ID."""
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT mac_address, description FROM users WHERE vlan_id = %s", (vlan_id,))
users = cursor.fetchall()
cursor.close()
conn.close()
return users