snauwcounter/app/routes.py
Michael Trip b56e866071
All checks were successful
Build and Push Image / build-and-push (push) Successful in 1m26s
initial commit
2026-01-09 21:58:53 +01:00

447 lines
15 KiB
Python

import json
import uuid
from datetime import datetime, timedelta, timezone
from urllib.parse import urlsplit

from flask import (
    Blueprint,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_login import current_user, login_required, login_user, logout_user
from sqlalchemy import desc, func, text
from werkzeug.security import check_password_hash

from app import db
from app.models import User, Incident
# Blueprint named 'main'; every route in this module is registered on it and
# is therefore addressed as 'main.<endpoint>' in url_for().
bp = Blueprint('main', __name__)
@bp.route('/health')
def health_check():
    """Health check endpoint for Kubernetes probes.

    Executes a trivial ``SELECT 1`` to verify that the database is reachable.

    Returns:
        A ``(response, status)`` tuple: HTTP 200 with ``status: healthy``
        when the query succeeds, otherwise HTTP 503 with
        ``status: unhealthy`` and the error text.
    """
    try:
        # Raw SQL must be wrapped in text(); SQLAlchemy 2.x rejects plain
        # strings, which would make this probe fail permanently.
        db.session.execute(text('SELECT 1'))
    except Exception as e:
        return jsonify({
            'status': 'unhealthy',
            'timestamp': datetime.utcnow().isoformat(),
            'database': 'disconnected',
            'error': str(e)
        }), 503

    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'database': 'connected'
    }), 200
@bp.route('/')
def index():
    """Serve the homepage, which hosts the incident entry form."""
    template_name = 'index.html'
    return render_template(template_name)
@bp.route('/add-incident', methods=['GET', 'POST'])
def add_incident():
    """Show the incident form (GET) or store a submitted incident (POST).

    Reads ``severity`` (required, 1-5), ``notes`` (optional) and
    ``incident_time`` (optional) from the submitted form. The incident is
    linked to the logged-in user, or to the anonymous session id when nobody
    is logged in.

    Returns:
        A redirect to the statistics page on success, a redirect back to this
        form when the severity is missing or out of range, or the rendered
        form (after flashing an error) when the input could not be processed.
    """
    if request.method != 'POST':
        return render_template('add_incident.html')

    try:
        severity_str = request.form.get('severity')
        notes = request.form.get('notes', '').strip()
        incident_time = request.form.get('incident_time')
        current_app.logger.debug(
            'add_incident - severity: %s, time: %s', severity_str, incident_time
        )

        # Validation
        if not severity_str:
            flash('Selecteer een Snauw-index', 'error')
            return redirect(url_for('main.add_incident'))
        severity = int(severity_str)
        if not (1 <= severity <= 5):
            flash('Snauw-index moet tussen 1 en 5 zijn', 'error')
            return redirect(url_for('main.add_incident'))

        # Parse the timestamp; an unparsable value falls back to "now"
        # (best-effort, the incident itself is still worth recording).
        timestamp = None
        if incident_time:
            try:
                if 'T' in incident_time:
                    timestamp = datetime.fromisoformat(incident_time.replace('Z', '+00:00'))
                else:
                    timestamp = datetime.strptime(incident_time, '%Y-%m-%d %H:%M:%S')
            except ValueError as dt_error:
                current_app.logger.debug('add_incident - datetime parse error: %s', dt_error)
        if timestamp is None:
            timestamp = datetime.utcnow()
        elif timestamp.tzinfo is not None:
            # The rest of this module works with naive UTC datetimes
            # (datetime.utcnow()); convert so stored values stay comparable.
            timestamp = timestamp.astimezone(timezone.utc).replace(tzinfo=None)

        # Create the incident, owned either by the user or by the session.
        is_logged_in = current_user.is_authenticated
        incident = Incident(
            user_id=current_user.id if is_logged_in else None,
            timestamp=timestamp,
            severity=severity,
            notes=notes if notes else None,
            session_id=None if is_logged_in else get_or_create_session_id()
        )
        db.session.add(incident)
        db.session.commit()
        flash('✅ Incident succesvol opgeslagen!', 'success')
        return redirect(url_for('main.statistics'))
    except ValueError as e:
        current_app.logger.debug('add_incident - invalid input: %s', e)
        flash('Ongeldige invoer. Controleer je gegevens.', 'error')
    except Exception:
        db.session.rollback()
        # Record the traceback instead of silently swallowing the failure.
        current_app.logger.exception('add_incident - failed to store incident')
        flash('Er is een fout opgetreden bij het opslaan.', 'error')
    return render_template('add_incident.html')
@bp.route('/api/incidents', methods=['POST'])
def api_add_incident():
    """JSON API endpoint for adding an incident (used by AJAX calls).

    Expects a JSON object with ``severity`` (1-5), optional ``notes`` and an
    optional ISO-8601 ``timestamp``.

    Returns:
        A JSON body with ``success``, ``message`` and the stored ``incident``
        on success; HTTP 400 with an ``error`` message when the payload is
        missing or malformed; HTTP 500 when storing the incident fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so bad requests are answered with 400 rather than 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Ongeldige invoer'}), 400

    try:
        severity = int(data.get('severity'))
        notes = (data.get('notes') or '').strip()
        timestamp_str = data.get('timestamp')
        if timestamp_str:
            timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
            if timestamp.tzinfo is not None:
                # Store naive UTC, matching datetime.utcnow() used elsewhere.
                timestamp = timestamp.astimezone(timezone.utc).replace(tzinfo=None)
        else:
            timestamp = datetime.utcnow()
    except (ValueError, TypeError, AttributeError):
        # Null/non-numeric severity or non-string notes/timestamp.
        return jsonify({'error': 'Ongeldige invoer'}), 400

    if not (1 <= severity <= 5):
        return jsonify({'error': 'Snauw-index moet tussen 1 en 5 zijn'}), 400

    try:
        is_logged_in = current_user.is_authenticated
        incident = Incident(
            user_id=current_user.id if is_logged_in else None,
            timestamp=timestamp,
            severity=severity,
            notes=notes if notes else None,
            session_id=None if is_logged_in else get_or_create_session_id()
        )
        db.session.add(incident)
        db.session.commit()
        return jsonify({
            'success': True,
            'message': 'Incident succesvol opgeslagen!',
            'incident': incident.to_dict()
        })
    except Exception:
        db.session.rollback()
        current_app.logger.exception('api_add_incident - failed to store incident')
        return jsonify({'error': 'Er is een fout opgetreden'}), 500
@bp.route('/statistics')
def statistics():
    """Render the statistics page for the current visitor.

    Logged-in users see the incidents tied to their account; anonymous
    visitors see the incidents tied to their session id. Incidents are
    listed newest first.
    """
    if current_user.is_authenticated:
        owner_filter = {'user_id': current_user.id}
    else:
        # Anonymous visitors are identified by their session id.
        owner_filter = {'session_id': get_or_create_session_id()}

    newest_first = desc(Incident.timestamp)
    incidents = Incident.query.filter_by(**owner_filter).order_by(newest_first).all()

    return render_template(
        'statistics.html',
        incidents=incidents,
        stats=calculate_statistics(incidents),
    )
@bp.route('/api/statistics')
def api_statistics():
    """JSON API endpoint returning statistics and incidents (for AJAX updates)."""
    if current_user.is_authenticated:
        owner_filter = {'user_id': current_user.id}
    else:
        owner_filter = {'session_id': get_or_create_session_id()}

    records = Incident.query.filter_by(**owner_filter).all()
    summary = calculate_statistics(records)
    serialized = [record.to_dict() for record in records]
    return jsonify({'stats': summary, 'incidents': serialized})
@bp.route('/register', methods=['GET', 'POST'])
def register():
    """User registration.

    GET renders the registration form. POST validates the submitted
    ``email``, ``password`` and ``password_confirm`` fields, creates the
    account, moves any incidents of the anonymous session to the new account
    and logs the user in.

    Returns:
        A redirect to the homepage when the visitor is already logged in or
        the account was created; otherwise the rendered registration form
        (with a flashed error message when validation or saving failed).
    """
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))
    if request.method != 'POST':
        return render_template('register.html')

    email = request.form.get('email', '').strip().lower()
    password = request.form.get('password')
    password_confirm = request.form.get('password_confirm')
    # Deliberately log no credentials or personal data here.
    current_app.logger.debug('register - registration attempt received')

    # Validation
    if not email or '@' not in email or '.' not in email:
        flash('Voer een geldig e-mailadres in', 'error')
        return render_template('register.html')
    if not password or len(password) < 6:
        flash('Wachtwoord moet minstens 6 karakters lang zijn', 'error')
        return render_template('register.html')
    if password != password_confirm:
        flash('Wachtwoorden komen niet overeen', 'error')
        return render_template('register.html')

    # Reject duplicate accounts.
    if User.query.filter_by(email=email).first():
        flash('Er bestaat al een account met dit e-mailadres', 'error')
        return render_template('register.html')

    try:
        user = User(email=email)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
    except Exception:
        db.session.rollback()
        current_app.logger.exception('register - failed to create account')
        flash('Er is een fout opgetreden bij het maken van je account', 'error')
        return render_template('register.html')

    # The account is committed at this point. A failing migration of the
    # anonymous data must not be reported as a failed registration.
    try:
        migrate_anonymous_data(user)
    except Exception:
        db.session.rollback()
        current_app.logger.exception('register - failed to migrate anonymous data')

    login_user(user)
    flash('✅ Account succesvol aangemaakt en ingelogd!', 'success')
    return redirect(url_for('main.index'))
def _is_safe_redirect_target(target):
    """Return True when *target* is a same-site relative path safe to redirect to."""
    if not target or not target.startswith('/'):
        return False
    # Browsers treat backslashes like slashes and drop control characters, so
    # values such as '/\\evil.com' could turn into a protocol-relative URL.
    if '\\' in target or any(ord(ch) < 32 for ch in target):
        return False
    parts = urlsplit(target)
    # '//evil.com' parses with a netloc; absolute URLs parse with a scheme.
    return not parts.scheme and not parts.netloc


@bp.route('/login', methods=['GET', 'POST'])
def login():
    """User login.

    GET renders the login form. POST checks the submitted ``email`` and
    ``password``; on success the user is logged in, incidents of the
    anonymous session are moved to the account, and the user is redirected
    to the ``next`` query parameter when it is a same-site relative path
    (otherwise to the homepage).

    Returns:
        A redirect on success or when already logged in; otherwise the
        rendered login form (with a flashed error on invalid input).
    """
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))

    if request.method == 'POST':
        email = request.form.get('email', '').strip().lower()
        password = request.form.get('password')
        if not email or not password:
            flash('Voer e-mail en wachtwoord in', 'error')
            return render_template('login.html')

        user = User.query.filter_by(email=email).first()
        if user and user.check_password(password):
            login_user(user)
            # Try to move data of the anonymous session to this account.
            migrate_anonymous_data(user)
            flash('✅ Succesvol ingelogd!', 'success')
            next_page = request.args.get('next')
            # 'next' is attacker-controllable; never redirect off-site.
            if _is_safe_redirect_target(next_page):
                return redirect(next_page)
            return redirect(url_for('main.index'))

        flash('Ongeldig e-mailadres of wachtwoord', 'error')

    return render_template('login.html')
@bp.route('/logout')
@login_required
def logout():
    """Log the current user out and send them back to the homepage."""
    logout_user()
    flash('Je bent uitgelogd', 'info')
    home_url = url_for('main.index')
    return redirect(home_url)
@bp.route('/migrate-data', methods=['POST'])
def migrate_data():
    """Migrate client-supplied anonymous incidents to the logged-in user.

    Expects a JSON object with an ``incidents`` list; each entry needs an
    ISO-8601 ``timestamp`` and a ``severity`` (1-5), plus optional ``notes``.
    Entries whose timestamp and severity already exist for the user are
    skipped.

    Returns:
        JSON with ``success``, ``migrated_count`` and ``message`` on success;
        HTTP 401 when not logged in; HTTP 400 when the payload is malformed;
        HTTP 500 when storing fails.
    """
    if not current_user.is_authenticated:
        return jsonify({'error': 'Inloggen vereist'}), 401

    # silent=True yields None for a missing/invalid JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Ongeldige invoer'}), 400

    # Validate the whole payload first so nothing is stored for a bad request.
    try:
        parsed = []
        for incident_data in data.get('incidents') or []:
            timestamp = datetime.fromisoformat(
                incident_data['timestamp'].replace('Z', '+00:00')
            )
            if timestamp.tzinfo is not None:
                # Naive UTC matches what the other endpoints store, which the
                # duplicate check below compares against.
                timestamp = timestamp.astimezone(timezone.utc).replace(tzinfo=None)
            severity = int(incident_data['severity'])
            if not (1 <= severity <= 5):
                raise ValueError('severity out of range')
            parsed.append((timestamp, severity, incident_data.get('notes')))
    except (KeyError, TypeError, ValueError, AttributeError):
        return jsonify({'error': 'Ongeldige invoer'}), 400

    try:
        migrated_count = 0
        for timestamp, severity, notes in parsed:
            # Skip incidents that already exist (same timestamp and severity).
            existing = Incident.query.filter_by(
                user_id=current_user.id,
                timestamp=timestamp,
                severity=severity
            ).first()
            if not existing:
                db.session.add(Incident(
                    user_id=current_user.id,
                    timestamp=timestamp,
                    severity=severity,
                    notes=notes
                ))
                migrated_count += 1
        db.session.commit()
        return jsonify({
            'success': True,
            'migrated_count': migrated_count,
            'message': f'{migrated_count} incidenten succesvol gemigreerd!'
        })
    except Exception:
        db.session.rollback()
        current_app.logger.exception('migrate_data - failed to migrate incidents')
        return jsonify({'error': 'Fout bij data migratie'}), 500
@bp.route('/export-data')
def export_data():
    """GDPR data export.

    Returns a JSON document with every incident of the current visitor:
    account details plus incidents for a logged-in user, or the session id
    plus incidents (and a note) for an anonymous visitor.
    """
    if current_user.is_authenticated:
        records = Incident.query.filter_by(user_id=current_user.id).all()
        payload = {
            'user': {
                'email': current_user.email,
                'created_at': current_user.created_at.isoformat()
            }
        }
        trailer = {}
    else:
        anon_id = get_or_create_session_id()
        records = Incident.query.filter_by(session_id=anon_id).all()
        payload = {'session_id': anon_id}
        trailer = {'note': 'Anonieme gebruiker data'}

    # Fields shared by both variants, inserted in the same key order as before.
    payload.update(
        incidents=[record.to_dict() for record in records],
        export_date=datetime.utcnow().isoformat(),
        total_incidents=len(records),
    )
    payload.update(trailer)
    return jsonify(payload)
@bp.route('/delete-account', methods=['POST'])
@login_required
def delete_account():
    """GDPR account deletion.

    Requires the form field ``confirm`` to equal ``DELETE``. Removes every
    incident of the user and the user record itself, then logs the user out.

    Returns:
        A redirect to the homepage after deletion, or a redirect to the
        statistics page when confirmation is missing or deletion fails.
    """
    if request.form.get('confirm') != 'DELETE':
        flash('Type "DELETE" om je account te bevestigen', 'error')
        return redirect(url_for('main.statistics'))

    # Hand the ORM the real user object rather than the current_user proxy.
    user = current_user._get_current_object()
    user_email = user.email
    try:
        # Delete the user's incidents first, then the user itself.
        Incident.query.filter_by(user_id=user.id).delete()
        db.session.delete(user)
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Record the traceback instead of silently swallowing the failure.
        current_app.logger.exception('delete_account - failed to delete account')
        flash('Fout bij het verwijderen van account', 'error')
        return redirect(url_for('main.statistics'))

    logout_user()
    flash(f'Account {user_email} en alle gegevens zijn permanent verwijderd', 'info')
    return redirect(url_for('main.index'))
@bp.route('/privacy')
def privacy():
    """Serve the privacy policy page."""
    template_name = 'privacy.html'
    return render_template(template_name)
# Helper functions
def get_or_create_session_id():
    """Return the anonymous visitor's session id, creating one when absent.

    A new id is a random UUID4 string stored under ``session['session_id']``.
    """
    try:
        return session['session_id']
    except KeyError:
        fresh_id = str(uuid.uuid4())
        session['session_id'] = fresh_id
        return fresh_id
def migrate_anonymous_data(user):
    """Reassign incidents of the current anonymous session to *user*.

    Does nothing when the session has no ``session_id`` or no incidents are
    stored for it. Otherwise each incident gets the user's id, its session id
    is cleared, the change is committed and a success message is flashed.
    """
    if 'session_id' not in session:
        return

    orphaned = Incident.query.filter_by(session_id=session['session_id']).all()
    if not orphaned:
        return

    for record in orphaned:
        record.user_id = user.id
        record.session_id = None
    db.session.commit()
    flash(f'{len(orphaned)} incidenten zijn gemigreerd naar je account!', 'success')
def calculate_statistics(incidents, now=None):
    """Compute summary statistics for a list of incidents.

    Args:
        incidents: Sequence of objects exposing ``severity`` (number) and
            ``timestamp`` (datetime). Timezone-aware timestamps are converted
            to naive UTC before comparison.
        now: Reference moment for the time windows. Defaults to the current
            UTC time; mainly useful for deterministic tests.

    Returns:
        A dict with ``total_incidents``, ``avg_severity`` (rounded to one
        decimal), ``incidents_today``, ``incidents_this_week`` (last 7 days),
        ``incidents_this_month`` (last 30 days), ``most_severe`` and
        ``trend_data``: one ``{'date', 'count', 'avg_severity'}`` entry per
        day for the last 7 days in chronological order. All values are zero
        (and ``trend_data`` empty) when there are no incidents.
    """
    if not incidents:
        return {
            'total_incidents': 0,
            'avg_severity': 0,
            'incidents_today': 0,
            'incidents_this_week': 0,
            'incidents_this_month': 0,
            'most_severe': 0,
            'trend_data': []
        }

    def _as_naive_utc(moment):
        # Mixing naive and aware datetimes raises TypeError on comparison,
        # so everything is brought to naive UTC first.
        if moment.tzinfo is not None:
            return moment.astimezone(timezone.utc).replace(tzinfo=None)
        return moment

    now = _as_naive_utc(now if now is not None else datetime.now(timezone.utc))
    today = now.date()
    week_ago = now - timedelta(days=7)
    month_ago = now - timedelta(days=30)

    severities = [inc.severity for inc in incidents]
    timestamps = [_as_naive_utc(inc.timestamp) for inc in incidents]

    # Group severities per calendar day once, instead of rescanning the whole
    # incident list for every day of the trend.
    severities_by_day = {}
    for moment, severity in zip(timestamps, severities):
        severities_by_day.setdefault(moment.date(), []).append(severity)

    # Trend for the last 7 days, oldest day first.
    trend_data = []
    for days_back in range(6, -1, -1):
        day = now - timedelta(days=days_back)
        day_severities = severities_by_day.get(day.date(), [])
        trend_data.append({
            'date': day.strftime('%m/%d'),
            'count': len(day_severities),
            'avg_severity': sum(day_severities) / len(day_severities) if day_severities else 0
        })

    return {
        'total_incidents': len(incidents),
        'avg_severity': round(sum(severities) / len(severities), 1),
        'incidents_today': len(severities_by_day.get(today, [])),
        'incidents_this_week': sum(1 for moment in timestamps if moment >= week_ago),
        'incidents_this_month': sum(1 for moment in timestamps if moment >= month_ago),
        'most_severe': max(severities),
        'trend_data': trend_data
    }