"""Elasticsearch-backed helpers for searchable data documents and user accounts.

Talks to Elasticsearch both through the official ``elasticsearch`` client
(module-level ``es``) and through raw REST calls made with ``requests``.
Functions that use ``requests`` report failures by printing a message and
returning False / None / [] instead of raising.
"""
import hashlib
import json
import time

import requests
from elasticsearch import Elasticsearch

# Elasticsearch connection settings
ES_URL = "http://localhost:9200"
AUTH = None  # set to ("username", "password") if the cluster requires authentication
# Seconds a raw REST call waits for Elasticsearch before giving up
REQUEST_TIMEOUT = 30

# Elasticsearch client connected to the same endpoint as the raw REST calls
es = Elasticsearch([ES_URL])

# Index names
data_index_name = "wordsearch266666"
users_index_name = "users"


def create_index_with_mapping():
    """Create the data and users indices with their mappings if they do not exist.

    When the users index is created for the first time, a default admin
    account (user_id 0, permission 0) is written into it.
    """
    data_mapping = {
        "mappings": {
            "properties": {
                # NOTE(review): ownership checks elsewhere in this module use a
                # "user_id" field, but the mapping declares "writer_id"; "user_id"
                # therefore gets a dynamic mapping — confirm which name is intended.
                "writer_id": {"type": "text"},
                "data": {
                    "type": "text",  # converted text content, analyzed for full-text search
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_smart",
                },
                "image": {"type": "keyword"},  # image path or identifier
            }
        }
    }
    users_mapping = {
        "mappings": {
            "properties": {
                "user_id": {"type": "long"},  # system-assigned unique user id
                "username": {"type": "keyword"},  # user-editable login name
                "password": {"type": "keyword"},  # NOTE(review): stored in plain text
                # Permission group (0 = administrator, 1 = regular user, ...).
                # The misspelled key is part of the stored schema; do not rename.
                "premission": {"type": "integer"},
            }
        }
    }

    # Create the data index only if it is missing
    if not es.indices.exists(index=data_index_name):
        es.indices.create(index=data_index_name, body=data_mapping)
        print(f"创建索引 {data_index_name} 并设置映射")
    else:
        print(f"索引 {data_index_name} 已存在")

    # Create the users index only if it is missing, seeding the admin account
    if not es.indices.exists(index=users_index_name):
        es.indices.create(index=users_index_name, body=users_mapping)
        print(f"创建索引 {users_index_name} 并设置映射")
        # NOTE(review): default admin credentials are hard-coded; change them after setup.
        admin = {"user_id": 0, "username": "admin", "password": "admin", "premission": 0}
        write_user_data(admin)
    else:
        print(f"索引 {users_index_name} 已存在")


def update_document(es, index_name, doc_id=None, updated_doc=None):
    """Partially update document ``doc_id`` in ``index_name`` with the fields of ``updated_doc``.

    Uses the ``es`` client passed in (not the module-level client). Client
    exceptions propagate to the caller.
    """
    es.update(index=index_name, id=doc_id, body={"doc": updated_doc})


def get_doc_id(data):
    """Derive a deterministic ID from a document's content (intended for de-duplication).

    Args:
        data (dict): document data; its "data" and "image" values are used.

    Returns:
        str: hex MD5 digest of the concatenated "data" and "image" values.
    """
    unique_str = f"{data.get('data', '')}{data.get('image', '')}"
    # MD5 serves only as a content fingerprint here, not for security.
    return hashlib.md5(unique_str.encode("utf-8")).hexdigest()


def insert_data(data):
    """Insert one document into the data index.

    Args:
        data (dict): the document to insert.

    Returns:
        bool: True on success, False on failure.
    """
    # NOTE(review): get_doc_id() exists for content-based de-duplication but is
    # not used here; Elasticsearch auto-generates the document ID.
    return batch_write_data(data)


def search_data(query):
    """Run a multi_match over all fields of the data index.

    Args:
        query (str): search keywords.

    Returns:
        list: the ``_source`` of each hit. No ``size`` is sent, so
        Elasticsearch's default page size applies.
    """
    result = es.search(
        index=data_index_name,
        body={"query": {"multi_match": {"query": query, "fields": ["*"]}}},
    )
    return [hit["_source"] for hit in result["hits"]["hits"]]


def search_all():
    """Return documents from the data index via match_all.

    Returns:
        list: dicts holding ``_id`` plus the document's source fields. No
        ``size`` is sent, so Elasticsearch's default page size applies.
    """
    result = es.search(index=data_index_name, body={"query": {"match_all": {}}})
    return [{"_id": hit["_id"], **hit["_source"]} for hit in result["hits"]["hits"]]


def delete_by_id(doc_id):
    """Delete a data document by ID (no ownership check).

    Args:
        doc_id (str): ID of the document to delete.

    Returns:
        bool: True on success, False if the client raised.
    """
    try:
        es.delete(index=data_index_name, id=doc_id)
        return True
    except Exception as e:
        print("删除失败:", str(e))
        return False


def update_by_id(doc_id, updated_data):
    """Partially update a data document by ID (no ownership check).

    Args:
        doc_id (str): ID of the document to update.
        updated_data (dict): fields to merge into the document.

    Returns:
        bool: True on success, False if the client raised.
    """
    try:
        es.update(index=data_index_name, id=doc_id, body={"doc": updated_data})
        print(f"文档 {doc_id} 更新成功")
        return True
    except Exception as e:
        print(f"更新失败: {str(e)}")
        return False


def get_by_id(doc_id):
    """Fetch one data document by ID.

    Args:
        doc_id (str): ID of the document to fetch.

    Returns:
        dict or None: ``_id`` plus source fields, or None if it is missing or the client raised.
    """
    try:
        result = es.get(index=data_index_name, id=doc_id)
        if result["found"]:
            return {"_id": result["_id"], **result["_source"]}
        return None
    except Exception as e:
        print(f"获取文档失败: {str(e)}")
        return None


def _error_text(exc):
    """Return the most useful detail for a requests exception.

    This is the HTTP response body when there is one, otherwise the exception text.
    """
    response = getattr(exc, "response", None)
    # Compare with None explicitly: requests.Response truthiness follows .ok,
    # so an error response would otherwise look "absent".
    return response.text if response is not None else str(exc)


def search_by_any_field(keyword):
    """Fuzzy-search every field of the data index and print the hits.

    Args:
        keyword (str): search text; fuzziness "AUTO" tolerates small typos.

    Returns:
        list: the raw Elasticsearch hits, or [] if the request failed.
    """
    try:
        response = requests.post(
            f"{ES_URL}/{data_index_name}/_search",
            auth=AUTH,
            json={
                "query": {
                    "multi_match": {
                        "query": keyword,
                        "fields": ["*"],  # match against all fields
                        "fuzziness": "AUTO",  # enable fuzzy matching
                    }
                }
            },
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        results = response.json()["hits"]["hits"]
    except requests.exceptions.RequestException as e:
        print(f"搜索失败: {_error_text(e)}")
        return []

    print(f"\n模糊搜索 '{keyword}' 找到 {len(results)} 条结果:")
    for doc in results:
        print(f"\n文档ID: {doc['_id']}")
        if "_source" not in doc:
            print("无_source数据")
            continue
        source = doc["_source"]
        # default=0 keeps an empty _source from raising ValueError
        max_key_len = max((len(k) for k in source), default=0)
        # The query requests no highlighting, so this normally falls back to the raw value.
        highlights = doc.get("highlight", {})
        for key, value in source.items():
            highlight = highlights.get(key, [value])[0]
            print(f"{key:>{max_key_len + 2}} : {highlight}")
    return results


def _index_document(index_name, data):
    """POST ``data`` as a new document (auto-generated ID) to ``index_name``.

    Returns:
        bool: True on success, False on failure.
    """
    try:
        response = requests.post(
            f"{ES_URL}/{index_name}/_doc",
            json=data,
            auth=AUTH,
            headers={"Content-Type": "application/json"},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        doc_id = response.json()["_id"]
    except requests.exceptions.RequestException as e:
        print(f"文档写入失败: {_error_text(e)}, 数据: {data}")
        return False
    print(f"文档写入成功,ID: {doc_id}, 内容: {data}")
    return True


def batch_write_data(data):
    """Write one document into the data index (despite the name, a single document).

    Returns:
        bool: True on success, False on failure.
    """
    return _index_document(data_index_name, data)


def write_user_data(data):
    """Write one user document into the users index.

    Returns:
        bool: True on success, False on failure.
    """
    return _index_document(users_index_name, data)


def _search_users_by_term(field, value):
    """Run a term query on the users index and return the raw hits list.

    Raises:
        requests.exceptions.RequestException: on transport or HTTP failure.
    """
    response = requests.post(
        f"{ES_URL}/{users_index_name}/_search",
        auth=AUTH,
        json={"query": {"term": {field: value}}},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()["hits"]["hits"]


def _reindex_user(doc_id, user_data):
    """Overwrite users-index document ``doc_id`` with ``user_data``.

    Raises:
        requests.exceptions.RequestException: on transport or HTTP failure.
    """
    response = requests.post(
        f"{ES_URL}/{users_index_name}/_doc/{doc_id}",
        auth=AUTH,
        json=user_data,
        headers={"Content-Type": "application/json"},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()


def _get_user_by_user_id(user_id):
    """Return the ``_source`` of the user whose ``user_id`` field matches.

    Returns None if no user matches or the request failed.
    """
    try:
        results = _search_users_by_term("user_id", user_id)
    except requests.exceptions.RequestException as e:
        print(f"用户查询失败: {_error_text(e)}")
        return None
    return results[0]["_source"] if results else None


def verify_user(username, password):
    """Check login credentials.

    Args:
        username (str): login name.
        password (str): password to compare.

    Returns:
        dict or None: the user's source data on success, otherwise None.
    """
    try:
        results = _search_users_by_term("username", username)
    except requests.exceptions.RequestException as e:
        print(f"用户验证失败: {_error_text(e)}")
        return None

    if not results:
        print(f"用户 {username} 不存在")
        return None

    user_data = results[0]["_source"]
    # NOTE(review): passwords are stored and compared in plain text; consider
    # salted hashing (e.g. hashlib.scrypt) with hmac.compare_digest.
    if user_data.get("password") != password:
        print(f"用户 {username} 密码错误")
        return None
    print(f"用户 {username} 登录成功")
    return user_data


def get_user_by_username(username):
    """Look up a user by login name.

    Args:
        username (str): login name.

    Returns:
        dict or None: the user's source data, or None if not found or on failure.
    """
    try:
        results = _search_users_by_term("username", username)
    except requests.exceptions.RequestException as e:
        print(f"用户查询失败: {_error_text(e)}")
        return None
    return results[0]["_source"] if results else None


def create_user(username, password, permission=1):
    """Create a new user.

    Args:
        username (str): login name; must not already exist.
        password (str): password.
        permission (int): permission level, default 1 (regular user).

    Returns:
        bool: True on success, False if the name is taken or the write failed.
    """
    # NOTE(review): check-then-insert is not atomic; concurrent calls may both succeed.
    if get_user_by_username(username):
        print(f"用户名 {username} 已存在")
        return False

    # Millisecond timestamp used as the user ID (not collision-proof under concurrency)
    user_id = int(time.time() * 1000)
    user_data = {
        "user_id": user_id,
        "username": username,
        "password": password,
        "premission": permission,
    }
    return write_user_data(user_data)


def get_all_users():
    """List up to 1000 users.

    Returns:
        list: each user's source data plus its ``_id``, or [] on failure.
    """
    try:
        response = requests.post(
            f"{ES_URL}/{users_index_name}/_search",
            auth=AUTH,
            json={
                "query": {"match_all": {}},
                "size": 1000,  # upper bound on returned users; adjust as needed
            },
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        hits = response.json()["hits"]["hits"]
    except requests.exceptions.RequestException as e:
        print(f"获取用户列表失败: {_error_text(e)}")
        return []
    # Attach the document ID so callers can address the user document later
    return [{**hit["_source"], "_id": hit["_id"]} for hit in hits]


def update_user_password(username, new_password):
    """Set a user's password (administrative reset; no old-password check).

    Args:
        username (str): login name.
        new_password (str): new password.

    Returns:
        bool: True on success, False if the user is missing or the request failed.
    """
    try:
        results = _search_users_by_term("username", username)
        if not results:
            print(f"用户 {username} 不存在")
            return False
        hit = results[0]
        user_data = hit["_source"]
        user_data["password"] = new_password
        _reindex_user(hit["_id"], user_data)
    except requests.exceptions.RequestException as e:
        print(f"更新用户密码失败: {_error_text(e)}")
        return False
    print(f"用户 {username} 密码更新成功")
    return True


def delete_user(username):
    """Delete a user by login name. The "admin" account cannot be deleted.

    Args:
        username (str): login name.

    Returns:
        bool: True on success, otherwise False.
    """
    if username == "admin":
        print("不能删除管理员账户")
        return False
    try:
        results = _search_users_by_term("username", username)
        if not results:
            print(f"用户 {username} 不存在")
            return False
        delete_response = requests.delete(
            f"{ES_URL}/{users_index_name}/_doc/{results[0]['_id']}",
            auth=AUTH,
            timeout=REQUEST_TIMEOUT,
        )
        delete_response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"删除用户失败: {_error_text(e)}")
        return False
    print(f"用户 {username} 删除成功")
    return True


def update_user_permission(username, new_permission):
    """Change a user's permission level. The "admin" account cannot be changed.

    Args:
        username (str): login name.
        new_permission (int): new permission level.

    Returns:
        bool: True on success, otherwise False.
    """
    if username == "admin":
        print("不能修改管理员权限")
        return False
    try:
        results = _search_users_by_term("username", username)
        if not results:
            print(f"用户 {username} 不存在")
            return False
        hit = results[0]
        user_data = hit["_source"]
        user_data["premission"] = new_permission
        _reindex_user(hit["_id"], user_data)
    except requests.exceptions.RequestException as e:
        print(f"更新用户权限失败: {_error_text(e)}")
        return False
    print(f"用户 {username} 权限更新成功")
    return True


def search_data_by_user(user_id, keyword=None):
    """List up to 1000 data documents owned by ``user_id``, optionally filtered by keyword.

    Args:
        user_id (str): owner's user ID, matched with a term query on ``user_id``.
        keyword (str, optional): fuzzy multi_match over all fields.

    Returns:
        list: dicts holding ``_id`` plus source fields, or [] on failure.
    """
    owner_filter = {"term": {"user_id": user_id}}
    if keyword:
        query = {
            "bool": {
                "must": [
                    owner_filter,
                    {"multi_match": {"query": keyword, "fields": ["*"], "fuzziness": "AUTO"}},
                ]
            }
        }
    else:
        query = owner_filter

    try:
        response = requests.post(
            f"{ES_URL}/{data_index_name}/_search",
            auth=AUTH,
            json={"query": query, "size": 1000},  # cap on returned documents
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        hits = response.json()["hits"]["hits"]
    except requests.exceptions.RequestException as e:
        print(f"查询用户数据失败: {_error_text(e)}")
        return []
    return [{"_id": hit["_id"], **hit["_source"]} for hit in hits]


def _fetch_data_source(doc_id):
    """Return the ``_source`` of data document ``doc_id``, or None if it does not exist.

    Raises:
        requests.exceptions.RequestException: on other transport or HTTP failures.
    """
    response = requests.get(
        f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
        auth=AUTH,
        timeout=REQUEST_TIMEOUT,
    )
    # Elasticsearch answers 404 for a missing document. Handle it before
    # raise_for_status() so it is reported as "not found", not as a failure.
    if response.status_code == 404:
        return None
    response.raise_for_status()
    doc = response.json()
    return doc.get("_source", {}) if doc.get("found") else None


def update_data_by_id(doc_id, updated_data, user_id):
    """Replace a data document, allowed only for its owner.

    The stored document is overwritten with ``updated_data`` plus the owner's
    ``user_id``. Fields absent from ``updated_data`` are not kept. The
    caller's dict is not modified.

    Args:
        doc_id (str): document ID.
        updated_data (dict): new document content.
        user_id (str): ID of the user making the change.

    Returns:
        bool: True on success, otherwise False.
    """
    try:
        source = _fetch_data_source(doc_id)
        if source is None:
            print(f"文档 {doc_id} 不存在")
            return False

        # Only the owner may modify the document
        if source.get("user_id") != user_id:
            print(f"用户 {user_id} 无权修改文档 {doc_id}")
            return False

        # Keep ownership unchanged without mutating the caller's dict
        new_doc = {**updated_data, "user_id": user_id}
        update_response = requests.post(
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
            auth=AUTH,
            json=new_doc,
            headers={"Content-Type": "application/json"},
            timeout=REQUEST_TIMEOUT,
        )
        update_response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"更新文档失败: {_error_text(e)}")
        return False
    print(f"文档 {doc_id} 更新成功")
    return True


def delete_data_by_id(doc_id, user_id):
    """Delete a data document, allowed for its owner or an administrator.

    Args:
        doc_id (str): document ID.
        user_id (str): ID of the user requesting deletion.

    Returns:
        bool: True on success, otherwise False.
    """
    try:
        source = _fetch_data_source(doc_id)
        if source is None:
            print(f"文档 {doc_id} 不存在")
            return False

        # Non-owners need administrator permission (premission == 0).
        # Look the requester up by user_id (previously mis-queried by username).
        if source.get("user_id") != user_id:
            user_info = _get_user_by_user_id(user_id)
            if not user_info or user_info.get("premission") != 0:
                print(f"用户 {user_id} 无权删除文档 {doc_id}")
                return False

        delete_response = requests.delete(
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
            auth=AUTH,
            timeout=REQUEST_TIMEOUT,
        )
        delete_response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"删除文档失败: {_error_text(e)}")
        return False
    print(f"文档 {doc_id} 删除成功")
    return True


def update_user_own_password(user_id, old_password, new_password):
    """Let a user change their own password after verifying the old one.

    Args:
        user_id (str): the user's ID.
        old_password (str): current password.
        new_password (str): new password.

    Returns:
        bool: True on success, otherwise False.
    """
    try:
        results = _search_users_by_term("user_id", user_id)
        if not results:
            print(f"用户 {user_id} 不存在")
            return False
        hit = results[0]
        user_data = hit["_source"]

        if user_data.get("password") != old_password:
            print("旧密码错误")
            return False

        user_data["password"] = new_password
        _reindex_user(hit["_id"], user_data)
    except requests.exceptions.RequestException as e:
        print(f"修改密码失败: {_error_text(e)}")
        return False
    print(f"用户 {user_id} 密码修改成功")
    return True