Files
Achievement_Inputing/ESConnect.py
2025-09-24 19:46:17 +08:00

231 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from elasticsearch import Elasticsearch
import os
import json
import hashlib
import requests
import json
# Elasticsearch connection settings.
ES_URL = "http://localhost:9200"
AUTH = None  # Set to ("username", "password") if the cluster requires authentication.
# Client used by the elasticsearch-py based functions. It is built from ES_URL so
# it always targets the same cluster as the raw `requests` calls in this module.
# NOTE(review): AUTH is only passed to the `requests` calls; this client is
# created without credentials.
es = Elasticsearch([ES_URL])
# Name of the index that every function in this module operates on.
index_name = "wordsearch2"
def create_index_with_mapping():
    """Create ``index_name`` with the project's field mapping unless it already exists.

    Field layout:
      * ``id`` and ``name``: ``text`` fields indexed with ``ik_max_word`` and
        searched with ``ik_smart`` (presumably the IK analysis plugin, which must
        be installed on the cluster — confirm).
      * ``students`` and ``teacher``: exact-value ``keyword`` fields.
      * ``timestamp``: ``date`` accepting ``strict_date_optional_time`` strings or
        epoch milliseconds.

    Prints whether the index was created or was already present; returns None.
    """

    def ik_text_field():
        # Fresh dict per field so the two text mappings never share state.
        return {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"}

    properties = {
        "id": ik_text_field(),  # text (not keyword) so IDs are tokenized/searchable
        "name": ik_text_field(),
        "students": {"type": "keyword"},
        "teacher": {"type": "keyword"},
        "timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis",
        },
    }

    if es.indices.exists(index=index_name):
        print(f"索引 {index_name} 已存在")
        return

    es.indices.create(index=index_name, body={"mappings": {"properties": properties}})
    print(f"创建索引 {index_name} 并设置映射")
def get_doc_id(data):
    """Derive a deterministic document ID from a record's key fields, for de-duplication.

    The values of ``id``, ``name``, ``students`` and ``teacher`` are formatted with
    ``format()`` (the same conversion an f-string uses), joined with no separator,
    UTF-8 encoded and MD5-hashed.

    NOTE(review): because the fields are concatenated without a delimiter,
    different records can collide (e.g. id="ab", name="c" vs id="a", name="bc").
    Changing the scheme would change every generated ID, so it is kept as-is.

    Args:
        data (dict): record containing at least the four key fields.

    Returns:
        str: 32-character hexadecimal MD5 digest.

    Raises:
        KeyError: if any of the four key fields is missing.
    """
    key_fields = ("id", "name", "students", "teacher")
    fingerprint = "".join(format(data[field]) for field in key_fields)
    digest = hashlib.md5(fingerprint.encode('utf-8'))
    return digest.hexdigest()
def insert_data(data):
    """Insert a single document into the index.

    Thin wrapper around ``batch_write_data``, which POSTs the document to
    ``/<index>/_doc``. No ID is sent, so Elasticsearch assigns one; ``get_doc_id``
    is not used here.

    Args:
        data (dict): document body to store.

    Returns:
        bool: True if the write succeeded, False if Elasticsearch answered with an
        HTTP error status.
    """
    succeeded = batch_write_data(data)
    return succeeded
def search_data(query):
    """Run a ``multi_match`` query for ``query`` across all fields of the index.

    Because no ``size`` is given, Elasticsearch returns at most its default page
    of 10 hits.

    Args:
        query (str): search text.

    Returns:
        list[dict]: the ``_source`` of each returned hit, in score order.
    """
    request_body = {
        "query": {
            "multi_match": {"query": query, "fields": ["*"]},
        }
    }
    response = es.search(index=index_name, body=request_body)
    hits = response['hits']['hits']
    return [hit["_source"] for hit in hits]
def search_all():
    """Return every document stored in the index.

    A plain ``es.search`` call returns only 10 hits by default, so the original
    implementation silently truncated the result. This version uses the
    scroll-based ``elasticsearch.helpers.scan`` helper, which pages through the
    whole index.

    Returns:
        list[dict]: one dict per document, holding ``_id`` plus the document's
        source fields. Documents without a stored ``_source`` contribute only
        their ``_id``. Order is not guaranteed.
    """
    # Local import: elasticsearch is already a dependency of this module, and the
    # helpers subpackage is only needed here.
    from elasticsearch import helpers

    hits = helpers.scan(es, index=index_name, query={"query": {"match_all": {}}})
    return [{"_id": hit["_id"], **hit.get("_source", {})} for hit in hits]
def delete_by_id(doc_id):
    """Delete the document whose Elasticsearch ``_id`` is ``doc_id``.

    This is best-effort: any exception raised by the client (missing document,
    connection failure, ...) is printed and reported as False, not propagated.

    Args:
        doc_id (str): ID of the document to remove.

    Returns:
        bool: True if the delete call succeeded, False otherwise.
    """
    try:
        es.delete(index=index_name, id=doc_id)
    except Exception as err:
        print("删除失败:", str(err))
        return False
    return True
def search_by_any_field(keyword, timeout=None):
    """Fuzzy search across all fields, printing each hit (tolerates spelling mistakes).

    Sends a ``multi_match`` query over every field (``"*"``) with
    ``fuzziness: "AUTO"`` through the REST API. Each hit's source fields are then
    printed right-aligned by field name.

    Args:
        keyword (str): text to search for.
        timeout (float | tuple | None): forwarded to ``requests.post``. ``None``
            (the default) waits indefinitely, matching the previous behavior.

    Returns:
        list[dict]: the raw ``hits.hits`` entries of the response, or ``[]`` if
        Elasticsearch answered with an HTTP error status. Connection-level errors
        (e.g. ``requests.ConnectionError``) are not caught.
    """
    try:
        response = requests.post(
            f"{ES_URL}/{index_name}/_search",
            auth=AUTH,
            json={
                "query": {
                    "multi_match": {
                        "query": keyword,
                        "fields": ["*"],  # match against every field
                        "fuzziness": "AUTO",  # allow small typos
                    }
                }
            },
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"搜索失败: {e.response.text}")
        return []

    results = response.json()["hits"]["hits"]
    print(f"\n模糊搜索 '{keyword}' 找到 {len(results)} 条结果:")
    for doc in results:
        print(f"\n文档ID: {doc['_id']}")
        if '_source' not in doc:
            print("无_source数据")
            continue
        source = doc['_source']
        # default=0 keeps an empty _source from crashing max() with ValueError.
        width = max((len(k) for k in source), default=0) + 2
        # The query above does not request highlighting, so "highlight" is normally
        # absent and the raw value is printed. Fragments are used if ES returns them.
        highlights = doc.get('highlight', {})
        for key, value in source.items():
            shown = highlights.get(key, [value])[0]
            print(f"{key:>{width}} : {shown}")
    return results
def batch_write_data(data):
    """Write one award record to the index via the REST API.

    Despite the name, each call stores exactly one document: ``data`` is POSTed to
    ``/<index>/_doc`` without an ID, so Elasticsearch generates one. The outcome is
    printed.

    Args:
        data (dict): document body (serialized to JSON).

    Returns:
        bool: True on success, False if Elasticsearch answered with an HTTP error
        status. Connection-level errors are not caught.
    """
    endpoint = f"{ES_URL}/{index_name}/_doc"
    try:
        resp = requests.post(
            endpoint,
            json=data,
            auth=AUTH,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"文档写入失败: {e.response.text}, 数据: {data}")
        return False
    new_id = resp.json()["_id"]
    print(f"文档写入成功ID: {new_id}, 内容: {data}")
    return True
def update_mapping():
    """PUT the field mapping onto the existing index, then print the resulting mapping.

    The field definitions mirror those in ``create_index_with_mapping``. In
    particular, ``name`` now carries ``search_analyzer: ik_smart``. Omitting it
    made this update disagree with the index creation mapping and could change the
    field's search analyzer. Elasticsearch rejects changes to non-updatable
    parameters such as ``analyzer`` on existing fields; such errors are printed.

    Returns:
        None. Success, failure and the verified mapping are reported via print.
    """
    ik_text = {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"}
    new_mapping = {
        "properties": {
            "id": dict(ik_text),
            "name": dict(ik_text),
            "students": {"type": "keyword"},
            "teacher": {"type": "keyword"},
        }
    }
    try:
        response = requests.put(
            f"{ES_URL}/{index_name}/_mapping",
            auth=AUTH,
            json=new_mapping,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        print("索引映射更新成功")
        print(response.json())
        # Read the mapping back so the operator can confirm what was applied.
        verify = requests.get(
            f"{ES_URL}/{index_name}/_mapping",
            auth=AUTH,
        )
        print("\n验证结果:")
        print(verify.json())
    except requests.exceptions.HTTPError as e:
        print(f"请求失败: {e.response.text}")