新增“数据编辑”

This commit is contained in:
2025-10-14 18:00:40 +08:00
parent a678adf646
commit 8a752b2b92
2 changed files with 395 additions and 3 deletions

View File

@@ -25,6 +25,7 @@ def create_index_with_mapping():
"mappings": { "mappings": {
"properties": { "properties": {
"writer_id":{"type": "text"}, "writer_id":{"type": "text"},
"data": { "data": {
"type": "text", # 存储转换后的字符串,支持分词搜索 "type": "text", # 存储转换后的字符串,支持分词搜索
"analyzer": "ik_max_word", "analyzer": "ik_max_word",

397
app.py
View File

@@ -1,10 +1,13 @@
import base64 import base64
from flask import Flask, request, render_template, redirect, url_for, jsonify from flask import Flask, request, render_template, redirect, url_for, jsonify, session, flash, send_from_directory
from werkzeug.utils import secure_filename
import os import os
import uuid import uuid
from PIL import Image from PIL import Image
import re import re
import json import json
import requests
from functools import wraps
from ESConnect import * from ESConnect import *
from json_converter import json_to_string, string_to_json from json_converter import json_to_string, string_to_json
from openai import OpenAI from openai import OpenAI
@@ -14,7 +17,45 @@ from openai import OpenAI
app = Flask(__name__) app = Flask(__name__)
# app.config.from_object(config.Config) # app.config.from_object(config.Config)
# 设置会话密钥,用于加密会话数据
app.secret_key = 'your-secret-key-change-this-in-production'
# OCR和信息提取函数使用大模型API处理图片并提取结构化信息 # OCR和信息提取函数使用大模型API处理图片并提取结构化信息
# 权限装饰器
def login_required(f):
"""要求用户登录的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""要求管理员权限的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
if session.get('permission', 1) != 0:
flash('权限不足,需要管理员权限', 'error')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
def user_or_admin_required(f):
"""要求普通用户或管理员权限的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
permission = session.get('permission', 1)
if permission not in [0, 1]:
flash('权限不足', 'error')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
def ocr_and_extract_info(image_path): def ocr_and_extract_info(image_path):
""" """
使用大模型API进行OCR识别并提取图片中的结构化信息 使用大模型API进行OCR识别并提取图片中的结构化信息
@@ -138,8 +179,258 @@ def ocr_and_extract_info(image_path):
""" """
# 登录页面路由
@app.route('/login', methods=['GET', 'POST'])
def login():
"""
处理用户登录
GET: 显示登录页面
POST: 处理登录表单提交
"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('login.html')
# 验证用户
user_data = verify_user(username, password)
if user_data:
# 登录成功,设置会话
session['user_id'] = user_data['user_id']
session['username'] = user_data['username']
session['permission'] = user_data['premission']
flash(f'欢迎回来,{username}', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'error')
return render_template('login.html')
return render_template('login.html')
# 登出路由
@app.route('/logout')
def logout():
"""
处理用户登出
"""
session.clear()
flash('已成功登出', 'info')
return redirect(url_for('login'))
# 用户管理页面路由
@app.route('/user_management')
@admin_required
def user_management():
"""
显示用户管理页面(仅管理员可访问)
"""
users = get_all_users()
return render_template('user_management.html', users=users)
# 注册新用户路由
@app.route('/register', methods=['GET', 'POST'])
@admin_required
def register():
"""
注册新用户(仅管理员可访问)
GET: 显示注册页面
POST: 处理注册表单提交
"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
permission = int(request.form.get('permission', 1))
# 验证输入
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('register.html')
if password != confirm_password:
flash('两次输入的密码不一致', 'error')
return render_template('register.html')
if len(password) < 6:
flash('密码长度至少6位', 'error')
return render_template('register.html')
# 检查用户名是否已存在
existing_user = get_user_by_username(username)
if existing_user:
flash('用户名已存在', 'error')
return render_template('register.html')
# 创建新用户
success = create_user(username, password, permission)
if success:
flash(f'用户 {username} 创建成功', 'success')
return redirect(url_for('user_management'))
else:
flash('创建用户失败', 'error')
return render_template('register.html')
return render_template('register.html')
# 修改用户密码路由
@app.route('/change_password/<username>', methods=['POST'])
@admin_required
def change_password(username):
"""
修改用户密码(仅管理员可访问)
"""
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
if not new_password or not confirm_password:
flash('请输入新密码', 'error')
return redirect(url_for('user_management'))
if new_password != confirm_password:
flash('两次输入的密码不一致', 'error')
return redirect(url_for('user_management'))
if len(new_password) < 6:
flash('密码长度至少6位', 'error')
return redirect(url_for('user_management'))
success = update_user_password(username, new_password)
if success:
flash(f'用户 {username} 密码修改成功', 'success')
else:
flash(f'修改用户 {username} 密码失败', 'error')
return redirect(url_for('user_management'))
# 修改用户权限路由
@app.route('/change_permission/<username>', methods=['POST'])
@admin_required
def change_permission(username):
"""
修改用户权限(仅管理员可访问)
"""
new_permission = int(request.form.get('permission', 1))
success = update_user_permission(username, new_permission)
if success:
flash(f'用户 {username} 权限修改成功', 'success')
else:
flash(f'修改用户 {username} 权限失败', 'error')
return redirect(url_for('user_management'))
# 删除用户路由
@app.route('/delete_user/<username>', methods=['POST'])
@admin_required
def delete_user_route(username):
"""
删除用户(仅管理员可访问)
"""
success = delete_user(username)
if success:
flash(f'用户 {username} 删除成功', 'success')
else:
flash(f'删除用户 {username} 失败', 'error')
return redirect(url_for('user_management'))
# 个人设置页面路由
@app.route('/profile')
@login_required
def profile():
"""
显示个人设置页面
"""
return render_template('profile.html')
# 修改个人密码路由
@app.route('/change_own_password', methods=['POST'])
@login_required
def change_own_password():
"""
用户修改自己的密码
"""
old_password = request.form.get('old_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# 验证输入
if not old_password or not new_password or not confirm_password:
flash('请填写所有密码字段', 'error')
return redirect(url_for('profile'))
if new_password != confirm_password:
flash('两次输入的新密码不一致', 'error')
return redirect(url_for('profile'))
if len(new_password) < 6:
flash('新密码长度至少6位', 'error')
return redirect(url_for('profile'))
# 调用修改密码函数
success = update_user_own_password(session['user_id'], old_password, new_password)
if success:
flash('密码修改成功', 'success')
else:
flash('密码修改失败,请检查旧密码是否正确', 'error')
return redirect(url_for('profile'))
# 个人数据页面路由
@app.route('/my_data')
@login_required
def my_data():
"""
显示用户自己的数据
"""
user_id = session['user_id']
keyword = request.args.get('keyword', '')
# 查询用户自己的数据
if keyword:
data = search_data_by_user(user_id, keyword)
else:
data = search_data_by_user(user_id)
# 将data字段从字符串转换回JSON格式以便显示
processed_data = []
for item in data:
if 'data' in item and item['data']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(item['data'])
# 合并原始数据和其他字段
display_item = {
'_id': item['_id'],
'image': item.get('image', ''),
**original_data # 展开原始数据字段
}
processed_data.append(display_item)
except Exception as e:
# 如果转换失败,保持原始格式
processed_data.append(item)
else:
processed_data.append(item)
return render_template('my_data.html', data=processed_data, keyword=keyword)
# 首页路由 # 首页路由
@app.route('/') @app.route('/')
@login_required
def index(): def index():
""" """
渲染首页模板 渲染首页模板
@@ -151,6 +442,7 @@ def index():
# 图片上传路由 # 图片上传路由
@app.route('/upload', methods=['POST']) @app.route('/upload', methods=['POST'])
@user_or_admin_required
def upload_image(): def upload_image():
""" """
处理图片上传请求调用OCR识别但不存储结果 处理图片上传请求调用OCR识别但不存储结果
@@ -234,6 +526,7 @@ def confirm_data():
# 搜索路由 # 搜索路由
@app.route('/search') @app.route('/search')
@user_or_admin_required
def search(): def search():
""" """
处理搜索请求从Elasticsearch中检索匹配的数据 处理搜索请求从Elasticsearch中检索匹配的数据
@@ -273,6 +566,7 @@ def search():
# 结果页面路由 # 结果页面路由
@app.route('/results') @app.route('/results')
@user_or_admin_required
def results_page(): def results_page():
""" """
渲染搜索结果页面 渲染搜索结果页面
@@ -284,6 +578,7 @@ def results_page():
# 显示所有数据路由 # 显示所有数据路由
@app.route('/all') @app.route('/all')
@admin_required
def show_all(): def show_all():
""" """
获取所有数据并渲染到页面 获取所有数据并渲染到页面
@@ -331,22 +626,118 @@ def serve_image(filename):
# 删除数据路由 # 删除数据路由
@app.route('/delete/<doc_id>', methods=['POST']) @app.route('/delete/<doc_id>', methods=['POST'])
@login_required
def delete_entry(doc_id): def delete_entry(doc_id):
""" """
根据文档ID删除数据 根据文档ID删除数据
参数: 参数:
doc_id (str): 要删除的文档ID doc_id (str): 要删除的文档ID
返回: 返回:
重定向到所有数据页面或错误信息 重定向到所有数据页面或错误信息
""" """
user_id = session['user_id']
user_permission = session.get('permission', 1)
# 管理员可以删除所有数据,普通用户只能删除自己的数据
if user_permission == 0: # 管理员
success = delete_by_id(doc_id)
redirect_url = 'show_all'
else: # 普通用户
success = delete_data_by_id(doc_id, user_id)
redirect_url = 'my_data'
if success:
return redirect(url_for(redirect_url))
if delete_by_id(doc_id): if delete_by_id(doc_id):
return redirect(url_for('show_all')) return redirect(url_for('show_all'))
else: else:
return "删除失败", 500 return "删除失败", 500
@app.route('/edit/<doc_id>', methods=['GET', 'POST'])
@login_required
def edit_entry(doc_id):
"""
编辑数据条目(用户只能编辑自己的数据)
"""
if request.method == 'GET':
# 获取要编辑的数据
try:
# 先获取文档检查权限
response = requests.get(
f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
auth=AUTH
)
response.raise_for_status()
doc = response.json()
if not doc.get("found"):
flash('数据不存在', 'error')
return redirect(url_for('my_data'))
# 检查权限
user_id = session['user_id']
user_permission = session.get('permission', 1)
doc_user_id = doc["_source"].get("user_id")
# 管理员可以编辑所有数据,普通用户只能编辑自己的数据
if user_permission != 0 and doc_user_id != user_id:
flash('您无权编辑此数据', 'error')
return redirect(url_for('my_data'))
# 解析数据
data_str = doc["_source"].get("data", "{}")
original_data = string_to_json(data_str)
edit_data = {
'_id': doc_id,
'image': doc["_source"].get('image', ''),
**original_data
}
return render_template('edit.html', data=edit_data)
except Exception as e:
flash('获取数据失败', 'error')
return redirect(url_for('my_data'))
else: # POST 请求 - 保存编辑
try:
# 获取编辑后的数据
edited_data = {}
for key, value in request.form.items():
if key != '_id' and key != 'image':
edited_data[key] = value
# 转换为字符串格式
data_string = json_to_string(edited_data)
# 构造更新数据
updated_data = {
"data": data_string,
"image": request.form.get('image', ''),
"user_id": session['user_id']
}
# 更新数据
success = update_data_by_id(doc_id, updated_data, session['user_id'])
if success:
flash('数据更新成功', 'success')
else:
flash('数据更新失败', 'error')
# 根据用户权限重定向
if session.get('permission', 1) == 0:
return redirect(url_for('show_all'))
else:
return redirect(url_for('my_data'))
except Exception as e:
flash('保存数据失败', 'error')
return redirect(url_for('my_data'))
# 主程序入口 # 主程序入口
if __name__ == '__main__': if __name__ == '__main__':