Files
Achievement_Inputing/app.py
Viajero 08994d732d Merge remote-tracking branch 'origin/main-v2'
# Conflicts:
#	ESConnect.py
#	app.py
2025-10-14 15:46:11 +08:00

358 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
from flask import Flask, request, render_template, redirect, url_for, jsonify
import os
import uuid
from PIL import Image
import re
import json
from ESConnect import *
from json_converter import json_to_string, string_to_json
from openai import OpenAI
# import config
# 创建Flask应用实例
app = Flask(__name__)
# app.config.from_object(config.Config)
# OCR和信息提取函数使用大模型API处理图片并提取结构化信息
def ocr_and_extract_info(image_path):
"""
使用大模型API进行OCR识别并提取图片中的结构化信息
参数:
image_path (str): 图片文件路径
返回:
dict: 包含提取信息的字典,格式为 {'id': '', 'name': '', 'students': '', 'teacher': ''}
"""
def encode_image(image_path):
"""
将图片编码为base64格式
参数:
image_path (str): 图片文件路径
返回:
str: base64编码的图片字符串
"""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
# 将图片转换为base64编码
base64_image = encode_image(image_path)
# 初始化OpenAI客户端使用百度AI Studio的API
client = OpenAI(
api_key="188f57db3766e02ed2c7e18373996d84f4112272",
# 含有 AI Studio 访问令牌的环境变量https://aistudio.baidu.com/account/accessToken,
base_url="https://aistudio.baidu.com/llm/lmapi/v3", # aistudio 大模型 api 服务域名
)
# 调用大模型API进行图片识别和信息提取
chat_completion = client.chat.completions.create(
messages=[
{'role': 'system', 'content': '你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。'},
{'role': 'user', "content": [
{"type": "text", "text": "请识别这张图片中的信息将你认为重要的数据转换为不包含嵌套的json不要显示其它信息以便于解析"
"直接输出json结果即可"
"你可以自行决定使用哪些json字段"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}"
}
}
]}
],
model="ernie-4.5-turbo-vl-32k", # 使用百度文心大模型
)
# 获取API返回的文本内容
response_text = chat_completion.choices[0].message.content
# 添加调试信息:输出模型返回的原始字符串
print("=" * 50)
print("模型返回的原始字符串:")
print(response_text)
print("=" * 50)
def parse_respound(text):
"""
解析API返回的文本提取JSON数据
参数:
text (str): API返回的文本
返回:
dict or None: 解析成功返回字典失败返回None
"""
# 尝试直接解析标准JSON
try:
result=json.loads(text)
if result:
print("✓ 成功解析标准JSON格式")
return result
except json.JSONDecodeError:
print("✗ 无法解析标准JSON格式")
pass
# 提取markdown代码块中的内容
code_block = re.search(r'```json\n(.*?)```', text, re.DOTALL)
if code_block:
try:
result=json.loads(code_block.group(1))
if result:
print("✓ 成功解析markdown代码块中的JSON")
return result
except json.JSONDecodeError:
print("✗ 无法解析markdown代码块中的JSON")
pass
# 尝试替换单引号并解析
try:
fixed_json = text.replace("'", "\"")
result=json.loads(fixed_json)
if(result):
print("✓ 成功解析替换单引号后的JSON")
return result
except json.JSONDecodeError:
print("✗ 无法解析替换单引号后的JSON")
pass
# 解析API返回的文本
result_data = parse_respound(response_text)
# 添加调试信息:输出解析结果
print("解析结果:")
if result_data:
print(f"✓ 解析成功: {result_data}")
else:
print("✗ 解析失败返回None")
print("=" * 50)
return result_data
"""
模拟大模型识别图像并返回结构化JSON。
实际应调用Qwen-VL或其他OCR+解析服务。
"""
# 首页路由
@app.route('/')
def index():
"""
渲染首页模板
返回:
str: 渲染后的HTML页面
"""
return render_template('index.html')
# 图片上传路由
@app.route('/upload', methods=['POST'])
def upload_image():
"""
处理图片上传请求调用OCR识别但不存储结果
返回:
JSON: 识别结果,供用户编辑确认
"""
# 获取上传的文件
file = request.files.get('file')
if not file:
return jsonify({"error": "No file uploaded"}), 400
# 保存上传的图片
filename = f"{uuid.uuid4()}_{file.filename}"
image_path = os.path.join("image", filename)
file.save(image_path)
# 调用大模型进行识别
try:
print(f"开始处理图片: {image_path}")
original_data = ocr_and_extract_info(image_path) # 获取原始JSON数据
if original_data:
print(f"识别成功: {original_data}")
# 返回识别结果和图片文件名,供用户编辑确认
return jsonify({
"message": "识别成功,请确认数据后点击录入",
"data": original_data,
"image": filename
})
else:
print("✗ 无法识别图片内容")
return jsonify({"error": "无法识别图片内容"}), 400
except Exception as e:
print(f"✗ 处理过程中发生错误: {str(e)}")
return jsonify({"error": str(e)}), 500
# 确认录入路由
@app.route('/confirm', methods=['POST'])
@user_or_admin_required
def confirm_data():
"""
确认并录入用户编辑后的数据
返回:
JSON: 录入成功或失败的响应
"""
try:
# 获取前端提交的数据
request_data = request.get_json()
if not request_data:
return jsonify({"error": "没有接收到数据"}), 400
# 获取编辑后的数据和图片文件名
edited_data = request_data.get('data', {})
image_filename = request_data.get('image', '')
if not edited_data:
return jsonify({"error": "数据不能为空"}), 400
# 使用json_converter将JSON数据转换为字符串
data_string = json_to_string(edited_data)
print(f"转换后的数据字符串: {data_string}")
# 构造新的数据结构只包含data和image字段并添加用户ID
processed_data = {
"data": data_string,
"image": image_filename, # 存储图片文件名
"user_id": session['user_id'] # 添加用户ID关联
}
print(f"准备存储的数据: {processed_data}")
# 存入ES
insert_data(processed_data)
print("✓ 数据成功存储到Elasticsearch")
return jsonify({"message": "数据录入成功", "data": edited_data})
except Exception as e:
print(f"✗ 录入过程中发生错误: {str(e)}")
return jsonify({"error": str(e)}), 500
# 搜索路由
@app.route('/search')
def search():
"""
处理搜索请求从Elasticsearch中检索匹配的数据
返回:
JSON: 搜索结果列表
"""
keyword = request.args.get('q')
if not keyword:
return jsonify([])
results = search_by_any_field(keyword)
# 处理搜索结果将data字段转换回JSON格式
processed_results = []
for result in results:
if '_source' in result and 'data' in result['_source']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(result['_source']['data'])
# 构造新的结果格式
processed_result = {
'_id': result.get('_id', ''),
'_source': {
'image': result['_source'].get('image', ''),
**original_data # 展开原始数据字段
}
}
processed_results.append(processed_result)
except Exception as e:
# 如果转换失败,保持原始格式
processed_results.append(result)
else:
processed_results.append(result)
print(processed_results)
return jsonify(processed_results)
# 结果页面路由
@app.route('/results')
def results_page():
"""
渲染搜索结果页面
返回:
str: 渲染后的HTML页面
"""
return render_template('results.html')
# 显示所有数据路由
@app.route('/all')
def show_all():
"""
获取所有数据并渲染到页面
返回:
str: 渲染后的HTML页面包含所有数据
"""
all_data = search_all()
# 将data字段从字符串转换回JSON格式以便显示
processed_data = []
for item in all_data:
if 'data' in item and item['data']:
try:
# 将data字段的字符串转换回JSON
original_data = string_to_json(item['data'])
# 合并原始数据和其他字段
display_item = {
'_id': item['_id'],
'image': item.get('image', ''),
**original_data # 展开原始数据字段
}
processed_data.append(display_item)
except Exception as e:
# 如果转换失败,保持原始格式
processed_data.append(item)
else:
processed_data.append(item)
return render_template('all.html', data=processed_data)
# 添加图片路由
@app.route('/image/<filename>')
def serve_image(filename):
"""
提供图片文件服务
参数:
filename (str): 图片文件名
返回:
Response: 图片文件响应
"""
from flask import send_from_directory
return send_from_directory('image', filename)
# 删除数据路由
@app.route('/delete/<doc_id>', methods=['POST'])
def delete_entry(doc_id):
"""
根据文档ID删除数据
参数:
doc_id (str): 要删除的文档ID
返回:
重定向到所有数据页面或错误信息
"""
if delete_by_id(doc_id):
return redirect(url_for('show_all'))
else:
return "删除失败", 500
# 主程序入口
if __name__ == '__main__':
# 创建Elasticsearch索引
create_index_with_mapping()
# 创建图片存储目录
os.makedirs("image", exist_ok=True)
# 启动Flask应用
app.run(use_reloader=False)