Files
Achievement_Inputing/app.py

752 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
from flask import Flask, request, render_template, redirect, url_for, jsonify, session, flash, send_from_directory
from werkzeug.utils import secure_filename
import os
import uuid
from PIL import Image
import re
import json
import requests
from ESConnect import *
from json_converter import json_to_string, string_to_json
from openai import OpenAI
from functools import wraps
# import config
# 创建Flask应用实例
app = Flask(__name__)
# 设置会话密钥,用于加密会话数据
app.secret_key = 'your-secret-key-change-this-in-production'
# app.config.from_object(config.Config)
# 权限装饰器
def login_required(f):
"""要求用户登录的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""要求管理员权限的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
if session.get('permission', 1) != 0:
flash('权限不足,需要管理员权限', 'error')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
def user_or_admin_required(f):
"""要求普通用户或管理员权限的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'error')
return redirect(url_for('login'))
permission = session.get('permission', 1)
if permission not in [0, 1]:
flash('权限不足', 'error')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
# OCR和信息提取函数使用大模型API处理图片并提取结构化信息
def ocr_and_extract_info(image_path):
"""
使用大模型API进行OCR识别并提取图片中的结构化信息
参数:
image_path (str): 图片文件路径
返回:
dict: 包含提取信息的字典,格式为 {'id': '', 'name': '', 'students': '', 'teacher': ''}
"""
def encode_image(image_path):
"""
将图片编码为base64格式
参数:
image_path (str): 图片文件路径
返回:
str: base64编码的图片字符串
"""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
# 将图片转换为base64编码
base64_image = encode_image(image_path)
# 初始化OpenAI客户端使用百度AI Studio的API
client = OpenAI(
api_key="188f57db3766e02ed2c7e18373996d84f4112272",
# 含有 AI Studio 访问令牌的环境变量https://aistudio.baidu.com/account/accessToken,
base_url="https://aistudio.baidu.com/llm/lmapi/v3", # aistudio 大模型 api 服务域名
)
# 调用大模型API进行图片识别和信息提取
chat_completion = client.chat.completions.create(
messages=[
{'role': 'system', 'content': '你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。'},
{'role': 'user', "content": [
{"type": "text", "text": "请识别这张图片中的信息将你认为重要的数据转换为不包含嵌套的json不要显示其它信息以便于解析"
"直接输出json结果即可"
"你可以自行决定使用哪些json字段"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}"
}
}
]}
],
model="ernie-4.5-turbo-vl-32k", # 使用百度文心大模型
)
# 获取API返回的文本内容
response_text = chat_completion.choices[0].message.content
# 添加调试信息:输出模型返回的原始字符串
print("=" * 50)
print("模型返回的原始字符串:")
print(response_text)
print("=" * 50)
def parse_respound(text):
"""
解析API返回的文本提取JSON数据
参数:
text (str): API返回的文本
返回:
dict or None: 解析成功返回字典失败返回None
"""
# 尝试直接解析标准JSON
try:
result=json.loads(text)
if result:
print("✓ 成功解析标准JSON格式")
return result
except json.JSONDecodeError:
print("✗ 无法解析标准JSON格式")
pass
# 提取markdown代码块中的内容
code_block = re.search(r'```json\n(.*?)```', text, re.DOTALL)
if code_block:
try:
result=json.loads(code_block.group(1))
if result:
print("✓ 成功解析markdown代码块中的JSON")
return result
except json.JSONDecodeError:
print("✗ 无法解析markdown代码块中的JSON")
pass
# 尝试替换单引号并解析
try:
fixed_json = text.replace("'", "\"")
result=json.loads(fixed_json)
if(result):
print("✓ 成功解析替换单引号后的JSON")
return result
except json.JSONDecodeError:
print("✗ 无法解析替换单引号后的JSON")
pass
# 解析API返回的文本
result_data = parse_respound(response_text)
# 添加调试信息:输出解析结果
print("解析结果:")
if result_data:
print(f"✓ 解析成功: {result_data}")
else:
print("✗ 解析失败返回None")
print("=" * 50)
return result_data
"""
模拟大模型识别图像并返回结构化JSON。
实际应调用Qwen-VL或其他OCR+解析服务。
"""
# 登录页面路由
@app.route('/login', methods=['GET', 'POST'])
def login():
"""
处理用户登录
GET: 显示登录页面
POST: 处理登录表单提交
"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('login.html')
# 验证用户
user_data = verify_user(username, password)
if user_data:
# 登录成功,设置会话
session['user_id'] = user_data['user_id']
session['username'] = user_data['username']
session['permission'] = user_data['premission']
flash(f'欢迎回来,{username}', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'error')
return render_template('login.html')
return render_template('login.html')
# 登出路由
@app.route('/logout')
def logout():
"""
处理用户登出
"""
session.clear()
flash('已成功登出', 'info')
return redirect(url_for('login'))
# 用户管理页面路由
@app.route('/user_management')
@admin_required
def user_management():
"""
显示用户管理页面(仅管理员可访问)
"""
users = get_all_users()
return render_template('user_management.html', users=users)
# 注册新用户路由
@app.route('/register', methods=['GET', 'POST'])
@admin_required
def register():
"""
注册新用户(仅管理员可访问)
GET: 显示注册页面
POST: 处理注册表单提交
"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
permission = int(request.form.get('permission', 1))
# 验证输入
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('register.html')
if password != confirm_password:
flash('两次输入的密码不一致', 'error')
return render_template('register.html')
if len(password) < 6:
flash('密码长度至少6位', 'error')
return render_template('register.html')
# 检查用户名是否已存在
existing_user = get_user_by_username(username)
if existing_user:
flash('用户名已存在', 'error')
return render_template('register.html')
# 创建新用户
success = create_user(username, password, permission)
if success:
flash(f'用户 {username} 创建成功', 'success')
return redirect(url_for('user_management'))
else:
flash('创建用户失败', 'error')
return render_template('register.html')
return render_template('register.html')
# 修改用户密码路由
@app.route('/change_password/<username>', methods=['POST'])
@admin_required
def change_password(username):
"""
修改用户密码(仅管理员可访问)
"""
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
if not new_password or not confirm_password:
flash('请输入新密码', 'error')
return redirect(url_for('user_management'))
if new_password != confirm_password:
flash('两次输入的密码不一致', 'error')
return redirect(url_for('user_management'))
if len(new_password) < 6:
flash('密码长度至少6位', 'error')
return redirect(url_for('user_management'))
success = update_user_password(username, new_password)
if success:
flash(f'用户 {username} 密码修改成功', 'success')
else:
flash(f'修改用户 {username} 密码失败', 'error')
return redirect(url_for('user_management'))
# 修改用户权限路由
@app.route('/change_permission/<username>', methods=['POST'])
@admin_required
def change_permission(username):
"""
修改用户权限(仅管理员可访问)
"""
new_permission = int(request.form.get('permission', 1))
success = update_user_permission(username, new_permission)
if success:
flash(f'用户 {username} 权限修改成功', 'success')
else:
flash(f'修改用户 {username} 权限失败', 'error')
return redirect(url_for('user_management'))
# 删除用户路由
@app.route('/delete_user/<username>', methods=['POST'])
@admin_required
def delete_user_route(username):
"""
删除用户(仅管理员可访问)
"""
success = delete_user(username)
if success:
flash(f'用户 {username} 删除成功', 'success')
else:
flash(f'删除用户 {username} 失败', 'error')
return redirect(url_for('user_management'))
# 个人设置页面路由
@app.route('/profile')
@login_required
def profile():
"""
显示个人设置页面
"""
return render_template('profile.html')
# 修改个人密码路由
@app.route('/change_own_password', methods=['POST'])
@login_required
def change_own_password():
"""
用户修改自己的密码
"""
old_password = request.form.get('old_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# 验证输入
if not old_password or not new_password or not confirm_password:
flash('请填写所有密码字段', 'error')
return redirect(url_for('profile'))
if new_password != confirm_password:
flash('两次输入的新密码不一致', 'error')
return redirect(url_for('profile'))
if len(new_password) < 6:
flash('新密码长度至少6位', 'error')
return redirect(url_for('profile'))
# 调用修改密码函数
success = update_user_own_password(session['user_id'], old_password, new_password)
if success:
flash('密码修改成功', 'success')
else:
flash('密码修改失败,请检查旧密码是否正确', 'error')
return redirect(url_for('profile'))
# 个人数据页面路由
@app.route('/my_data')
@login_required
def my_data():
"""
显示用户自己的数据
"""
user_id = session['user_id']
keyword = request.args.get('keyword', '')
# 查询用户自己的数据
if keyword:
data = search_data_by_user(user_id, keyword)
else:
data = search_data_by_user(user_id)
# 将data字段从字符串转换回JSON格式以便显示
processed_data = []
for item in data:
if 'data' in item and item['data']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(item['data'])
# 合并原始数据和其他字段
display_item = {
'_id': item['_id'],
'image': item.get('image', ''),
**original_data # 展开原始数据字段
}
processed_data.append(display_item)
except Exception as e:
# 如果转换失败,保持原始格式
processed_data.append(item)
else:
processed_data.append(item)
return render_template('my_data.html', data=processed_data, keyword=keyword)
# 首页路由
@app.route('/')
@login_required
def index():
"""
渲染首页模板
返回:
str: 渲染后的HTML页面
"""
return render_template('index.html')
# 图片上传路由
@app.route('/upload', methods=['POST'])
@user_or_admin_required
def upload_image():
"""
处理图片上传请求调用OCR识别但不存储结果
返回:
JSON: 识别结果,供用户编辑确认
"""
# 获取上传的文件
file = request.files.get('file')
if not file:
return jsonify({"error": "No file uploaded"}), 400
# 保存上传的图片
filename = f"{uuid.uuid4()}_{file.filename}"
image_path = os.path.join("image", filename)
file.save(image_path)
# 调用大模型进行识别
try:
print(f"开始处理图片: {image_path}")
original_data = ocr_and_extract_info(image_path) # 获取原始JSON数据
if original_data:
print(f"识别成功: {original_data}")
# 返回识别结果和图片文件名,供用户编辑确认
return jsonify({
"message": "识别成功,请确认数据后点击录入",
"data": original_data,
"image": filename
})
else:
print("✗ 无法识别图片内容")
return jsonify({"error": "无法识别图片内容"}), 400
except Exception as e:
print(f"✗ 处理过程中发生错误: {str(e)}")
return jsonify({"error": str(e)}), 500
# 确认录入路由
@app.route('/confirm', methods=['POST'])
<<<<<<< HEAD
@user_or_admin_required
=======
>>>>>>> 30645e46ff2a6ee5c12fd95fb21b7eb4fb51c5f0
def confirm_data():
"""
确认并录入用户编辑后的数据
返回:
JSON: 录入成功或失败的响应
"""
try:
# 获取前端提交的数据
request_data = request.get_json()
if not request_data:
return jsonify({"error": "没有接收到数据"}), 400
# 获取编辑后的数据和图片文件名
edited_data = request_data.get('data', {})
image_filename = request_data.get('image', '')
if not edited_data:
return jsonify({"error": "数据不能为空"}), 400
# 使用json_converter将JSON数据转换为字符串
data_string = json_to_string(edited_data)
print(f"转换后的数据字符串: {data_string}")
<<<<<<< HEAD
# 构造新的数据结构只包含data和image字段并添加用户ID
processed_data = {
"data": data_string,
"image": image_filename, # 存储图片文件名
"user_id": session['user_id'] # 添加用户ID关联
=======
# 构造新的数据结构只包含data和image字段
processed_data = {
"data": data_string,
"image": image_filename # 存储图片文件名
>>>>>>> 30645e46ff2a6ee5c12fd95fb21b7eb4fb51c5f0
}
print(f"准备存储的数据: {processed_data}")
# 存入ES
insert_data(processed_data)
print("✓ 数据成功存储到Elasticsearch")
return jsonify({"message": "数据录入成功", "data": edited_data})
except Exception as e:
print(f"✗ 录入过程中发生错误: {str(e)}")
return jsonify({"error": str(e)}), 500
# 搜索路由
@app.route('/search')
@user_or_admin_required
def search():
"""
处理搜索请求从Elasticsearch中检索匹配的数据
返回:
JSON: 搜索结果列表
"""
keyword = request.args.get('q')
if not keyword:
return jsonify([])
results = search_by_any_field(keyword)
# 处理搜索结果将data字段转换回JSON格式
processed_results = []
for result in results:
if '_source' in result and 'data' in result['_source']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(result['_source']['data'])
# 构造新的结果格式
processed_result = {
'_id': result.get('_id', ''),
'_source': {
'image': result['_source'].get('image', ''),
**original_data # 展开原始数据字段
}
}
processed_results.append(processed_result)
except Exception as e:
# 如果转换失败,保持原始格式
processed_results.append(result)
else:
processed_results.append(result)
print(processed_results)
return jsonify(processed_results)
# 结果页面路由
@app.route('/results')
@user_or_admin_required
def results_page():
"""
渲染搜索结果页面
返回:
str: 渲染后的HTML页面
"""
return render_template('results.html')
# 显示所有数据路由
@app.route('/all')
@admin_required
def show_all():
"""
获取所有数据并渲染到页面
返回:
str: 渲染后的HTML页面包含所有数据
"""
all_data = search_all()
# 将data字段从字符串转换回JSON格式以便显示
processed_data = []
for item in all_data:
if 'data' in item and item['data']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(item['data'])
# 合并原始数据和其他字段
display_item = {
'_id': item['_id'],
'image': item.get('image', ''),
**original_data # 展开原始数据字段
}
processed_data.append(display_item)
except Exception as e:
# 如果转换失败,保持原始格式
processed_data.append(item)
else:
processed_data.append(item)
return render_template('all.html', data=processed_data)
# 添加图片路由
@app.route('/image/<filename>')
def serve_image(filename):
"""
提供图片文件服务
参数:
filename (str): 图片文件名
返回:
Response: 图片文件响应
"""
from flask import send_from_directory
return send_from_directory('image', filename)
# 删除数据路由
@app.route('/delete/<doc_id>', methods=['POST'])
@login_required
def delete_entry(doc_id):
"""
根据文档ID删除数据用户只能删除自己的数据管理员可以删除所有数据
参数:
doc_id (str): 要删除的文档ID
返回:
重定向到相应页面或错误信息
"""
user_id = session['user_id']
user_permission = session.get('permission', 1)
# 管理员可以删除所有数据,普通用户只能删除自己的数据
if user_permission == 0: # 管理员
success = delete_by_id(doc_id)
redirect_url = 'show_all'
else: # 普通用户
success = delete_data_by_id(doc_id, user_id)
redirect_url = 'my_data'
if success:
return redirect(url_for(redirect_url))
else:
return "删除失败", 500
# 编辑数据路由
@app.route('/edit/<doc_id>', methods=['GET', 'POST'])
@login_required
def edit_entry(doc_id):
"""
编辑数据条目(用户只能编辑自己的数据)
"""
if request.method == 'GET':
# 获取要编辑的数据
try:
# 先获取文档检查权限
response = requests.get(
f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
auth=AUTH
)
response.raise_for_status()
doc = response.json()
if not doc.get("found"):
flash('数据不存在', 'error')
return redirect(url_for('my_data'))
# 检查权限
user_id = session['user_id']
user_permission = session.get('permission', 1)
doc_user_id = doc["_source"].get("user_id")
# 管理员可以编辑所有数据,普通用户只能编辑自己的数据
if user_permission != 0 and doc_user_id != user_id:
flash('您无权编辑此数据', 'error')
return redirect(url_for('my_data'))
# 解析数据
data_str = doc["_source"].get("data", "{}")
original_data = string_to_json(data_str)
edit_data = {
'_id': doc_id,
'image': doc["_source"].get('image', ''),
**original_data
}
return render_template('edit.html', data=edit_data)
except Exception as e:
flash('获取数据失败', 'error')
return redirect(url_for('my_data'))
else: # POST 请求 - 保存编辑
try:
# 获取编辑后的数据
edited_data = {}
for key, value in request.form.items():
if key != '_id' and key != 'image':
edited_data[key] = value
# 转换为字符串格式
data_string = json_to_string(edited_data)
# 构造更新数据
updated_data = {
"data": data_string,
"image": request.form.get('image', ''),
"user_id": session['user_id']
}
# 更新数据
success = update_data_by_id(doc_id, updated_data, session['user_id'])
if success:
flash('数据更新成功', 'success')
else:
flash('数据更新失败', 'error')
# 根据用户权限重定向
if session.get('permission', 1) == 0:
return redirect(url_for('show_all'))
else:
return redirect(url_for('my_data'))
except Exception as e:
flash('保存数据失败', 'error')
return redirect(url_for('my_data'))
# 主程序入口
if __name__ == '__main__':
# 创建Elasticsearch索引
create_index_with_mapping()
# 创建图片存储目录
os.makedirs("image", exist_ok=True)
# 启动Flask应用
app.run(use_reloader=False)