LOTS of changes

This commit is contained in:
2025-04-01 10:12:38 -04:00
parent 519aabc0a6
commit 173c8c2c99
27 changed files with 1548 additions and 1684 deletions

3
app/views/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .index_views import index # not index_views
from .user_views import user
from .group_views import group

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

201
app/views/group_views.py Normal file
View File

@@ -0,0 +1,201 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from database import get_db
import mysql.connector
group = Blueprint('group', __name__)
@group.route('/groups')
def groups():
db = get_db()
if db:
cursor = db.cursor()
try:
cursor.execute("SELECT DISTINCT groupname FROM radgroupcheck")
group_names = [row[0] for row in cursor.fetchall()]
grouped_results = {}
for groupname in group_names:
cursor.execute("SELECT id, attribute, op, value FROM radgroupreply WHERE groupname = %s", (groupname,))
attributes = cursor.fetchall()
grouped_results[groupname] = [
{'id': row[0], 'attribute': row[1], 'op': row[2], 'value': row[3]}
for row in attributes
]
cursor.close()
db.close()
return render_template('group_list_nested.html', grouped_results=grouped_results)
except mysql.connector.Error as err:
print(f"Database Error: {err}")
cursor.close()
db.close()
return render_template('group_list_nested.html', grouped_results={})
return "Database Connection Failed"
@group.route('/save_group', methods=['POST'])
def save_group():
data = request.get_json()
groupname = data.get('groupname')
attributes = data.get('attributes')
if not groupname or not attributes:
return jsonify({'error': 'Group name and attributes are required'}), 400
db = get_db()
cursor = db.cursor()
try:
# Prevent duplicates
cursor.execute("SELECT 1 FROM radgroupcheck WHERE groupname = %s", (groupname,))
if cursor.fetchone():
return jsonify({'error': f'Group name "{groupname}" already exists'}), 400
# Insert baseline group rule
cursor.execute("""
INSERT INTO radgroupcheck (groupname, attribute, op, value)
VALUES (%s, 'Auth-Type', ':=', 'Accept')
""", (groupname,))
# Insert attributes
for attr in attributes:
cursor.execute("""
INSERT INTO radgroupreply (groupname, attribute, op, value)
VALUES (%s, %s, %s, %s)
""", (groupname, attr['attribute'], attr['op'], attr['value']))
db.commit()
cursor.close()
db.close()
return jsonify({'success': True})
except Exception as e:
db.rollback()
cursor.close()
db.close()
return jsonify({'error': str(e)}), 500
@group.route('/update_group_name', methods=['POST'])
def update_group_name():
old_groupname = request.form.get('oldGroupName')
new_groupname = request.form.get('newGroupName')
if not old_groupname or not new_groupname:
return jsonify({'error': 'Both old and new group names are required'}), 400
db = get_db()
cursor = db.cursor()
try:
cursor.execute("UPDATE radgroupcheck SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
cursor.execute("UPDATE radgroupreply SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
cursor.execute("UPDATE radusergroup SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
db.commit()
cursor.close()
db.close()
return jsonify({'success': True}), 200
except Exception as e:
db.rollback()
cursor.close()
db.close()
return jsonify({'error': str(e)}), 500
@group.route('/update_attribute', methods=['POST'])
def update_attribute():
attribute_id = request.form.get('attributeId')
attribute = request.form.get('attribute')
op = request.form.get('op')
value = request.form.get('value')
if not attribute_id or not attribute or not op or not value:
return jsonify({'error': 'All fields are required'}), 400
db = get_db()
cursor = db.cursor()
try:
cursor.execute("""
UPDATE radgroupreply
SET attribute=%s, op=%s, value=%s
WHERE id=%s
""", (attribute, op, value, attribute_id))
db.commit()
cursor.close()
db.close()
return jsonify({'success': True}), 200
except Exception as e:
db.rollback()
cursor.close()
db.close()
return jsonify({'error': str(e)}), 500
@group.route('/delete_group_rows/<groupname>')
def delete_group_rows(groupname):
db = get_db()
if db:
cursor = db.cursor()
try:
cursor.execute("DELETE FROM radgroupreply WHERE groupname = %s", (groupname,))
cursor.execute("DELETE FROM radusergroup WHERE groupname = %s", (groupname,))
cursor.execute("DELETE FROM radgroupcheck WHERE groupname = %s", (groupname,))
db.commit()
cursor.close()
db.close()
return redirect(url_for('group.groups'))
except mysql.connector.Error as err:
db.rollback()
cursor.close()
db.close()
return redirect(url_for('group.groups'))
return "Database Connection Failed"
@group.route('/delete_group/<int:group_id>')
def delete_group(group_id):
db = get_db()
if db:
cursor = db.cursor()
try:
cursor.execute("DELETE FROM radgroupreply WHERE id = %s", (group_id,))
cursor.execute("DELETE FROM radgroupcheck WHERE id = %s", (group_id,))
db.commit()
cursor.close()
db.close()
return redirect(url_for('group.groups'))
except mysql.connector.Error as err:
db.rollback()
cursor.close()
db.close()
return redirect(url_for('group.groups'))
return "Database Connection Failed"
@group.route('/duplicate_group', methods=['POST'])
def duplicate_group():
groupname = request.form.get('groupname')
if not groupname:
return jsonify({'error': 'Group name is required'}), 400
db = get_db()
cursor = db.cursor()
try:
cursor.execute("SELECT attribute, op, value FROM radgroupreply WHERE groupname = %s", (groupname,))
attributes = cursor.fetchall()
if not attributes:
return jsonify({'error': f'Group "{groupname}" not found or has no attributes'}), 404
new_groupname = f"Copy of {groupname}"
count = 1
while True:
cursor.execute("SELECT 1 FROM radgroupcheck WHERE groupname = %s", (new_groupname,))
if not cursor.fetchone():
break
count += 1
new_groupname = f"Copy of {groupname} ({count})"
attr_list = [{'attribute': row[0], 'op': row[1], 'value': row[2]} for row in attributes]
cursor.close()
db.close()
return jsonify({'new_groupname': new_groupname, 'attributes': attr_list})
except Exception as e:
cursor.close()
db.close()
return jsonify({'error': str(e)}), 500

187
app/views/index_views.py Normal file
View File

@@ -0,0 +1,187 @@
from flask import Blueprint, render_template, request, jsonify
from database import get_db
from datetime import datetime
import requests
index = Blueprint('index', __name__)
OUI_API_URL = 'https://api.maclookup.app/v2/macs/{}'
def time_ago(dt):
now = datetime.now()
diff = now - dt
seconds = int(diff.total_seconds())
if seconds < 60:
return f"{seconds}s ago"
elif seconds < 3600:
return f"{seconds//60}m{seconds%60}s ago"
elif seconds < 86400:
return f"{seconds//3600}h{(seconds%3600)//60}m ago"
else:
return f"{seconds//86400}d{(seconds%86400)//3600}h ago"
def lookup_vendor(mac):
prefix = mac.replace(":", "").replace("-", "").upper()[:6]
db = get_db()
cursor = db.cursor(dictionary=True)
# Try local DB first
cursor.execute("SELECT vendor_name FROM mac_vendor_cache WHERE mac_prefix = %s", (prefix,))
result = cursor.fetchone()
if result and result['vendor_name'] != "Unknown Vendor":
return {"source": "local", "prefix": prefix, "vendor": result['vendor_name']}
# Try API fallback
try:
api_url = OUI_API_URL.format(mac)
r = requests.get(api_url, timeout=3)
if r.status_code == 200:
data = r.json()
vendor = data.get("company", "Unknown Vendor")
# Save to DB
cursor.execute("""
INSERT INTO mac_vendor_cache (mac_prefix, vendor_name, last_updated)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE vendor_name = VALUES(vendor_name), last_updated = NOW()
""", (prefix, vendor))
db.commit()
return {"source": "api", "prefix": prefix, "vendor": vendor, "raw": data}
else:
return {"source": "api", "prefix": prefix, "error": f"API returned status {r.status_code}", "raw": r.text}
except Exception as e:
return {"source": "api", "prefix": prefix, "error": str(e)}
finally:
cursor.close()
@index.route('/')
def homepage():
db = get_db()
latest_accept = []
latest_reject = []
total_users = 0
total_groups = 0
if db:
cursor = db.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) AS count FROM radcheck")
total_users = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(DISTINCT groupname) AS count FROM radgroupcheck")
total_groups = cursor.fetchone()['count']
cursor.execute("""
SELECT p.username, d.description, p.reply, p.authdate
FROM radpostauth p
LEFT JOIN rad_description d ON p.username = d.username
WHERE p.reply = 'Access-Accept'
ORDER BY p.authdate DESC LIMIT 5
""")
latest_accept = cursor.fetchall()
for row in latest_accept:
row['ago'] = time_ago(row['authdate'])
cursor.execute("""
SELECT p.username, d.description, p.reply, p.authdate
FROM radpostauth p
LEFT JOIN rad_description d ON p.username = d.username
WHERE p.reply = 'Access-Reject'
ORDER BY p.authdate DESC LIMIT 5
""")
latest_reject = cursor.fetchall()
for row in latest_reject:
row['ago'] = time_ago(row['authdate'])
cursor.close()
db.close()
return render_template('index.html',
total_users=total_users,
total_groups=total_groups,
latest_accept=latest_accept,
latest_reject=latest_reject)
@index.route('/stats')
def stats():
db = get_db()
accept_entries = []
reject_entries = []
available_groups = []
if db:
cursor = db.cursor(dictionary=True)
# Fetch available VLANs
cursor.execute("SELECT DISTINCT groupname FROM radgroupcheck ORDER BY groupname")
available_groups = [row['groupname'] for row in cursor.fetchall()]
# Get existing users and map to group
cursor.execute("""
SELECT r.username, g.groupname
FROM radcheck r
LEFT JOIN radusergroup g ON r.username = g.username
""")
existing_user_map = {
row['username'].replace(":", "").replace("-", "").upper(): row['groupname']
for row in cursor.fetchall()
}
# Access-Reject entries
cursor.execute("""
SELECT p.username, d.description, p.reply, p.authdate
FROM radpostauth p
LEFT JOIN rad_description d ON p.username = d.username
WHERE p.reply = 'Access-Reject'
ORDER BY p.authdate DESC LIMIT 25
""")
reject_entries = cursor.fetchall()
for row in reject_entries:
normalized = row['username'].replace(":", "").replace("-", "").upper()
row['vendor'] = lookup_vendor(row['username'])['vendor']
row['ago'] = time_ago(row['authdate'])
if normalized in existing_user_map:
row['already_exists'] = True
row['existing_vlan'] = existing_user_map[normalized]
else:
row['already_exists'] = False
row['existing_vlan'] = None
print(f"⚠ Not found in radcheck: {row['username']}{normalized}")
# Access-Accept entries
cursor.execute("""
SELECT p.username, d.description, p.reply, p.authdate
FROM radpostauth p
LEFT JOIN rad_description d ON p.username = d.username
WHERE p.reply = 'Access-Accept'
ORDER BY p.authdate DESC LIMIT 25
""")
accept_entries = cursor.fetchall()
for row in accept_entries:
row['vendor'] = lookup_vendor(row['username'])['vendor']
row['ago'] = time_ago(row['authdate'])
cursor.close()
db.close()
return render_template('stats.html',
accept_entries=accept_entries,
reject_entries=reject_entries,
available_groups=available_groups)
@index.route('/lookup_mac', methods=['POST'])
def lookup_mac():
mac = request.form.get('mac', '').strip()
if not mac:
return jsonify({"error": "MAC address is required"}), 400
return jsonify(lookup_vendor(mac))

276
app/views/user_views.py Normal file
View File

@@ -0,0 +1,276 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify, flash
from database import get_db
import mysql.connector, os, time, requests
user = Blueprint('user', __name__) # ✅ Blueprint name = "user"
@user.route('/user_list')
def user_list():
db = get_db()
if db is None:
return "Database connection failed", 500
cursor = db.cursor(dictionary=True)
try:
# Get user info
cursor.execute("""
SELECT
r.username AS mac_address,
rd.description AS description,
ug.groupname AS vlan_id,
mvc.vendor_name AS vendor
FROM radcheck r
LEFT JOIN radusergroup ug ON r.username = ug.username
LEFT JOIN rad_description rd ON r.username = rd.username
LEFT JOIN mac_vendor_cache mvc ON UPPER(REPLACE(REPLACE(r.username, ':', ''), '-', '')) LIKE CONCAT(mvc.mac_prefix, '%')
""")
results = cursor.fetchall()
# Get available groups
cursor.execute("SELECT groupname FROM radgroupcheck")
groups = [{'groupname': row['groupname']} for row in cursor.fetchall()]
cursor.close()
db.close()
return render_template('user_list_inline_edit.html', results=results, groups=groups)
except mysql.connector.Error as e:
print(f"Database error: {e}")
cursor.close()
db.close()
return "Database error", 500
@user.route('/update_user', methods=['POST'])
def update_user():
mac_address = request.form['mac_address']
description = request.form['description']
vlan_id = request.form['vlan_id']
new_mac_address = request.form.get('new_mac_address')
db = get_db()
if db:
cursor = db.cursor()
try:
db.autocommit = False
if new_mac_address and new_mac_address != mac_address:
cursor.execute("""
UPDATE radcheck
SET username = %s, value = %s
WHERE username = %s
""", (new_mac_address, new_mac_address, mac_address))
cursor.execute("""
UPDATE rad_description
SET username = %s, description = %s
WHERE username = %s
""", (new_mac_address, description, mac_address))
cursor.execute("""
UPDATE radusergroup
SET username = %s, groupname = %s
WHERE username = %s
""", (new_mac_address, vlan_id, mac_address))
else:
cursor.execute("""
UPDATE rad_description
SET description = %s
WHERE username = %s
""", (description, mac_address))
cursor.execute("""
UPDATE radusergroup
SET groupname = %s
WHERE username = %s
""", (vlan_id, mac_address))
db.commit()
db.autocommit = True
cursor.close()
return "success"
except Exception as e:
db.rollback()
db.autocommit = True
cursor.close()
return str(e)
finally:
db.close()
return "Database Connection Failed"
@user.route('/delete_user/<mac_address>')
def delete_user(mac_address):
db = get_db()
if db:
cursor = db.cursor()
try:
db.autocommit = False
cursor.execute("DELETE FROM rad_description WHERE username = %s", (mac_address,))
cursor.execute("DELETE FROM radcheck WHERE username = %s", (mac_address,))
cursor.execute("DELETE FROM radusergroup WHERE username = %s", (mac_address,))
db.commit()
cursor.close()
db.close()
return redirect(url_for('user.user_list'))
except mysql.connector.Error as err:
print(f"Database Error: {err}")
db.rollback()
cursor.close()
db.close()
return redirect(url_for('user.user_list'))
return "Database Connection Failed"
@user.route('/add_user', methods=['POST'])
def add_user():
try:
data = request.get_json()
mac_address = data.get('mac_address')
description = data.get('description')
vlan_id = data.get('vlan_id')
if not mac_address:
return jsonify({'success': False, 'message': 'MAC Address is required'}), 400
db = get_db()
if db is None:
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
cursor = db.cursor()
try:
db.autocommit = False
cursor.execute("SELECT username FROM radcheck WHERE username = %s", (mac_address,))
if cursor.fetchone():
return jsonify({'success': False, 'message': 'User already exists'}), 400
cursor.execute("""
INSERT INTO radcheck (username, attribute, op, value)
VALUES (%s, 'Cleartext-Password', ':=', %s)
""", (mac_address, mac_address))
cursor.execute("""
INSERT INTO rad_description (username, description)
VALUES (%s, %s)
""", (mac_address, description))
cursor.execute("""
INSERT INTO radusergroup (username, groupname)
VALUES (%s, %s)
""", (mac_address, vlan_id))
db.commit()
db.autocommit = True
cursor.close()
db.close()
return jsonify({'success': True, 'message': 'User added successfully'})
except Exception as e:
db.rollback()
db.autocommit = True
cursor.close()
db.close()
return jsonify({'success': False, 'message': str(e)}), 500
except Exception:
return jsonify({'success': False, 'message': 'Unknown error'}), 500
@user.route('/add_from_reject', methods=['POST'])
def add_from_reject():
username = request.form.get('username')
groupname = request.form.get('groupname')
if not username or not groupname:
flash("Missing MAC address or group", "error")
return redirect(url_for('index.stats'))
db = get_db()
cursor = db.cursor()
try:
db.autocommit = False
# Check if already exists
cursor.execute("SELECT username FROM radcheck WHERE username = %s", (username,))
if cursor.fetchone():
flash(f"{username} already exists", "info")
else:
cursor.execute("""
INSERT INTO radcheck (username, attribute, op, value)
VALUES (%s, 'Cleartext-Password', ':=', %s)
""", (username, username))
cursor.execute("""
INSERT INTO rad_description (username, description)
VALUES (%s, '')
""", (username,))
cursor.execute("""
INSERT INTO radusergroup (username, groupname)
VALUES (%s, %s)
""", (username, groupname))
db.commit()
flash(f"{username} added to group {groupname}", "success")
except Exception as e:
db.rollback()
flash(f"Error: {str(e)}", "error")
finally:
db.autocommit = True
cursor.close()
db.close()
return redirect(url_for('index.stats'))
@user.route('/refresh_vendors', methods=['POST'])
def refresh_vendors():
db = get_db()
cursor = db.cursor(dictionary=True)
api_url = os.getenv('MACLOOKUP_API_URL', 'https://api.maclookup.app/v2/macs/{}').strip('"')
api_key = os.getenv('MACLOOKUP_API_KEY', '').strip('"')
limit = int(os.getenv('MACLOOKUP_RATE_LIMIT', 2))
headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}
cursor.execute("""
SELECT r.username
FROM radcheck r
LEFT JOIN mac_vendor_cache m ON UPPER(REPLACE(REPLACE(r.username, ':', ''), '-', '')) LIKE CONCAT(m.mac_prefix, '%')
WHERE m.vendor_name IS NULL OR m.vendor_name = 'Unknown Vendor'
LIMIT 5
""")
entries = cursor.fetchall()
if not entries:
cursor.close()
db.close()
return jsonify({"success": True, "updated": 0, "remaining": False})
updated = 0
for entry in entries:
mac = entry['username']
prefix = mac.replace(':', '').replace('-', '').upper()[:6]
try:
r = requests.get(api_url.format(mac), headers=headers, timeout=3)
if r.status_code == 200:
data = r.json()
vendor = data.get("company", "not found")
cursor.execute("""
INSERT INTO mac_vendor_cache (mac_prefix, vendor_name, last_updated)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE vendor_name = VALUES(vendor_name), last_updated = NOW()
""", (prefix, vendor))
db.commit()
updated += 1
except Exception as e:
print(f"Error for {mac}: {e}")
time.sleep(1 / limit)
cursor.close()
db.close()
return jsonify({"success": True, "updated": updated, "remaining": True})