from flask import Flask, render_template, request, redirect, url_for, jsonify import mysql.connector import json app = Flask(__name__) DB_CONFIG = { 'host': '192.168.60.150', 'user': 'user_92z0Kj', 'password': '5B3UXZV8vyrB', 'database': 'radius_NIaIuT' } def get_db(): try: db = mysql.connector.connect(**DB_CONFIG) return db except mysql.connector.Error as err: print(f"Database Connection Error: {err}") return None @app.route('/', methods=['GET', 'POST']) def index(): sql_results = None sql_error = None total_users = 0 total_groups = 0 db = get_db() if db: cursor = db.cursor(dictionary=True) try: cursor.execute("SELECT COUNT(DISTINCT username) as total FROM radcheck;") total_users = cursor.fetchone()['total'] cursor.execute("SELECT COUNT(DISTINCT groupname) as total FROM radgroupreply;") total_groups = cursor.fetchone()['total'] except mysql.connector.Error as err: print(f"Error fetching counts: {err}") cursor.close() db.close() return render_template('index.html', total_users=total_users, total_groups=total_groups, sql_results=sql_results, sql_error=sql_error) @app.route('/sql', methods=['POST']) def sql(): sql_results = None sql_error = None sql_query = request.form['query'] db = get_db() if db: try: cursor = db.cursor(dictionary=True) cursor.execute(sql_query) sql_results = cursor.fetchall() cursor.close() db.close() except mysql.connector.Error as err: sql_error = str(err) except Exception as e: sql_error = str(e) total_users = 0 total_groups = 0 db = get_db() if db: cursor = db.cursor(dictionary=True) try: cursor.execute("SELECT COUNT(DISTINCT username) as total FROM radcheck;") total_users = cursor.fetchone()['total'] cursor.execute("SELECT COUNT(DISTINCT groupname) as total FROM radgroupreply;") total_groups = cursor.fetchone()['total'] except mysql.connector.Error as err: print(f"Error fetching counts: {err}") cursor.close() db.close() return render_template('index.html', total_users=total_users, total_groups=total_groups, sql_results=sql_results, sql_error=sql_error) @app.route('/user_list') def user_list(): db = get_db() if db is None: return "Database connection failed", 500 cursor = db.cursor(dictionary=True) try: cursor.execute(""" SELECT r.username AS mac_address, rd.description AS description, ug.groupname AS vlan_id FROM radcheck r LEFT JOIN radusergroup ug ON r.username = ug.username LEFT JOIN rad_description rd ON r.username = rd.username """) results = cursor.fetchall() print("Results:", results) cursor.execute("SELECT groupname FROM radgroupcheck") groups = cursor.fetchall() groups = [{'groupname': row['groupname']} for row in groups] print("Groups:", groups) cursor.close() db.close() return render_template('user_list_inline_edit.html', results=results, groups=groups) except mysql.connector.Error as e: print(f"Database error: {e}") cursor.close() db.close() return "Database error", 500 @app.route('/update_user', methods=['POST']) def update_user(): mac_address = request.form['mac_address'] description = request.form['description'] vlan_id = request.form['vlan_id'] new_mac_address = request.form.get('new_mac_address') print(f"Update request received: mac_address={mac_address}, description={description}, vlan_id={vlan_id}, new_mac_address={new_mac_address}") db = get_db() if db: cursor = db.cursor() try: db.autocommit = False if new_mac_address and new_mac_address != mac_address: print("Updating MAC address...") # Update radcheck cursor.execute(""" UPDATE radcheck SET username = %s, value = %s WHERE username = %s """, (new_mac_address, new_mac_address, mac_address)) print(f"radcheck update affected {cursor.rowcount} rows.") # Update rad_description cursor.execute(""" UPDATE rad_description SET username = %s, description = %s WHERE username = %s """, (new_mac_address, description, mac_address)) print(f"rad_description update affected {cursor.rowcount} rows.") # Update radusergroup cursor.execute(""" UPDATE radusergroup SET username = %s, groupname = %s WHERE username = %s """, (new_mac_address, vlan_id, mac_address)) print(f"radusergroup update affected {cursor.rowcount} rows.") mac_address = new_mac_address else: print("Updating description and VLAN...") # Update rad_description cursor.execute(""" UPDATE rad_description SET description = %s WHERE username = %s """, (description, mac_address)) print(f"rad_description update affected {cursor.rowcount} rows.") # Update radusergroup cursor.execute(""" UPDATE radusergroup SET groupname = %s WHERE username = %s """, (vlan_id, mac_address)) print(f"radusergroup update affected {cursor.rowcount} rows.") if cursor.rowcount > 0: print("Database rows were modified.") else: print("Database rows were not modified.") db.commit() db.autocommit = True cursor.close() print("Update successful") return "success" except mysql.connector.Error as err: db.rollback() db.autocommit = True cursor.close() print(f"Database Error: {err}") return str(err) except Exception as e: db.rollback() db.autocommit = True cursor.close() print(f"Exception: {e}") return str(e) finally: db.close() else: print("Database Connection Failed") return "Database Connection Failed" @app.route('/delete_user/') def delete_user(mac_address): db = get_db() if db: cursor = db.cursor() try: db.autocommit = False cursor.execute("DELETE FROM rad_description WHERE username = %s", (mac_address,)) cursor.execute("DELETE FROM radcheck WHERE username = %s", (mac_address,)) cursor.execute("DELETE FROM radusergroup WHERE username = %s", (mac_address,)) db.commit() db.autocommit = True cursor.close() db.close() return redirect(url_for('user_list')) except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() db.autocommit = True cursor.close() db.close() return redirect(url_for('user_list')) return "Database Connection Failed" @app.route('/groups') def groups(): db = get_db() if db: cursor = db.cursor() try: cursor.execute("SELECT DISTINCT groupname FROM radgroupcheck") group_names = [row[0] for row in cursor.fetchall()] grouped_results = {} for groupname in group_names: cursor.execute("SELECT id, attribute, op, value FROM radgroupreply WHERE groupname = %s", (groupname,)) attributes = cursor.fetchall() grouped_results[groupname] = [{'id': row[0], 'attribute': row[1], 'op': row[2], 'value': row[3]} for row in attributes] cursor.close() db.close() return render_template('group_list_nested.html', grouped_results=grouped_results) except mysql.connector.Error as err: print(f"Database Error: {err}") cursor.close() db.close() return render_template('group_list_nested.html', grouped_results={}) return "Database Connection Failed" @app.route('/edit_groupname/', methods=['GET', 'POST']) def edit_groupname(old_groupname): db = get_db() if db: cursor = db.cursor(dictionary=True) if request.method == 'POST': new_groupname = request.form['groupname'] try: db.autocommit = False cursor.execute(""" UPDATE radgroupreply SET groupname = %s WHERE groupname = %s """, (new_groupname, old_groupname)) cursor.execute(""" UPDATE radusergroup SET groupname = %s WHERE groupname = %s """, (new_groupname, old_groupname)) db.commit() db.autocommit = True cursor.close() db.close() return redirect(url_for('groups')) except mysql.connector.Error as err: db.rollback() db.autocommit = True cursor.close() db.close() return f"Database Error: {err}" else: return render_template('edit_groupname.html', old_groupname=old_groupname) return "Database Connection Failed" @app.route('/update_attribute', methods=['POST']) def update_attribute(): group_id = request.form['group_id'] attribute = request.form['attribute'] op = request.form['op'] value = request.form['value'] db = get_db() if db: cursor = db.cursor() try: db.autocommit = False cursor.execute(""" UPDATE radgroupreply SET attribute = %s, op = %s, value = %s WHERE id = %s """, (attribute, op, value, group_id)) db.commit() db.autocommit = True cursor.close() return "success" except mysql.connector.Error as err: db.rollback() db.autocommit = True cursor.close() return str(err) except Exception as e: db.rollback() db.autocommit = True cursor.close() return str(e) finally: db.close() return "Database Connection Failed" @app.route('/add_attribute', methods=['POST']) def add_attribute(): groupname = request.form['groupname'] attribute = request.form['attribute'] op = request.form['op'] value = request.form['value'] db = get_db() if db: cursor = db.cursor() try: cursor.execute(""" INSERT INTO radgroupreply (groupname, attribute, op, value) VALUES (%s, %s, %s, %s) """, (groupname, attribute, op, value)) db.commit() cursor.close() db.close() return "success" except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() cursor.close() db.close() return str(err) return "Database Connection Failed" @app.route('/edit_attribute/', methods=['GET', 'POST']) def edit_attribute(group_id): db = get_db() if db: cursor = db.cursor(dictionary=True) if request.method == 'POST': attribute = request.form['attribute'] op = request.form['op'] value = request.form['value'] try: db.autocommit = False cursor.execute(""" UPDATE radgroupreply SET attribute = %s, op = %s, value = %s WHERE id = %s """, (attribute, op, value, group_id)) db.commit() db.autocommit = True cursor.close() db.close() return redirect(url_for('groups')) except mysql.connector.Error as err: db.rollback() db.autocommit = True cursor.close() db.close() return f"Database Error: {err}" else: cursor.execute("SELECT * FROM radgroupreply WHERE id = %s", (group_id,)) attribute_data = cursor.fetchone() cursor.close() db.close() return render_template('edit_attribute.html', attribute_data=attribute_data) return "Database Connection Failed" @app.route('/add_group', methods=['POST']) def add_group(): groupname = request.form['groupname'] db = get_db() if db: cursor = db.cursor() try: cursor.execute("INSERT INTO radgroupreply (groupname, attribute, op, value) VALUES (%s, '', '', '')", (groupname,)) cursor.execute("INSERT INTO radusergroup (groupname, username) VALUES (%s, '')", (groupname,)) cursor.execute("INSERT INTO radgroupcheck (groupname, attribute, op, value) VALUES (%s, 'Auth-Type', ':=', 'Accept')", (groupname,)) db.commit() cursor.close() db.close() return "success" except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() cursor.close() db.close() return str(err) return "Database Connection Failed" @app.route('/delete_group_rows/') def delete_group_rows(groupname): db = get_db() if db: cursor = db.cursor() try: cursor.execute("DELETE FROM radgroupreply WHERE groupname = %s", (groupname,)) cursor.execute("DELETE FROM radusergroup WHERE groupname = %s", (groupname,)) cursor.execute("DELETE FROM radgroupcheck WHERE groupname = %s", (groupname,)) db.commit() cursor.close() db.close() return redirect(url_for('groups')) except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() cursor.close() db.close() return redirect(url_for('groups')) return "Database Connection Failed" @app.route('/delete_group/') def delete_group(group_id): db = get_db() if db: cursor = db.cursor() try: cursor.execute("DELETE FROM radgroupreply WHERE id = %s", (group_id,)) cursor.execute("DELETE FROM radgroupcheck WHERE id = %s", (group_id,)) db.commit() cursor.close() db.close() return redirect(url_for('groups')) except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() cursor.close() db.close() return redirect(url_for('groups')) return "Database Connection Failed" @app.route('/add_user', methods=['POST']) def add_user(): try: data = request.get_json() mac_address = data.get('mac_address') description = data.get('description') vlan_id = data.get('vlan_id') if not mac_address: return jsonify({'success': False, 'message': 'MAC Address is required'}), 400 db = get_db() if db is None: return jsonify({'success': False, 'message': 'Database connection failed'}), 500 cursor = db.cursor() try: db.autocommit = False cursor.execute("SELECT username FROM radcheck WHERE username = %s", (mac_address,)) if cursor.fetchone(): cursor.close() db.close() return jsonify({'success': False, 'message': 'User with this MAC Address already exists'}), 400 cursor.execute(""" INSERT INTO radcheck (username, attribute, op, value) VALUES (%s, 'Cleartext-Password', ':=', %s) """, (mac_address, mac_address)) cursor.execute(""" INSERT INTO rad_description (username, description) VALUES (%s, %s) """, (mac_address, description)) cursor.execute(""" INSERT INTO radusergroup (username, groupname) VALUES (%s, %s) """, (mac_address, vlan_id)) db.commit() db.autocommit = True cursor.close() db.close() return jsonify({'success': True, 'message': 'User added successfully'}) except mysql.connector.Error as err: print(f"Database Error: {err}") db.rollback() db.autocommit = True cursor.close() db.close() return jsonify({'success': False, 'message': f"Database error: {err}"}), 500 except Exception as e: print(f"Error adding user: {e}") db.rollback() db.autocommit = True cursor.close() db.close() return jsonify({'success': False, 'message': str(e)}), 500 finally: db.close() except Exception as e: return jsonify({'success': False, 'message': 'Unknown error'}), 500 if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=8080)