from elasticsearch import Elasticsearch import os import json import hashlib import requests import json # Elasticsearch连接配置 ES_URL = "http://localhost:9200" AUTH = None # 如需认证则改为("用户名","密码") # document=os.open('results/output.json', os.O_RDONLY) # 创建Elasticsearch客户端实例,连接到本地Elasticsearch服务 es = Elasticsearch(["http://localhost:9200"]) # 定义索引名称和类型名称 index_name = "wordsearch2666" def create_index_with_mapping(): """修正后的索引映射配置""" # 修正映射结构(移除keyword字段的非法参数) mapping = { "mappings": { "properties": { "data": { "type": "text", # 存储转换后的字符串,支持分词搜索 "analyzer": "ik_max_word", "search_analyzer": "ik_smart" }, "image": {"type": "keyword"}, # 存储图片路径或标识 } } } # 检查索引是否存在,不存在则创建 if not es.indices.exists(index=index_name): es.indices.create(index=index_name, body=mapping) print(f"创建索引 {index_name} 并设置映射") else: print(f"索引 {index_name} 已存在") def update_document(es, index_name, doc_id=None, updated_doc=None): """更新指定ID的文档""" es.update(index=index_name, id=doc_id, body={"doc": updated_doc}) def get_doc_id(data): """ 根据数据内容生成唯一ID(用于去重) 参数: data (dict): 包含文档数据的字典 返回: str: 基于数据内容生成的MD5哈希值作为唯一ID """ # 使用data字段的内容生成唯一字符串 data_str = data.get('data', '') image_str = data.get('image', '') unique_str = f"{data_str}{image_str}" # 使用MD5哈希生成唯一ID return hashlib.md5(unique_str.encode('utf-8')).hexdigest() def insert_data(data): """ 向Elasticsearch插入数据 参数: data (dict): 要插入的数据 返回: bool: 插入成功返回True,失败返回False """ # 生成文档唯一ID return batch_write_data(data) def search_data(query): """ 在Elasticsearch中搜索数据 参数: query (str): 搜索关键词 返回: list: 包含搜索结果的列表,每个元素是一个文档的源数据 """ # 执行多字段匹配搜索 result = es.search(index=index_name, body={"query": {"multi_match": {"query": query, "fields": ["*"]}}}) # 返回搜索结果的源数据部分 return [hit["_source"] for hit in result['hits']['hits']] def search_all(): """ 获取所有文档 返回: list: 包含所有文档的列表,每个元素包含文档ID和源数据 """ # 执行匹配所有文档的查询 result = es.search(index=index_name, body={"query": {"match_all": {}}}) # 返回包含文档ID和源数据的列表 return [{ "_id": hit["_id"], **hit["_source"] } for hit in result['hits']['hits']] def delete_by_id(doc_id): """ 根据 doc_id 删除文档 参数: doc_id (str): 要删除的文档ID 返回: bool: 删除成功返回True,失败返回False """ try: # 执行删除操作 es.delete(index=index_name, id=doc_id) return True except Exception as e: print("删除失败:", str(e)) return False def update_by_id(doc_id, updated_data): """ 根据文档ID更新数据 参数: doc_id (str): 要更新的文档ID updated_data (dict): 更新的数据内容 返回: bool: 更新成功返回True,失败返回False """ try: # 执行更新操作 es.update(index=index_name, id=doc_id, body={"doc": updated_data}) print(f"文档 {doc_id} 更新成功") return True except Exception as e: print(f"更新失败: {str(e)}") return False def get_by_id(doc_id): """ 根据文档ID获取单个文档 参数: doc_id (str): 要获取的文档ID 返回: dict or None: 成功返回文档数据,失败返回None """ try: # 执行获取操作 result = es.get(index=index_name, id=doc_id) if result['found']: return { "_id": result['_id'], **result['_source'] } return None except Exception as e: print(f"获取文档失败: {str(e)}") return None def search_by_any_field(keyword): """全字段模糊搜索(支持拼写错误)""" try: # update_mapping() response = requests.post( f"{ES_URL}/{index_name}/_search", auth=AUTH, json={ "query": { "multi_match": { "query": keyword, "fields": ["*"], # 匹配所有字段 "fuzziness": "AUTO", # 启用模糊匹配 } } } ) response.raise_for_status() results = response.json()["hits"]["hits"] print(f"\n模糊搜索 '{keyword}' 找到 {len(results)} 条结果:") for doc in results: print(f"\n文档ID: {doc['_id']}") if '_source' in doc: max_key_len = max(len(k) for k in doc['_source'].keys()) for key, value in doc['_source'].items(): # 提取高亮部分 highlight = doc.get('highlight', {}).get(key, [value])[0] print(f"{key:>{max_key_len + 2}} : {highlight}") else: print("无_source数据") return results except requests.exceptions.HTTPError as e: print(f"搜索失败: {e.response.text}") return [] def batch_write_data(data): """批量写入获奖数据""" try: response = requests.post( f"{ES_URL}/{index_name}/_doc", json=data, auth=AUTH, headers={"Content-Type": "application/json"} ) response.raise_for_status() doc_id = response.json()["_id"] print(f"文档写入成功,ID: {doc_id}, 内容: {data}") return True except requests.exceptions.HTTPError as e: print(f"文档写入失败: {e.response.text}, 数据: {data}") return False