From 8a752b2b923068f514465c08fb8579d11ba63774 Mon Sep 17 00:00:00 2001 From: Viajero <2737079298@qq.com> Date: Tue, 14 Oct 2025 18:00:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E2=80=9C=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BC=96=E8=BE=91=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ESConnect.py | 1 + app.py | 397 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 395 insertions(+), 3 deletions(-) diff --git a/ESConnect.py b/ESConnect.py index bebfc5e..38303cd 100644 --- a/ESConnect.py +++ b/ESConnect.py @@ -25,6 +25,7 @@ def create_index_with_mapping(): "mappings": { "properties": { "writer_id":{"type": "text"}, + "data": { "type": "text", # 存储转换后的字符串,支持分词搜索 "analyzer": "ik_max_word", diff --git a/app.py b/app.py index d72d18d..e386623 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,13 @@ import base64 -from flask import Flask, request, render_template, redirect, url_for, jsonify +from flask import Flask, request, render_template, redirect, url_for, jsonify, session, flash, send_from_directory +from werkzeug.utils import secure_filename import os import uuid from PIL import Image import re import json +import requests +from functools import wraps from ESConnect import * from json_converter import json_to_string, string_to_json from openai import OpenAI @@ -14,7 +17,45 @@ from openai import OpenAI app = Flask(__name__) # app.config.from_object(config.Config) +# 设置会话密钥,用于加密会话数据 +app.secret_key = 'your-secret-key-change-this-in-production' # OCR和信息提取函数,使用大模型API处理图片并提取结构化信息 +# 权限装饰器 +def login_required(f): + """要求用户登录的装饰器""" + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('请先登录', 'error') + return redirect(url_for('login')) + return f(*args, **kwargs) + return decorated_function +def admin_required(f): + """要求管理员权限的装饰器""" + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('请先登录', 'error') + return redirect(url_for('login')) + if session.get('permission', 1) != 0: + flash('权限不足,需要管理员权限', 'error') + return redirect(url_for('index')) + return f(*args, **kwargs) + return decorated_function +def user_or_admin_required(f): + """要求普通用户或管理员权限的装饰器""" + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('请先登录', 'error') + return redirect(url_for('login')) + permission = session.get('permission', 1) + if permission not in [0, 1]: + flash('权限不足', 'error') + return redirect(url_for('index')) + return f(*args, **kwargs) + return decorated_function + def ocr_and_extract_info(image_path): """ 使用大模型API进行OCR识别并提取图片中的结构化信息 @@ -138,8 +179,258 @@ def ocr_and_extract_info(image_path): """ +# 登录页面路由 +@app.route('/login', methods=['GET', 'POST']) +def login(): + """ + 处理用户登录 + + GET: 显示登录页面 + POST: 处理登录表单提交 + """ + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + if not username or not password: + flash('请输入用户名和密码', 'error') + return render_template('login.html') + + # 验证用户 + user_data = verify_user(username, password) + if user_data: + # 登录成功,设置会话 + session['user_id'] = user_data['user_id'] + session['username'] = user_data['username'] + session['permission'] = user_data['premission'] + flash(f'欢迎回来,{username}!', 'success') + return redirect(url_for('index')) + else: + flash('用户名或密码错误', 'error') + return render_template('login.html') + + return render_template('login.html') + + +# 登出路由 +@app.route('/logout') +def logout(): + """ + 处理用户登出 + """ + session.clear() + flash('已成功登出', 'info') + return redirect(url_for('login')) + + +# 用户管理页面路由 +@app.route('/user_management') +@admin_required +def user_management(): + """ + 显示用户管理页面(仅管理员可访问) + """ + users = get_all_users() + return render_template('user_management.html', users=users) + + +# 注册新用户路由 +@app.route('/register', methods=['GET', 'POST']) +@admin_required +def register(): + """ + 注册新用户(仅管理员可访问) + + GET: 显示注册页面 + POST: 处理注册表单提交 + """ + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + confirm_password = request.form.get('confirm_password') + permission = int(request.form.get('permission', 1)) + + # 验证输入 + if not username or not password: + flash('请输入用户名和密码', 'error') + return render_template('register.html') + + if password != confirm_password: + flash('两次输入的密码不一致', 'error') + return render_template('register.html') + + if len(password) < 6: + flash('密码长度至少6位', 'error') + return render_template('register.html') + + # 检查用户名是否已存在 + existing_user = get_user_by_username(username) + if existing_user: + flash('用户名已存在', 'error') + return render_template('register.html') + + # 创建新用户 + success = create_user(username, password, permission) + if success: + flash(f'用户 {username} 创建成功', 'success') + return redirect(url_for('user_management')) + else: + flash('创建用户失败', 'error') + return render_template('register.html') + + return render_template('register.html') + + +# 修改用户密码路由 +@app.route('/change_password/', methods=['POST']) +@admin_required +def change_password(username): + """ + 修改用户密码(仅管理员可访问) + """ + new_password = request.form.get('new_password') + confirm_password = request.form.get('confirm_password') + + if not new_password or not confirm_password: + flash('请输入新密码', 'error') + return redirect(url_for('user_management')) + + if new_password != confirm_password: + flash('两次输入的密码不一致', 'error') + return redirect(url_for('user_management')) + + if len(new_password) < 6: + flash('密码长度至少6位', 'error') + return redirect(url_for('user_management')) + + success = update_user_password(username, new_password) + if success: + flash(f'用户 {username} 密码修改成功', 'success') + else: + flash(f'修改用户 {username} 密码失败', 'error') + + return redirect(url_for('user_management')) + + +# 修改用户权限路由 +@app.route('/change_permission/', methods=['POST']) +@admin_required +def change_permission(username): + """ + 修改用户权限(仅管理员可访问) + """ + new_permission = int(request.form.get('permission', 1)) + + success = update_user_permission(username, new_permission) + if success: + flash(f'用户 {username} 权限修改成功', 'success') + else: + flash(f'修改用户 {username} 权限失败', 'error') + + return redirect(url_for('user_management')) + + +# 删除用户路由 +@app.route('/delete_user/', methods=['POST']) +@admin_required +def delete_user_route(username): + """ + 删除用户(仅管理员可访问) + """ + success = delete_user(username) + if success: + flash(f'用户 {username} 删除成功', 'success') + else: + flash(f'删除用户 {username} 失败', 'error') + + return redirect(url_for('user_management')) + + +# 个人设置页面路由 +@app.route('/profile') +@login_required +def profile(): + """ + 显示个人设置页面 + """ + return render_template('profile.html') + + +# 修改个人密码路由 +@app.route('/change_own_password', methods=['POST']) +@login_required +def change_own_password(): + """ + 用户修改自己的密码 + """ + old_password = request.form.get('old_password') + new_password = request.form.get('new_password') + confirm_password = request.form.get('confirm_password') + + # 验证输入 + if not old_password or not new_password or not confirm_password: + flash('请填写所有密码字段', 'error') + return redirect(url_for('profile')) + + if new_password != confirm_password: + flash('两次输入的新密码不一致', 'error') + return redirect(url_for('profile')) + + if len(new_password) < 6: + flash('新密码长度至少6位', 'error') + return redirect(url_for('profile')) + + # 调用修改密码函数 + success = update_user_own_password(session['user_id'], old_password, new_password) + if success: + flash('密码修改成功', 'success') + else: + flash('密码修改失败,请检查旧密码是否正确', 'error') + + return redirect(url_for('profile')) + + +# 个人数据页面路由 +@app.route('/my_data') +@login_required +def my_data(): + """ + 显示用户自己的数据 + """ + user_id = session['user_id'] + keyword = request.args.get('keyword', '') + + # 查询用户自己的数据 + if keyword: + data = search_data_by_user(user_id, keyword) + else: + data = search_data_by_user(user_id) + + # 将data字段从字符串转换回JSON格式以便显示 + processed_data = [] + for item in data: + if 'data' in item and item['data']: + try: + # 将data字段的字符串转换回JSON + original_data = string_to_json(item['data']) + # 合并原始数据和其他字段 + display_item = { + '_id': item['_id'], + 'image': item.get('image', ''), + **original_data # 展开原始数据字段 + } + processed_data.append(display_item) + except Exception as e: + # 如果转换失败,保持原始格式 + processed_data.append(item) + else: + processed_data.append(item) + + return render_template('my_data.html', data=processed_data, keyword=keyword) + # 首页路由 @app.route('/') +@login_required def index(): """ 渲染首页模板 @@ -151,6 +442,7 @@ def index(): # 图片上传路由 @app.route('/upload', methods=['POST']) +@user_or_admin_required def upload_image(): """ 处理图片上传请求,调用OCR识别但不存储结果 @@ -234,6 +526,7 @@ def confirm_data(): # 搜索路由 @app.route('/search') +@user_or_admin_required def search(): """ 处理搜索请求,从Elasticsearch中检索匹配的数据 @@ -273,6 +566,7 @@ def search(): # 结果页面路由 @app.route('/results') +@user_or_admin_required def results_page(): """ 渲染搜索结果页面 @@ -284,6 +578,7 @@ def results_page(): # 显示所有数据路由 @app.route('/all') +@admin_required def show_all(): """ 获取所有数据并渲染到页面 @@ -331,22 +626,118 @@ def serve_image(filename): # 删除数据路由 @app.route('/delete/', methods=['POST']) +@login_required def delete_entry(doc_id): """ 根据文档ID删除数据 - + 参数: doc_id (str): 要删除的文档ID - + 返回: 重定向到所有数据页面或错误信息 """ + user_id = session['user_id'] + user_permission = session.get('permission', 1) + + # 管理员可以删除所有数据,普通用户只能删除自己的数据 + if user_permission == 0: # 管理员 + success = delete_by_id(doc_id) + redirect_url = 'show_all' + else: # 普通用户 + success = delete_data_by_id(doc_id, user_id) + redirect_url = 'my_data' + + if success: + return redirect(url_for(redirect_url)) if delete_by_id(doc_id): return redirect(url_for('show_all')) else: return "删除失败", 500 +@app.route('/edit/', methods=['GET', 'POST']) +@login_required +def edit_entry(doc_id): + """ + 编辑数据条目(用户只能编辑自己的数据) + """ + if request.method == 'GET': + # 获取要编辑的数据 + try: + # 先获取文档检查权限 + response = requests.get( + f"{ES_URL}/{data_index_name}/_doc/{doc_id}", + auth=AUTH + ) + response.raise_for_status() + doc = response.json() + + if not doc.get("found"): + flash('数据不存在', 'error') + return redirect(url_for('my_data')) + + # 检查权限 + user_id = session['user_id'] + user_permission = session.get('permission', 1) + doc_user_id = doc["_source"].get("user_id") + + # 管理员可以编辑所有数据,普通用户只能编辑自己的数据 + if user_permission != 0 and doc_user_id != user_id: + flash('您无权编辑此数据', 'error') + return redirect(url_for('my_data')) + + # 解析数据 + data_str = doc["_source"].get("data", "{}") + original_data = string_to_json(data_str) + + edit_data = { + '_id': doc_id, + 'image': doc["_source"].get('image', ''), + **original_data + } + + return render_template('edit.html', data=edit_data) + + except Exception as e: + flash('获取数据失败', 'error') + return redirect(url_for('my_data')) + + else: # POST 请求 - 保存编辑 + try: + # 获取编辑后的数据 + edited_data = {} + for key, value in request.form.items(): + if key != '_id' and key != 'image': + edited_data[key] = value + + # 转换为字符串格式 + data_string = json_to_string(edited_data) + + # 构造更新数据 + updated_data = { + "data": data_string, + "image": request.form.get('image', ''), + "user_id": session['user_id'] + } + + # 更新数据 + success = update_data_by_id(doc_id, updated_data, session['user_id']) + + if success: + flash('数据更新成功', 'success') + else: + flash('数据更新失败', 'error') + + # 根据用户权限重定向 + if session.get('permission', 1) == 0: + return redirect(url_for('show_all')) + else: + return redirect(url_for('my_data')) + + except Exception as e: + flash('保存数据失败', 'error') + return redirect(url_for('my_data')) # 主程序入口 if __name__ == '__main__':