getting there
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,201 +1,33 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
|
||||
from database import get_db
|
||||
import mysql.connector
|
||||
from flask import Blueprint, render_template, request, redirect, url_for
|
||||
from db_interface import get_all_groups, add_group, update_group_description, delete_group
|
||||
|
||||
group = Blueprint('group', __name__)
|
||||
group = Blueprint('group', __name__, url_prefix='/group')
|
||||
|
||||
@group.route('/groups')
|
||||
def groups():
|
||||
db = get_db()
|
||||
if db:
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
cursor.execute("SELECT DISTINCT groupname FROM radgroupcheck")
|
||||
group_names = [row[0] for row in cursor.fetchall()]
|
||||
grouped_results = {}
|
||||
|
||||
for groupname in group_names:
|
||||
cursor.execute("SELECT id, attribute, op, value FROM radgroupreply WHERE groupname = %s", (groupname,))
|
||||
attributes = cursor.fetchall()
|
||||
grouped_results[groupname] = [
|
||||
{'id': row[0], 'attribute': row[1], 'op': row[2], 'value': row[3]}
|
||||
for row in attributes
|
||||
]
|
||||
@group.route('/')
|
||||
def group_list():
|
||||
groups = get_all_groups()
|
||||
return render_template('group_list.html', groups=groups)
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
return render_template('group_list.html', grouped_results=grouped_results)
|
||||
|
||||
except mysql.connector.Error as err:
|
||||
print(f"Database Error: {err}")
|
||||
cursor.close()
|
||||
db.close()
|
||||
return render_template('group_list.html', grouped_results={})
|
||||
return "Database Connection Failed"
|
||||
@group.route('/add', methods=['POST'])
|
||||
def add_group_route():
|
||||
vlan_id = request.form['vlan_id']
|
||||
desc = request.form.get('description', '')
|
||||
add_group(vlan_id, desc)
|
||||
return redirect(url_for('group.group_list'))
|
||||
|
||||
@group.route('/save_group', methods=['POST'])
|
||||
def save_group():
|
||||
data = request.get_json()
|
||||
groupname = data.get('groupname')
|
||||
attributes = data.get('attributes')
|
||||
|
||||
if not groupname or not attributes:
|
||||
return jsonify({'error': 'Group name and attributes are required'}), 400
|
||||
@group.route('/update_description', methods=['POST'])
|
||||
def update_description_route():
|
||||
group_id = request.form['group_id']
|
||||
desc = request.form.get('description', '')
|
||||
update_group_description(group_id, desc)
|
||||
return redirect(url_for('group.group_list'))
|
||||
|
||||
db = get_db()
|
||||
cursor = db.cursor()
|
||||
|
||||
try:
|
||||
# Prevent duplicates
|
||||
cursor.execute("SELECT 1 FROM radgroupcheck WHERE groupname = %s", (groupname,))
|
||||
if cursor.fetchone():
|
||||
return jsonify({'error': f'Group name "{groupname}" already exists'}), 400
|
||||
|
||||
# Insert baseline group rule
|
||||
cursor.execute("""
|
||||
INSERT INTO radgroupcheck (groupname, attribute, op, value)
|
||||
VALUES (%s, 'Auth-Type', ':=', 'Accept')
|
||||
""", (groupname,))
|
||||
|
||||
# Insert attributes
|
||||
for attr in attributes:
|
||||
cursor.execute("""
|
||||
INSERT INTO radgroupreply (groupname, attribute, op, value)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
""", (groupname, attr['attribute'], attr['op'], attr['value']))
|
||||
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'success': True})
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@group.route('/update_group_name', methods=['POST'])
|
||||
def update_group_name():
|
||||
old_groupname = request.form.get('oldGroupName')
|
||||
new_groupname = request.form.get('newGroupName')
|
||||
|
||||
if not old_groupname or not new_groupname:
|
||||
return jsonify({'error': 'Both old and new group names are required'}), 400
|
||||
|
||||
db = get_db()
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
cursor.execute("UPDATE radgroupcheck SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
|
||||
cursor.execute("UPDATE radgroupreply SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
|
||||
cursor.execute("UPDATE radusergroup SET groupname=%s WHERE groupname=%s", (new_groupname, old_groupname))
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'success': True}), 200
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@group.route('/update_attribute', methods=['POST'])
|
||||
def update_attribute():
|
||||
attribute_id = request.form.get('attributeId')
|
||||
attribute = request.form.get('attribute')
|
||||
op = request.form.get('op')
|
||||
value = request.form.get('value')
|
||||
|
||||
if not attribute_id or not attribute or not op or not value:
|
||||
return jsonify({'error': 'All fields are required'}), 400
|
||||
|
||||
db = get_db()
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
cursor.execute("""
|
||||
UPDATE radgroupreply
|
||||
SET attribute=%s, op=%s, value=%s
|
||||
WHERE id=%s
|
||||
""", (attribute, op, value, attribute_id))
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'success': True}), 200
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@group.route('/delete_group_rows/<groupname>')
|
||||
def delete_group_rows(groupname):
|
||||
db = get_db()
|
||||
if db:
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
cursor.execute("DELETE FROM radgroupreply WHERE groupname = %s", (groupname,))
|
||||
cursor.execute("DELETE FROM radusergroup WHERE groupname = %s", (groupname,))
|
||||
cursor.execute("DELETE FROM radgroupcheck WHERE groupname = %s", (groupname,))
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('group.groups'))
|
||||
except mysql.connector.Error as err:
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('group.groups'))
|
||||
return "Database Connection Failed"
|
||||
|
||||
@group.route('/delete_group/<int:group_id>')
|
||||
def delete_group(group_id):
|
||||
db = get_db()
|
||||
if db:
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
cursor.execute("DELETE FROM radgroupreply WHERE id = %s", (group_id,))
|
||||
cursor.execute("DELETE FROM radgroupcheck WHERE id = %s", (group_id,))
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('group.groups'))
|
||||
except mysql.connector.Error as err:
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('group.groups'))
|
||||
return "Database Connection Failed"
|
||||
|
||||
@group.route('/duplicate_group', methods=['POST'])
|
||||
def duplicate_group():
|
||||
groupname = request.form.get('groupname')
|
||||
if not groupname:
|
||||
return jsonify({'error': 'Group name is required'}), 400
|
||||
|
||||
db = get_db()
|
||||
cursor = db.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute("SELECT attribute, op, value FROM radgroupreply WHERE groupname = %s", (groupname,))
|
||||
attributes = cursor.fetchall()
|
||||
if not attributes:
|
||||
return jsonify({'error': f'Group "{groupname}" not found or has no attributes'}), 404
|
||||
|
||||
new_groupname = f"Copy of {groupname}"
|
||||
count = 1
|
||||
while True:
|
||||
cursor.execute("SELECT 1 FROM radgroupcheck WHERE groupname = %s", (new_groupname,))
|
||||
if not cursor.fetchone():
|
||||
break
|
||||
count += 1
|
||||
new_groupname = f"Copy of {groupname} ({count})"
|
||||
|
||||
attr_list = [{'attribute': row[0], 'op': row[1], 'value': row[2]} for row in attributes]
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'new_groupname': new_groupname, 'attributes': attr_list})
|
||||
|
||||
except Exception as e:
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
@group.route('/delete', methods=['POST'])
|
||||
def delete_group_route():
|
||||
group_id = request.form['group_id']
|
||||
delete_group(group_id)
|
||||
return redirect(url_for('group.group_list'))
|
||||
|
||||
@@ -1,26 +1,27 @@
|
||||
from flask import Blueprint, render_template, request, jsonify, current_app
|
||||
from database import get_db
|
||||
from datetime import datetime
|
||||
import requests, pytz
|
||||
from db_interface import (
|
||||
get_connection,
|
||||
get_vendor_info,
|
||||
get_all_groups,
|
||||
get_latest_auth_logs,
|
||||
)
|
||||
import pytz
|
||||
|
||||
index = Blueprint('index', __name__)
|
||||
OUI_API_URL = 'https://api.maclookup.app/v2/macs/{}'
|
||||
|
||||
|
||||
import pytz # make sure it's imported if not already
|
||||
|
||||
def time_ago(dt):
|
||||
if not dt:
|
||||
return "n/a"
|
||||
|
||||
local_tz = current_app.config.get('TZ', pytz.utc)
|
||||
# Use configured timezone
|
||||
tz_name = current_app.config.get('APP_TIMEZONE', 'UTC')
|
||||
local_tz = pytz.timezone(tz_name)
|
||||
|
||||
# If the DB datetime is naive, assume it's already in local server time
|
||||
if dt.tzinfo is None:
|
||||
server_tz = pytz.timezone('America/Toronto') # Or your DB server's real timezone
|
||||
dt = server_tz.localize(dt)
|
||||
dt = dt.replace(tzinfo=pytz.utc)
|
||||
|
||||
# Convert to the app's configured timezone (from .env)
|
||||
dt = dt.astimezone(local_tz)
|
||||
now = datetime.now(local_tz)
|
||||
diff = now - dt
|
||||
@@ -29,91 +30,21 @@ def time_ago(dt):
|
||||
if seconds < 60:
|
||||
return f"{seconds}s ago"
|
||||
elif seconds < 3600:
|
||||
return f"{seconds//60}m{seconds%60}s ago"
|
||||
return f"{seconds // 60}m{seconds % 60}s ago"
|
||||
elif seconds < 86400:
|
||||
return f"{seconds//3600}h{(seconds%3600)//60}m ago"
|
||||
return f"{seconds // 3600}h{(seconds % 3600) // 60}m ago"
|
||||
else:
|
||||
return f"{seconds//86400}d{(seconds%86400)//3600}h ago"
|
||||
|
||||
|
||||
|
||||
def lookup_vendor(mac):
|
||||
prefix = mac.replace(":", "").replace("-", "").upper()[:6]
|
||||
db = get_db()
|
||||
cursor = db.cursor(dictionary=True)
|
||||
|
||||
# Try local DB first
|
||||
cursor.execute("SELECT vendor_name FROM mac_vendor_cache WHERE mac_prefix = %s", (prefix,))
|
||||
result = cursor.fetchone()
|
||||
|
||||
if result and result['vendor_name'] != "Unknown Vendor":
|
||||
return {"source": "local", "prefix": prefix, "vendor": result['vendor_name']}
|
||||
|
||||
# Try API fallback
|
||||
try:
|
||||
api_url = OUI_API_URL.format(mac)
|
||||
r = requests.get(api_url, timeout=3)
|
||||
if r.status_code == 200:
|
||||
data = r.json()
|
||||
vendor = data.get("company", "Unknown Vendor")
|
||||
|
||||
# Save to DB
|
||||
cursor.execute("""
|
||||
INSERT INTO mac_vendor_cache (mac_prefix, vendor_name, last_updated)
|
||||
VALUES (%s, %s, NOW())
|
||||
ON DUPLICATE KEY UPDATE vendor_name = VALUES(vendor_name), last_updated = NOW()
|
||||
""", (prefix, vendor))
|
||||
db.commit()
|
||||
return {"source": "api", "prefix": prefix, "vendor": vendor, "raw": data}
|
||||
else:
|
||||
return {"source": "api", "prefix": prefix, "error": f"API returned status {r.status_code}", "raw": r.text}
|
||||
except Exception as e:
|
||||
return {"source": "api", "prefix": prefix, "error": str(e)}
|
||||
finally:
|
||||
cursor.close()
|
||||
return f"{seconds // 86400}d{(seconds % 86400) // 3600}h ago"
|
||||
|
||||
|
||||
@index.route('/')
|
||||
def homepage():
|
||||
db = get_db()
|
||||
latest_accept = []
|
||||
latest_reject = []
|
||||
total_users = 0
|
||||
total_groups = 0
|
||||
total_users, total_groups = get_summary_counts()
|
||||
latest_accept = get_latest_auth_logs('Access-Accept', limit=5)
|
||||
latest_reject = get_latest_auth_logs('Access-Reject', limit=5)
|
||||
|
||||
if db:
|
||||
cursor = db.cursor(dictionary=True)
|
||||
|
||||
cursor.execute("SELECT COUNT(*) AS count FROM radcheck")
|
||||
total_users = cursor.fetchone()['count']
|
||||
|
||||
cursor.execute("SELECT COUNT(DISTINCT groupname) AS count FROM radgroupcheck")
|
||||
total_groups = cursor.fetchone()['count']
|
||||
|
||||
cursor.execute("""
|
||||
SELECT p.username, d.description, p.reply, p.authdate
|
||||
FROM radpostauth p
|
||||
LEFT JOIN rad_description d ON p.username = d.username
|
||||
WHERE p.reply = 'Access-Accept'
|
||||
ORDER BY p.authdate DESC LIMIT 5
|
||||
""")
|
||||
latest_accept = cursor.fetchall()
|
||||
for row in latest_accept:
|
||||
row['ago'] = time_ago(row['authdate'])
|
||||
|
||||
cursor.execute("""
|
||||
SELECT p.username, d.description, p.reply, p.authdate
|
||||
FROM radpostauth p
|
||||
LEFT JOIN rad_description d ON p.username = d.username
|
||||
WHERE p.reply = 'Access-Reject'
|
||||
ORDER BY p.authdate DESC LIMIT 5
|
||||
""")
|
||||
latest_reject = cursor.fetchall()
|
||||
for row in latest_reject:
|
||||
row['ago'] = time_ago(row['authdate'])
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
for row in latest_accept + latest_reject:
|
||||
row['ago'] = time_ago(row['timestamp'])
|
||||
|
||||
return render_template('index.html',
|
||||
total_users=total_users,
|
||||
@@ -124,66 +55,15 @@ def homepage():
|
||||
|
||||
@index.route('/stats')
|
||||
def stats():
|
||||
db = get_db()
|
||||
accept_entries = []
|
||||
reject_entries = []
|
||||
available_groups = []
|
||||
accept_entries = get_latest_auth_logs('Access-Accept', limit=25)
|
||||
reject_entries = get_latest_auth_logs('Access-Reject', limit=25)
|
||||
available_groups = get_all_groups()
|
||||
|
||||
if db:
|
||||
cursor = db.cursor(dictionary=True)
|
||||
|
||||
# Fetch available VLANs
|
||||
cursor.execute("SELECT DISTINCT groupname FROM radgroupcheck ORDER BY groupname")
|
||||
available_groups = [row['groupname'] for row in cursor.fetchall()]
|
||||
|
||||
# Get existing users and map to group
|
||||
cursor.execute("""
|
||||
SELECT r.username, g.groupname
|
||||
FROM radcheck r
|
||||
LEFT JOIN radusergroup g ON r.username = g.username
|
||||
""")
|
||||
existing_user_map = {
|
||||
row['username'].replace(":", "").replace("-", "").upper(): row['groupname']
|
||||
for row in cursor.fetchall()
|
||||
}
|
||||
|
||||
# Access-Reject entries
|
||||
cursor.execute("""
|
||||
SELECT p.username, d.description, p.reply, p.authdate
|
||||
FROM radpostauth p
|
||||
LEFT JOIN rad_description d ON p.username = d.username
|
||||
WHERE p.reply = 'Access-Reject'
|
||||
ORDER BY p.authdate DESC LIMIT 25
|
||||
""")
|
||||
reject_entries = cursor.fetchall()
|
||||
for row in reject_entries:
|
||||
normalized = row['username'].replace(":", "").replace("-", "").upper()
|
||||
row['vendor'] = lookup_vendor(row['username'])['vendor']
|
||||
row['ago'] = time_ago(row['authdate'])
|
||||
|
||||
if normalized in existing_user_map:
|
||||
row['already_exists'] = True
|
||||
row['existing_vlan'] = existing_user_map[normalized]
|
||||
else:
|
||||
row['already_exists'] = False
|
||||
row['existing_vlan'] = None
|
||||
print(f"⚠ Not found in radcheck: {row['username']} → {normalized}")
|
||||
|
||||
# Access-Accept entries
|
||||
cursor.execute("""
|
||||
SELECT p.username, d.description, p.reply, p.authdate
|
||||
FROM radpostauth p
|
||||
LEFT JOIN rad_description d ON p.username = d.username
|
||||
WHERE p.reply = 'Access-Accept'
|
||||
ORDER BY p.authdate DESC LIMIT 25
|
||||
""")
|
||||
accept_entries = cursor.fetchall()
|
||||
for row in accept_entries:
|
||||
row['vendor'] = lookup_vendor(row['username'])['vendor']
|
||||
row['ago'] = time_ago(row['authdate'])
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
for entry in accept_entries + reject_entries:
|
||||
entry['ago'] = time_ago(entry['timestamp'])
|
||||
entry['vendor'] = get_vendor_info(entry['mac_address'])
|
||||
entry['already_exists'] = entry.get('vlan_id') is not None
|
||||
entry['existing_vlan'] = entry.get('vlan_id') if entry['already_exists'] else None
|
||||
|
||||
return render_template('stats.html',
|
||||
accept_entries=accept_entries,
|
||||
@@ -191,13 +71,24 @@ def stats():
|
||||
available_groups=available_groups)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@index.route('/lookup_mac', methods=['POST'])
|
||||
def lookup_mac():
|
||||
mac = request.form.get('mac', '').strip()
|
||||
if not mac:
|
||||
return jsonify({"error": "MAC address is required"}), 400
|
||||
|
||||
return jsonify(lookup_vendor(mac))
|
||||
return jsonify(get_vendor_info(mac))
|
||||
|
||||
|
||||
def get_summary_counts():
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
cursor.execute("SELECT COUNT(*) AS count FROM users")
|
||||
total_users = cursor.fetchone()['count']
|
||||
|
||||
cursor.execute("SELECT COUNT(*) AS count FROM groups")
|
||||
total_groups = cursor.fetchone()['count']
|
||||
|
||||
cursor.close()
|
||||
return total_users, total_groups
|
||||
|
||||
@@ -1,276 +1,49 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, jsonify, flash
|
||||
from database import get_db
|
||||
import mysql.connector, os, time, requests
|
||||
from flask import Blueprint, render_template, request, redirect, url_for
|
||||
from db_interface import get_all_users, get_all_groups, add_user, update_description, update_vlan, delete_user, refresh_vendors
|
||||
|
||||
user = Blueprint('user', __name__) # ✅ Blueprint name = "user"
|
||||
user = Blueprint('user', __name__, url_prefix='/user')
|
||||
|
||||
@user.route('/user_list')
|
||||
|
||||
@user.route('/')
|
||||
def user_list():
|
||||
db = get_db()
|
||||
if db is None:
|
||||
return "Database connection failed", 500
|
||||
|
||||
cursor = db.cursor(dictionary=True)
|
||||
try:
|
||||
# Get user info
|
||||
cursor.execute("""
|
||||
SELECT
|
||||
r.username AS mac_address,
|
||||
rd.description AS description,
|
||||
ug.groupname AS vlan_id,
|
||||
mvc.vendor_name AS vendor
|
||||
FROM radcheck r
|
||||
LEFT JOIN radusergroup ug ON r.username = ug.username
|
||||
LEFT JOIN rad_description rd ON r.username = rd.username
|
||||
LEFT JOIN mac_vendor_cache mvc ON UPPER(REPLACE(REPLACE(r.username, ':', ''), '-', '')) LIKE CONCAT(mvc.mac_prefix, '%')
|
||||
""")
|
||||
results = cursor.fetchall()
|
||||
|
||||
# Get available groups
|
||||
cursor.execute("SELECT groupname FROM radgroupcheck")
|
||||
groups = [{'groupname': row['groupname']} for row in cursor.fetchall()]
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
return render_template('user_list.html', results=results, groups=groups)
|
||||
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Database error: {e}")
|
||||
cursor.close()
|
||||
db.close()
|
||||
return "Database error", 500
|
||||
users = get_all_users()
|
||||
groups = get_all_groups()
|
||||
return render_template('user_list.html', users=users, groups=groups)
|
||||
|
||||
|
||||
@user.route('/update_user', methods=['POST'])
|
||||
def update_user():
|
||||
mac_address = request.form['mac_address']
|
||||
description = request.form['description']
|
||||
vlan_id = request.form['vlan_id']
|
||||
new_mac_address = request.form.get('new_mac_address')
|
||||
|
||||
db = get_db()
|
||||
if db:
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
db.autocommit = False
|
||||
|
||||
if new_mac_address and new_mac_address != mac_address:
|
||||
cursor.execute("""
|
||||
UPDATE radcheck
|
||||
SET username = %s, value = %s
|
||||
WHERE username = %s
|
||||
""", (new_mac_address, new_mac_address, mac_address))
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE rad_description
|
||||
SET username = %s, description = %s
|
||||
WHERE username = %s
|
||||
""", (new_mac_address, description, mac_address))
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE radusergroup
|
||||
SET username = %s, groupname = %s
|
||||
WHERE username = %s
|
||||
""", (new_mac_address, vlan_id, mac_address))
|
||||
else:
|
||||
cursor.execute("""
|
||||
UPDATE rad_description
|
||||
SET description = %s
|
||||
WHERE username = %s
|
||||
""", (description, mac_address))
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE radusergroup
|
||||
SET groupname = %s
|
||||
WHERE username = %s
|
||||
""", (vlan_id, mac_address))
|
||||
|
||||
db.commit()
|
||||
db.autocommit = True
|
||||
cursor.close()
|
||||
return "success"
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
db.autocommit = True
|
||||
cursor.close()
|
||||
return str(e)
|
||||
finally:
|
||||
db.close()
|
||||
return "Database Connection Failed"
|
||||
@user.route('/add', methods=['POST'])
|
||||
def add():
|
||||
mac = request.form['mac_address']
|
||||
desc = request.form.get('description', '')
|
||||
group_id = request.form['group_id']
|
||||
add_user(mac, desc, group_id)
|
||||
return redirect(url_for('user.user_list'))
|
||||
|
||||
|
||||
@user.route('/delete_user/<mac_address>')
|
||||
def delete_user(mac_address):
|
||||
db = get_db()
|
||||
if db:
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
db.autocommit = False
|
||||
cursor.execute("DELETE FROM rad_description WHERE username = %s", (mac_address,))
|
||||
cursor.execute("DELETE FROM radcheck WHERE username = %s", (mac_address,))
|
||||
cursor.execute("DELETE FROM radusergroup WHERE username = %s", (mac_address,))
|
||||
db.commit()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('user.user_list'))
|
||||
except mysql.connector.Error as err:
|
||||
print(f"Database Error: {err}")
|
||||
db.rollback()
|
||||
cursor.close()
|
||||
db.close()
|
||||
return redirect(url_for('user.user_list'))
|
||||
return "Database Connection Failed"
|
||||
@user.route('/update_description', methods=['POST'])
|
||||
def update_description_route():
|
||||
mac = request.form['mac_address']
|
||||
desc = request.form.get('description', '')
|
||||
update_description(mac, desc)
|
||||
return redirect(url_for('user.user_list'))
|
||||
|
||||
|
||||
@user.route('/add_user', methods=['POST'])
|
||||
def add_user():
|
||||
try:
|
||||
data = request.get_json()
|
||||
mac_address = data.get('mac_address')
|
||||
description = data.get('description')
|
||||
vlan_id = data.get('vlan_id')
|
||||
@user.route('/update_vlan', methods=['POST'])
|
||||
def update_vlan_route():
|
||||
mac = request.form['mac_address']
|
||||
group_id = request.form['group_id']
|
||||
update_vlan(mac, group_id)
|
||||
return redirect(url_for('user.user_list'))
|
||||
|
||||
if not mac_address:
|
||||
return jsonify({'success': False, 'message': 'MAC Address is required'}), 400
|
||||
|
||||
db = get_db()
|
||||
if db is None:
|
||||
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
|
||||
@user.route('/delete', methods=['POST'])
|
||||
def delete():
|
||||
mac = request.form['mac_address']
|
||||
delete_user(mac)
|
||||
return redirect(url_for('user.user_list'))
|
||||
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
db.autocommit = False
|
||||
|
||||
cursor.execute("SELECT username FROM radcheck WHERE username = %s", (mac_address,))
|
||||
if cursor.fetchone():
|
||||
return jsonify({'success': False, 'message': 'User already exists'}), 400
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO radcheck (username, attribute, op, value)
|
||||
VALUES (%s, 'Cleartext-Password', ':=', %s)
|
||||
""", (mac_address, mac_address))
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO rad_description (username, description)
|
||||
VALUES (%s, %s)
|
||||
""", (mac_address, description))
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO radusergroup (username, groupname)
|
||||
VALUES (%s, %s)
|
||||
""", (mac_address, vlan_id))
|
||||
|
||||
db.commit()
|
||||
db.autocommit = True
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'success': True, 'message': 'User added successfully'})
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
db.autocommit = True
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({'success': False, 'message': str(e)}), 500
|
||||
except Exception:
|
||||
return jsonify({'success': False, 'message': 'Unknown error'}), 500
|
||||
|
||||
@user.route('/add_from_reject', methods=['POST'])
|
||||
def add_from_reject():
|
||||
username = request.form.get('username')
|
||||
groupname = request.form.get('groupname')
|
||||
|
||||
if not username or not groupname:
|
||||
flash("Missing MAC address or group", "error")
|
||||
return redirect(url_for('index.stats'))
|
||||
|
||||
db = get_db()
|
||||
cursor = db.cursor()
|
||||
try:
|
||||
db.autocommit = False
|
||||
|
||||
# Check if already exists
|
||||
cursor.execute("SELECT username FROM radcheck WHERE username = %s", (username,))
|
||||
if cursor.fetchone():
|
||||
flash(f"{username} already exists", "info")
|
||||
else:
|
||||
cursor.execute("""
|
||||
INSERT INTO radcheck (username, attribute, op, value)
|
||||
VALUES (%s, 'Cleartext-Password', ':=', %s)
|
||||
""", (username, username))
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO rad_description (username, description)
|
||||
VALUES (%s, '')
|
||||
""", (username,))
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO radusergroup (username, groupname)
|
||||
VALUES (%s, %s)
|
||||
""", (username, groupname))
|
||||
|
||||
db.commit()
|
||||
flash(f"{username} added to group {groupname}", "success")
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
flash(f"Error: {str(e)}", "error")
|
||||
finally:
|
||||
db.autocommit = True
|
||||
cursor.close()
|
||||
db.close()
|
||||
|
||||
return redirect(url_for('index.stats'))
|
||||
|
||||
@user.route('/refresh_vendors', methods=['POST'])
|
||||
def refresh_vendors():
|
||||
db = get_db()
|
||||
cursor = db.cursor(dictionary=True)
|
||||
|
||||
api_url = os.getenv('OUI_API_API_URL', 'https://api.maclookup.app/v2/macs/{}').strip('"')
|
||||
api_key = os.getenv('OUI_API_API_KEY', '').strip('"')
|
||||
limit = int(os.getenv('OUI_API_RATE_LIMIT', 2))
|
||||
headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}
|
||||
|
||||
cursor.execute("""
|
||||
SELECT r.username
|
||||
FROM radcheck r
|
||||
LEFT JOIN mac_vendor_cache m ON UPPER(REPLACE(REPLACE(r.username, ':', ''), '-', '')) LIKE CONCAT(m.mac_prefix, '%')
|
||||
WHERE m.vendor_name IS NULL OR m.vendor_name = 'Unknown Vendor'
|
||||
LIMIT 5
|
||||
""")
|
||||
entries = cursor.fetchall()
|
||||
|
||||
if not entries:
|
||||
cursor.close()
|
||||
db.close()
|
||||
return jsonify({"success": True, "updated": 0, "remaining": False})
|
||||
|
||||
updated = 0
|
||||
for entry in entries:
|
||||
mac = entry['username']
|
||||
prefix = mac.replace(':', '').replace('-', '').upper()[:6]
|
||||
|
||||
try:
|
||||
r = requests.get(api_url.format(mac), headers=headers, timeout=3)
|
||||
if r.status_code == 200:
|
||||
data = r.json()
|
||||
vendor = data.get("company", "not found")
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO mac_vendor_cache (mac_prefix, vendor_name, last_updated)
|
||||
VALUES (%s, %s, NOW())
|
||||
ON DUPLICATE KEY UPDATE vendor_name = VALUES(vendor_name), last_updated = NOW()
|
||||
""", (prefix, vendor))
|
||||
db.commit()
|
||||
updated += 1
|
||||
except Exception as e:
|
||||
print(f"Error for {mac}: {e}")
|
||||
|
||||
time.sleep(1 / limit)
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
|
||||
return jsonify({"success": True, "updated": updated, "remaining": True})
|
||||
def refresh():
|
||||
refresh_vendors()
|
||||
return {'status': 'OK'}
|
||||
|
||||
Reference in New Issue
Block a user