import base64 from flask import Flask, request, render_template, redirect, url_for, jsonify, session, flash, send_from_directory from werkzeug.utils import secure_filename import os import uuid from PIL import Image import re import json import requests from ESConnect import * from json_converter import json_to_string, string_to_json from openai import OpenAI from functools import wraps # import config # 创建Flask应用实例 app = Flask(__name__) # 设置会话密钥,用于加密会话数据 app.secret_key = 'your-secret-key-change-this-in-production' # app.config.from_object(config.Config) # 权限装饰器 def login_required(f): """要求用户登录的装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('请先登录', 'error') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function def admin_required(f): """要求管理员权限的装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('请先登录', 'error') return redirect(url_for('login')) if session.get('permission', 1) != 0: flash('权限不足,需要管理员权限', 'error') return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function def user_or_admin_required(f): """要求普通用户或管理员权限的装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('请先登录', 'error') return redirect(url_for('login')) permission = session.get('permission', 1) if permission not in [0, 1]: flash('权限不足', 'error') return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function # OCR和信息提取函数,使用大模型API处理图片并提取结构化信息 def ocr_and_extract_info(image_path): """ 使用大模型API进行OCR识别并提取图片中的结构化信息 参数: image_path (str): 图片文件路径 返回: dict: 包含提取信息的字典,格式为 {'id': '', 'name': '', 'students': '', 'teacher': ''} """ def encode_image(image_path): """ 将图片编码为base64格式 参数: image_path (str): 图片文件路径 返回: str: base64编码的图片字符串 """ with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') # 将图片转换为base64编码 base64_image = encode_image(image_path) # 初始化OpenAI客户端,使用百度AI Studio的API client = OpenAI( api_key="188f57db3766e02ed2c7e18373996d84f4112272", # 含有 AI Studio 访问令牌的环境变量,https://aistudio.baidu.com/account/accessToken, base_url="https://aistudio.baidu.com/llm/lmapi/v3", # aistudio 大模型 api 服务域名 ) # 调用大模型API进行图片识别和信息提取 chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': '你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。'}, {'role': 'user', "content": [ {"type": "text", "text": "请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析" "直接输出json结果即可" "你可以自行决定使用哪些json字段"}, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}" } } ]} ], model="ernie-4.5-turbo-vl-32k", # 使用百度文心大模型 ) # 获取API返回的文本内容 response_text = chat_completion.choices[0].message.content # 添加调试信息:输出模型返回的原始字符串 print("=" * 50) print("模型返回的原始字符串:") print(response_text) print("=" * 50) def parse_respound(text): """ 解析API返回的文本,提取JSON数据 参数: text (str): API返回的文本 返回: dict or None: 解析成功返回字典,失败返回None """ # 尝试直接解析标准JSON try: result=json.loads(text) if result: print("✓ 成功解析标准JSON格式") return result except json.JSONDecodeError: print("✗ 无法解析标准JSON格式") pass # 提取markdown代码块中的内容 code_block = re.search(r'```json\n(.*?)```', text, re.DOTALL) if code_block: try: result=json.loads(code_block.group(1)) if result: print("✓ 成功解析markdown代码块中的JSON") return result except json.JSONDecodeError: print("✗ 无法解析markdown代码块中的JSON") pass # 尝试替换单引号并解析 try: fixed_json = text.replace("'", "\"") result=json.loads(fixed_json) if(result): print("✓ 成功解析替换单引号后的JSON") return result except json.JSONDecodeError: print("✗ 无法解析替换单引号后的JSON") pass # 解析API返回的文本 result_data = parse_respound(response_text) # 添加调试信息:输出解析结果 print("解析结果:") if result_data: print(f"✓ 解析成功: {result_data}") else: print("✗ 解析失败,返回None") print("=" * 50) return result_data """ 模拟大模型识别图像并返回结构化JSON。 实际应调用Qwen-VL或其他OCR+解析服务。 """ # 登录页面路由 @app.route('/login', methods=['GET', 'POST']) def login(): """ 处理用户登录 GET: 显示登录页面 POST: 处理登录表单提交 """ if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') if not username or not password: flash('请输入用户名和密码', 'error') return render_template('login.html') # 验证用户 user_data = verify_user(username, password) if user_data: # 登录成功,设置会话 session['user_id'] = user_data['user_id'] session['username'] = user_data['username'] session['permission'] = user_data['premission'] flash(f'欢迎回来,{username}!', 'success') return redirect(url_for('index')) else: flash('用户名或密码错误', 'error') return render_template('login.html') return render_template('login.html') # 登出路由 @app.route('/logout') def logout(): """ 处理用户登出 """ session.clear() flash('已成功登出', 'info') return redirect(url_for('login')) # 用户管理页面路由 @app.route('/user_management') @admin_required def user_management(): """ 显示用户管理页面(仅管理员可访问) """ users = get_all_users() return render_template('user_management.html', users=users) # 注册新用户路由 @app.route('/register', methods=['GET', 'POST']) @admin_required def register(): """ 注册新用户(仅管理员可访问) GET: 显示注册页面 POST: 处理注册表单提交 """ if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') confirm_password = request.form.get('confirm_password') permission = int(request.form.get('permission', 1)) # 验证输入 if not username or not password: flash('请输入用户名和密码', 'error') return render_template('register.html') if password != confirm_password: flash('两次输入的密码不一致', 'error') return render_template('register.html') if len(password) < 6: flash('密码长度至少6位', 'error') return render_template('register.html') # 检查用户名是否已存在 existing_user = get_user_by_username(username) if existing_user: flash('用户名已存在', 'error') return render_template('register.html') # 创建新用户 success = create_user(username, password, permission) if success: flash(f'用户 {username} 创建成功', 'success') return redirect(url_for('user_management')) else: flash('创建用户失败', 'error') return render_template('register.html') return render_template('register.html') # 修改用户密码路由 @app.route('/change_password/', methods=['POST']) @admin_required def change_password(username): """ 修改用户密码(仅管理员可访问) """ new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') if not new_password or not confirm_password: flash('请输入新密码', 'error') return redirect(url_for('user_management')) if new_password != confirm_password: flash('两次输入的密码不一致', 'error') return redirect(url_for('user_management')) if len(new_password) < 6: flash('密码长度至少6位', 'error') return redirect(url_for('user_management')) success = update_user_password(username, new_password) if success: flash(f'用户 {username} 密码修改成功', 'success') else: flash(f'修改用户 {username} 密码失败', 'error') return redirect(url_for('user_management')) # 修改用户权限路由 @app.route('/change_permission/', methods=['POST']) @admin_required def change_permission(username): """ 修改用户权限(仅管理员可访问) """ new_permission = int(request.form.get('permission', 1)) success = update_user_permission(username, new_permission) if success: flash(f'用户 {username} 权限修改成功', 'success') else: flash(f'修改用户 {username} 权限失败', 'error') return redirect(url_for('user_management')) # 删除用户路由 @app.route('/delete_user/', methods=['POST']) @admin_required def delete_user_route(username): """ 删除用户(仅管理员可访问) """ success = delete_user(username) if success: flash(f'用户 {username} 删除成功', 'success') else: flash(f'删除用户 {username} 失败', 'error') return redirect(url_for('user_management')) # 个人设置页面路由 @app.route('/profile') @login_required def profile(): """ 显示个人设置页面 """ return render_template('profile.html') # 修改个人密码路由 @app.route('/change_own_password', methods=['POST']) @login_required def change_own_password(): """ 用户修改自己的密码 """ old_password = request.form.get('old_password') new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') # 验证输入 if not old_password or not new_password or not confirm_password: flash('请填写所有密码字段', 'error') return redirect(url_for('profile')) if new_password != confirm_password: flash('两次输入的新密码不一致', 'error') return redirect(url_for('profile')) if len(new_password) < 6: flash('新密码长度至少6位', 'error') return redirect(url_for('profile')) # 调用修改密码函数 success = update_user_own_password(session['user_id'], old_password, new_password) if success: flash('密码修改成功', 'success') else: flash('密码修改失败,请检查旧密码是否正确', 'error') return redirect(url_for('profile')) # 个人数据页面路由 @app.route('/my_data') @login_required def my_data(): """ 显示用户自己的数据 """ user_id = session['user_id'] keyword = request.args.get('keyword', '') # 查询用户自己的数据 if keyword: data = search_data_by_user(user_id, keyword) else: data = search_data_by_user(user_id) # 将data字段从字符串转换回JSON格式以便显示 processed_data = [] for item in data: if 'data' in item and item['data']: try: # 将data字段的字符串转换回JSON original_data = string_to_json(item['data']) # 合并原始数据和其他字段 display_item = { '_id': item['_id'], 'image': item.get('image', ''), **original_data # 展开原始数据字段 } processed_data.append(display_item) except Exception as e: # 如果转换失败,保持原始格式 processed_data.append(item) else: processed_data.append(item) return render_template('my_data.html', data=processed_data, keyword=keyword) # 首页路由 @app.route('/') @login_required def index(): """ 渲染首页模板 返回: str: 渲染后的HTML页面 """ return render_template('index.html') # 图片上传路由 @app.route('/upload', methods=['POST']) @user_or_admin_required def upload_image(): """ 处理图片上传请求,调用OCR识别但不存储结果 返回: JSON: 识别结果,供用户编辑确认 """ # 获取上传的文件 file = request.files.get('file') if not file: return jsonify({"error": "No file uploaded"}), 400 # 保存上传的图片 filename = f"{uuid.uuid4()}_{file.filename}" image_path = os.path.join("image", filename) file.save(image_path) # 调用大模型进行识别 try: print(f"开始处理图片: {image_path}") original_data = ocr_and_extract_info(image_path) # 获取原始JSON数据 if original_data: print(f"识别成功: {original_data}") # 返回识别结果和图片文件名,供用户编辑确认 return jsonify({ "message": "识别成功,请确认数据后点击录入", "data": original_data, "image": filename }) else: print("✗ 无法识别图片内容") return jsonify({"error": "无法识别图片内容"}), 400 except Exception as e: print(f"✗ 处理过程中发生错误: {str(e)}") return jsonify({"error": str(e)}), 500 # 确认录入路由 @app.route('/confirm', methods=['POST']) @user_or_admin_required def confirm_data(): """ 确认并录入用户编辑后的数据 返回: JSON: 录入成功或失败的响应 """ try: # 获取前端提交的数据 request_data = request.get_json() if not request_data: return jsonify({"error": "没有接收到数据"}), 400 # 获取编辑后的数据和图片文件名 edited_data = request_data.get('data', {}) image_filename = request_data.get('image', '') if not edited_data: return jsonify({"error": "数据不能为空"}), 400 # 使用json_converter将JSON数据转换为字符串 data_string = json_to_string(edited_data) print(f"转换后的数据字符串: {data_string}") # 构造新的数据结构,只包含data和image字段,并添加用户ID processed_data = { "data": data_string, "image": image_filename, # 存储图片文件名 "user_id": session['user_id'] # 添加用户ID关联 } print(f"准备存储的数据: {processed_data}") # 存入ES insert_data(processed_data) print("✓ 数据成功存储到Elasticsearch") return jsonify({"message": "数据录入成功", "data": edited_data}) except Exception as e: print(f"✗ 录入过程中发生错误: {str(e)}") return jsonify({"error": str(e)}), 500 # 搜索路由 @app.route('/search') @user_or_admin_required def search(): """ 处理搜索请求,从Elasticsearch中检索匹配的数据 返回: JSON: 搜索结果列表 """ keyword = request.args.get('q') if not keyword: return jsonify([]) results = search_by_any_field(keyword) # 处理搜索结果,将data字段转换回JSON格式 processed_results = [] for result in results: if '_source' in result and 'data' in result['_source']: try: # 将data字段的字符串转换回JSON original_data = string_to_json(result['_source']['data']) # 构造新的结果格式 processed_result = { '_id': result.get('_id', ''), '_source': { 'image': result['_source'].get('image', ''), **original_data # 展开原始数据字段 } } processed_results.append(processed_result) except Exception as e: # 如果转换失败,保持原始格式 processed_results.append(result) else: processed_results.append(result) print(processed_results) return jsonify(processed_results) # 结果页面路由 @app.route('/results') @user_or_admin_required def results_page(): """ 渲染搜索结果页面 返回: str: 渲染后的HTML页面 """ return render_template('results.html') # 显示所有数据路由 @app.route('/all') @admin_required def show_all(): """ 获取所有数据并渲染到页面 返回: str: 渲染后的HTML页面,包含所有数据 """ all_data = search_all() # 将data字段从字符串转换回JSON格式以便显示 processed_data = [] for item in all_data: if 'data' in item and item['data']: try: # 将data字段的字符串转换回JSON original_data = string_to_json(item['data']) # 合并原始数据和其他字段 display_item = { '_id': item['_id'], 'image': item.get('image', ''), **original_data # 展开原始数据字段 } processed_data.append(display_item) except Exception as e: # 如果转换失败,保持原始格式 processed_data.append(item) else: processed_data.append(item) return render_template('all.html', data=processed_data) # 添加图片路由 @app.route('/image/') def serve_image(filename): """ 提供图片文件服务 参数: filename (str): 图片文件名 返回: Response: 图片文件响应 """ from flask import send_from_directory return send_from_directory('image', filename) # 删除数据路由 @app.route('/delete/', methods=['POST']) @login_required def delete_entry(doc_id): """ 根据文档ID删除数据(用户只能删除自己的数据,管理员可以删除所有数据) 参数: doc_id (str): 要删除的文档ID 返回: 重定向到相应页面或错误信息 """ user_id = session['user_id'] user_permission = session.get('permission', 1) # 管理员可以删除所有数据,普通用户只能删除自己的数据 if user_permission == 0: # 管理员 success = delete_by_id(doc_id) redirect_url = 'show_all' else: # 普通用户 success = delete_data_by_id(doc_id, user_id) redirect_url = 'my_data' if success: return redirect(url_for(redirect_url)) else: return "删除失败", 500 # 编辑数据路由 @app.route('/edit/', methods=['GET', 'POST']) @login_required def edit_entry(doc_id): """ 编辑数据条目(用户只能编辑自己的数据) """ if request.method == 'GET': # 获取要编辑的数据 try: # 先获取文档检查权限 response = requests.get( f"{ES_URL}/{data_index_name}/_doc/{doc_id}", auth=AUTH ) response.raise_for_status() doc = response.json() if not doc.get("found"): flash('数据不存在', 'error') return redirect(url_for('my_data')) # 检查权限 user_id = session['user_id'] user_permission = session.get('permission', 1) doc_user_id = doc["_source"].get("user_id") # 管理员可以编辑所有数据,普通用户只能编辑自己的数据 if user_permission != 0 and doc_user_id != user_id: flash('您无权编辑此数据', 'error') return redirect(url_for('my_data')) # 解析数据 data_str = doc["_source"].get("data", "{}") original_data = string_to_json(data_str) edit_data = { '_id': doc_id, 'image': doc["_source"].get('image', ''), **original_data } return render_template('edit.html', data=edit_data) except Exception as e: flash('获取数据失败', 'error') return redirect(url_for('my_data')) else: # POST 请求 - 保存编辑 try: # 获取编辑后的数据 edited_data = {} for key, value in request.form.items(): if key != '_id' and key != 'image': edited_data[key] = value # 转换为字符串格式 data_string = json_to_string(edited_data) # 构造更新数据 updated_data = { "data": data_string, "image": request.form.get('image', ''), "user_id": session['user_id'] } # 更新数据 success = update_data_by_id(doc_id, updated_data, session['user_id']) if success: flash('数据更新成功', 'success') else: flash('数据更新失败', 'error') # 根据用户权限重定向 if session.get('permission', 1) == 0: return redirect(url_for('show_all')) else: return redirect(url_for('my_data')) except Exception as e: flash('保存数据失败', 'error') return redirect(url_for('my_data')) # 主程序入口 if __name__ == '__main__': # 创建Elasticsearch索引 create_index_with_mapping() # 创建图片存储目录 os.makedirs("image", exist_ok=True) # 启动Flask应用 app.run(use_reloader=False)