780 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			780 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from elasticsearch import Elasticsearch
 | 
						||
# import os
 | 
						||
# import json
 | 
						||
import hashlib
 | 
						||
import requests
 | 
						||
import json
 | 
						||
 | 
						||
# Elasticsearch连接配置
 | 
						||
ES_URL = "http://localhost:9200"
 | 
						||
AUTH = None  # 如需认证则改为("用户名","密码")
 | 
						||
 | 
						||
# document=os.open('results/output.json', os.O_RDONLY)
 | 
						||
 | 
						||
# 创建Elasticsearch客户端实例,连接到本地Elasticsearch服务
 | 
						||
es = Elasticsearch(["http://localhost:9200"])
 | 
						||
 | 
						||
# 定义索引名称和类型名称
 | 
						||
data_index_name = "wordsearch266666"
 | 
						||
users_index_name = "users"
 | 
						||
 | 
						||
def create_index_with_mapping():
 | 
						||
    """修正后的索引映射配置"""
 | 
						||
    # 新增一个用户mapping
 | 
						||
    data_mapping = {
 | 
						||
        "mappings": {
 | 
						||
            "properties": {
 | 
						||
                "writer_id":{"type": "text"},
 | 
						||
                "data": {
 | 
						||
                    "type": "text",  # 存储转换后的字符串,支持分词搜索
 | 
						||
                    "analyzer": "ik_max_word",
 | 
						||
                    "search_analyzer": "ik_smart"
 | 
						||
                },
 | 
						||
                "image": {"type": "keyword"},  # 存储图片路径或标识
 | 
						||
            }
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    users_mapping = {
 | 
						||
        "mappings": {
 | 
						||
            "properties": {
 | 
						||
                "user_id":{"type":"long"},      #由系统分配的用户唯一id
 | 
						||
                "username":{"type":"keyword"},  #可修改的用户名
 | 
						||
                "password":{"type":"keyword"},  #密码
 | 
						||
                "premission":{"type":"integer"},#权限组分配(比方说0就是管理员,1是普通用户,以此类推)
 | 
						||
            }
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    # 检查数据索引是否存在,不存在则创建
 | 
						||
    if not es.indices.exists(index=data_index_name):
 | 
						||
        es.indices.create(index=data_index_name, body=data_mapping)
 | 
						||
        print(f"创建索引 {data_index_name} 并设置映射")
 | 
						||
    else:
 | 
						||
        print(f"索引 {data_index_name} 已存在")
 | 
						||
 | 
						||
    # 检查用户索引是否存在,不存在则创建
 | 
						||
    if not es.indices.exists(index=users_index_name):
 | 
						||
        es.indices.create(index=users_index_name, body=users_mapping)
 | 
						||
        print(f"创建索引 {users_index_name} 并设置映射")
 | 
						||
        admin={"user_id":0000000000,"username": "admin", "password": "admin", "premission": 0}
 | 
						||
        write_user_data(admin)
 | 
						||
    else:
 | 
						||
        print(f"索引 {users_index_name} 已存在")
 | 
						||
def update_document(es, index_name, doc_id=None, updated_doc=None):
 | 
						||
    """更新指定ID的文档"""
 | 
						||
    es.update(index=index_name, id=doc_id, body={"doc": updated_doc})
 | 
						||
 | 
						||
 | 
						||
def get_doc_id(data):
 | 
						||
    """
 | 
						||
    根据数据内容生成唯一ID(用于去重)
 | 
						||
    
 | 
						||
    参数:
 | 
						||
        data (dict): 包含文档数据的字典
 | 
						||
        
 | 
						||
    返回:
 | 
						||
        str: 基于数据内容生成的MD5哈希值作为唯一ID
 | 
						||
    """
 | 
						||
    # 使用data字段的内容生成唯一字符串
 | 
						||
    data_str = data.get('data', '')
 | 
						||
    image_str = data.get('image', '')
 | 
						||
    unique_str = f"{data_str}{image_str}"
 | 
						||
    # 使用MD5哈希生成唯一ID
 | 
						||
    return hashlib.md5(unique_str.encode('utf-8')).hexdigest()
 | 
						||
 | 
						||
 | 
						||
def insert_data(data):
 | 
						||
    """
 | 
						||
    向Elasticsearch插入数据
 | 
						||
    
 | 
						||
    参数:
 | 
						||
        data (dict): 要插入的数据
 | 
						||
        
 | 
						||
    返回:
 | 
						||
        bool: 插入成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    # 生成文档唯一ID
 | 
						||
    return  batch_write_data(data)
 | 
						||
 | 
						||
 | 
						||
def search_data(query):
 | 
						||
    """
 | 
						||
    在Elasticsearch中搜索数据
 | 
						||
    
 | 
						||
    参数:
 | 
						||
        query (str): 搜索关键词
 | 
						||
        
 | 
						||
    返回:
 | 
						||
        list: 包含搜索结果的列表,每个元素是一个文档的源数据
 | 
						||
    """
 | 
						||
    # 执行多字段匹配搜索
 | 
						||
    result = es.search(index=data_index_name, body={"query": {"multi_match": {"query": query, "fields": ["*"]}}})
 | 
						||
    # 返回搜索结果的源数据部分
 | 
						||
    return [hit["_source"] for hit in result['hits']['hits']]
 | 
						||
 | 
						||
def search_all():
 | 
						||
    """
 | 
						||
    获取所有文档
 | 
						||
    
 | 
						||
    返回:
 | 
						||
        list: 包含所有文档的列表,每个元素包含文档ID和源数据
 | 
						||
    """
 | 
						||
    # 执行匹配所有文档的查询
 | 
						||
    result = es.search(index=data_index_name, body={"query": {"match_all": {}}})
 | 
						||
    # 返回包含文档ID和源数据的列表
 | 
						||
    return [{
 | 
						||
        "_id": hit["_id"],
 | 
						||
        **hit["_source"]
 | 
						||
    } for hit in result['hits']['hits']]
 | 
						||
 | 
						||
def delete_by_id(doc_id):
 | 
						||
    """
 | 
						||
    根据 doc_id 删除文档
 | 
						||
    
 | 
						||
    参数:
 | 
						||
        doc_id (str): 要删除的文档ID
 | 
						||
        
 | 
						||
    返回:
 | 
						||
        bool: 删除成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 执行删除操作
 | 
						||
        es.delete(index=data_index_name, id=doc_id)
 | 
						||
        return True
 | 
						||
    except Exception as e:
 | 
						||
        print("删除失败:", str(e))
 | 
						||
        return False
 | 
						||
 | 
						||
def update_by_id(doc_id, updated_data):
 | 
						||
    """
 | 
						||
    根据文档ID更新数据
 | 
						||
 | 
						||
    参数:
 | 
						||
        doc_id (str): 要更新的文档ID
 | 
						||
        updated_data (dict): 更新的数据内容
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 更新成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 执行更新操作
 | 
						||
        es.update(index=data_index_name, id=doc_id, body={"doc": updated_data})
 | 
						||
        print(f"文档 {doc_id} 更新成功")
 | 
						||
        return True
 | 
						||
    except Exception as e:
 | 
						||
        print(f"更新失败: {str(e)}")
 | 
						||
        return False
 | 
						||
 | 
						||
def get_by_id(doc_id):
 | 
						||
    """
 | 
						||
    根据文档ID获取单个文档
 | 
						||
 | 
						||
    参数:
 | 
						||
        doc_id (str): 要获取的文档ID
 | 
						||
 | 
						||
    返回:
 | 
						||
        dict or None: 成功返回文档数据,失败返回None
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 执行获取操作
 | 
						||
        result = es.get(index=data_index_name, id=doc_id)
 | 
						||
        if result['found']:
 | 
						||
            return {
 | 
						||
                "_id": result['_id'],
 | 
						||
                **result['_source']
 | 
						||
            }
 | 
						||
        return None
 | 
						||
    except Exception as e:
 | 
						||
        print(f"获取文档失败: {str(e)}")
 | 
						||
        return None
 | 
						||
 | 
						||
def search_by_any_field(keyword):
 | 
						||
    """全字段模糊搜索(支持拼写错误)"""
 | 
						||
    try:
 | 
						||
        # update_data_mapping()
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{data_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "multi_match": {
 | 
						||
                        "query": keyword,
 | 
						||
                        "fields": ["*"],  # 匹配所有字段
 | 
						||
                        "fuzziness": "AUTO",  # 启用模糊匹配
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
        print(f"\n模糊搜索 '{keyword}' 找到 {len(results)} 条结果:")
 | 
						||
 | 
						||
        for doc in results:
 | 
						||
            print(f"\n文档ID: {doc['_id']}")
 | 
						||
            if '_source' in doc:
 | 
						||
                max_key_len = max(len(k) for k in doc['_source'].keys())
 | 
						||
                for key, value in doc['_source'].items():
 | 
						||
                    # 提取高亮部分
 | 
						||
                    highlight = doc.get('highlight', {}).get(key, [value])[0]
 | 
						||
                    print(f"{key:>{max_key_len + 2}} : {highlight}")
 | 
						||
            else:
 | 
						||
                print("无_source数据")
 | 
						||
 | 
						||
        return results
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"搜索失败: {e.response.text}")
 | 
						||
        return []
 | 
						||
 | 
						||
def batch_write_data(data):
 | 
						||
    """批量写入获奖数据"""
 | 
						||
    try:
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{data_index_name}/_doc",
 | 
						||
            json=data,
 | 
						||
            auth=AUTH,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        doc_id = response.json()["_id"]
 | 
						||
        print(f"文档写入成功,ID: {doc_id}, 内容: {data}")
 | 
						||
        return True
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"文档写入失败: {e.response.text}, 数据: {data}")
 | 
						||
        return False
 | 
						||
 | 
						||
def write_user_data(data):
 | 
						||
    """写入用户数据"""
 | 
						||
    try:
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_doc",
 | 
						||
            json=data,
 | 
						||
            auth=AUTH,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        doc_id = response.json()["_id"]
 | 
						||
        print(f"文档写入成功,ID: {doc_id}, 内容: {data}")
 | 
						||
        return True
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"文档写入失败: {e.response.text}, 数据: {data}")
 | 
						||
        return False
 | 
						||
 | 
						||
def verify_user(username, password):
 | 
						||
    """
 | 
						||
    验证用户登录信息
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 用户名
 | 
						||
        password (str): 密码
 | 
						||
 | 
						||
    返回:
 | 
						||
        dict or None: 验证成功返回用户信息,失败返回None
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 搜索用户名匹配的用户
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "username": username
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if results:
 | 
						||
            user_data = results[0]["_source"]
 | 
						||
            # 验证密码
 | 
						||
            if user_data.get("password") == password:
 | 
						||
                print(f"用户 {username} 登录成功")
 | 
						||
                return user_data
 | 
						||
            else:
 | 
						||
                print(f"用户 {username} 密码错误")
 | 
						||
                return None
 | 
						||
        else:
 | 
						||
            print(f"用户 {username} 不存在")
 | 
						||
            return None
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"用户验证失败: {e.response.text}")
 | 
						||
        return None
 | 
						||
 | 
						||
def get_user_by_username(username):
 | 
						||
    """
 | 
						||
    根据用户名查询用户信息
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 用户名
 | 
						||
 | 
						||
    返回:
 | 
						||
        dict or None: 查询成功返回用户信息,失败返回None
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "username": username
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if results:
 | 
						||
            return results[0]["_source"]
 | 
						||
        else:
 | 
						||
            return None
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"用户查询失败: {e.response.text}")
 | 
						||
        return None
 | 
						||
 | 
						||
def create_user(username, password, permission=1):
 | 
						||
    """
 | 
						||
    创建新用户
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 用户名
 | 
						||
        password (str): 密码
 | 
						||
        permission (int): 权限级别,默认为1(普通用户)
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 创建成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    # 检查用户名是否已存在
 | 
						||
    if get_user_by_username(username):
 | 
						||
        print(f"用户名 {username} 已存在")
 | 
						||
        return False
 | 
						||
 | 
						||
    # 生成新的用户ID
 | 
						||
    import time
 | 
						||
    user_id = int(time.time() * 1000)  # 使用时间戳作为用户ID
 | 
						||
 | 
						||
    user_data = {
 | 
						||
        "user_id": user_id,
 | 
						||
        "username": username,
 | 
						||
        "password": password,
 | 
						||
        "premission": permission
 | 
						||
    }
 | 
						||
 | 
						||
    return write_user_data(user_data)
 | 
						||
 | 
						||
def get_all_users():
 | 
						||
    """
 | 
						||
    获取所有用户信息
 | 
						||
 | 
						||
    返回:
 | 
						||
        list: 包含所有用户信息的列表
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "match_all": {}
 | 
						||
                },
 | 
						||
                "size": 1000  # 限制返回数量,可根据需要调整
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        users = []
 | 
						||
        for hit in results:
 | 
						||
            user_data = hit["_source"]
 | 
						||
            user_data["_id"] = hit["_id"]  # 添加文档ID用于后续操作
 | 
						||
            users.append(user_data)
 | 
						||
 | 
						||
        return users
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"获取用户列表失败: {e.response.text}")
 | 
						||
        return []
 | 
						||
 | 
						||
def update_user_password(username, new_password):
 | 
						||
    """
 | 
						||
    更新用户密码
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 用户名
 | 
						||
        new_password (str): 新密码
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 更新成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 先查找用户
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "username": username
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if not results:
 | 
						||
            print(f"用户 {username} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 获取用户文档ID
 | 
						||
        doc_id = results[0]["_id"]
 | 
						||
        user_data = results[0]["_source"]
 | 
						||
 | 
						||
        # 更新密码
 | 
						||
        user_data["password"] = new_password
 | 
						||
 | 
						||
        # 更新文档
 | 
						||
        update_response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH,
 | 
						||
            json=user_data,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        update_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"用户 {username} 密码更新成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"更新用户密码失败: {e.response.text}")
 | 
						||
        return False
 | 
						||
 | 
						||
def delete_user(username):
 | 
						||
    """
 | 
						||
    删除用户
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 要删除的用户名
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 删除成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 防止删除管理员账户
 | 
						||
        if username == "admin":
 | 
						||
            print("不能删除管理员账户")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 先查找用户
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "username": username
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if not results:
 | 
						||
            print(f"用户 {username} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 获取用户文档ID
 | 
						||
        doc_id = results[0]["_id"]
 | 
						||
 | 
						||
        # 删除用户
 | 
						||
        delete_response = requests.delete(
 | 
						||
            f"{ES_URL}/{users_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH
 | 
						||
        )
 | 
						||
        delete_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"用户 {username} 删除成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"删除用户失败: {e.response.text}")
 | 
						||
        return False
 | 
						||
 | 
						||
def update_user_permission(username, new_permission):
 | 
						||
    """
 | 
						||
    更新用户权限
 | 
						||
 | 
						||
    参数:
 | 
						||
        username (str): 用户名
 | 
						||
        new_permission (int): 新权限级别
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 更新成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 防止修改管理员权限
 | 
						||
        if username == "admin":
 | 
						||
            print("不能修改管理员权限")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 先查找用户
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "username": username
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if not results:
 | 
						||
            print(f"用户 {username} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 获取用户文档ID
 | 
						||
        doc_id = results[0]["_id"]
 | 
						||
        user_data = results[0]["_source"]
 | 
						||
 | 
						||
        # 更新权限
 | 
						||
        user_data["premission"] = new_permission
 | 
						||
 | 
						||
        # 更新文档
 | 
						||
        update_response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH,
 | 
						||
            json=user_data,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        update_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"用户 {username} 权限更新成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"更新用户权限失败: {e.response.text}")
 | 
						||
        return False
 | 
						||
 | 
						||
def search_data_by_user(user_id, keyword=None):
 | 
						||
    """
 | 
						||
    根据用户ID查询该用户的数据,支持关键词搜索
 | 
						||
 | 
						||
    参数:
 | 
						||
        user_id (str): 用户ID
 | 
						||
        keyword (str, optional): 搜索关键词
 | 
						||
 | 
						||
    返回:
 | 
						||
        list: 包含文档ID和源数据的列表
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        if keyword:
 | 
						||
            # 带关键词的搜索
 | 
						||
            query = {
 | 
						||
                "bool": {
 | 
						||
                    "must": [
 | 
						||
                        {"term": {"user_id": user_id}},
 | 
						||
                        {
 | 
						||
                            "multi_match": {
 | 
						||
                                "query": keyword,
 | 
						||
                                "fields": ["*"],
 | 
						||
                                "fuzziness": "AUTO"
 | 
						||
                            }
 | 
						||
                        }
 | 
						||
                    ]
 | 
						||
                }
 | 
						||
            }
 | 
						||
        else:
 | 
						||
            # 只按用户ID搜索
 | 
						||
            query = {
 | 
						||
                "term": {"user_id": user_id}
 | 
						||
            }
 | 
						||
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{data_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": query,
 | 
						||
                "size": 1000  # 限制返回数量
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        # 返回包含文档ID和源数据的列表
 | 
						||
        return [{
 | 
						||
            "_id": hit["_id"],
 | 
						||
            **hit["_source"]
 | 
						||
        } for hit in results]
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"查询用户数据失败: {e.response.text}")
 | 
						||
        return []
 | 
						||
 | 
						||
def update_data_by_id(doc_id, updated_data, user_id):
 | 
						||
    """
 | 
						||
    根据文档ID更新数据(仅允许数据所有者修改)
 | 
						||
 | 
						||
    参数:
 | 
						||
        doc_id (str): 文档ID
 | 
						||
        updated_data (dict): 更新的数据
 | 
						||
        user_id (str): 当前用户ID
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 更新成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 先查询文档,验证所有权
 | 
						||
        response = requests.get(
 | 
						||
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        doc = response.json()
 | 
						||
 | 
						||
        # 检查文档是否存在
 | 
						||
        if not doc.get("found"):
 | 
						||
            print(f"文档 {doc_id} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 检查用户权限(只能修改自己的数据)
 | 
						||
        if doc["_source"].get("user_id") != user_id:
 | 
						||
            print(f"用户 {user_id} 无权修改文档 {doc_id}")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 保持用户ID不变
 | 
						||
        updated_data["user_id"] = user_id
 | 
						||
 | 
						||
        # 更新文档
 | 
						||
        update_response = requests.post(
 | 
						||
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH,
 | 
						||
            json=updated_data,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        update_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"文档 {doc_id} 更新成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"更新文档失败: {e.response.text}")
 | 
						||
        return False
 | 
						||
 | 
						||
def delete_data_by_id(doc_id, user_id):
 | 
						||
    """
 | 
						||
    根据文档ID删除数据(仅允许数据所有者或管理员删除)
 | 
						||
 | 
						||
    参数:
 | 
						||
        doc_id (str): 文档ID
 | 
						||
        user_id (str): 当前用户ID
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 删除成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 先查询文档,验证所有权
 | 
						||
        response = requests.get(
 | 
						||
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        doc = response.json()
 | 
						||
 | 
						||
        # 检查文档是否存在
 | 
						||
        if not doc.get("found"):
 | 
						||
            print(f"文档 {doc_id} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 检查用户权限(只能删除自己的数据,管理员可以删除所有数据)
 | 
						||
        doc_user_id = doc["_source"].get("user_id")
 | 
						||
        if doc_user_id != user_id:
 | 
						||
            # 检查是否为管理员
 | 
						||
            user_info = get_user_by_username(user_id)  # 这里需要用户名,稍后会修改
 | 
						||
            if not user_info or user_info.get("premission") != 0:
 | 
						||
                print(f"用户 {user_id} 无权删除文档 {doc_id}")
 | 
						||
                return False
 | 
						||
 | 
						||
        # 删除文档
 | 
						||
        delete_response = requests.delete(
 | 
						||
            f"{ES_URL}/{data_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH
 | 
						||
        )
 | 
						||
        delete_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"文档 {doc_id} 删除成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"删除文档失败: {e.response.text}")
 | 
						||
        return False
 | 
						||
 | 
						||
def update_user_own_password(user_id, old_password, new_password):
 | 
						||
    """
 | 
						||
    用户修改自己的密码
 | 
						||
 | 
						||
    参数:
 | 
						||
        user_id (str): 用户ID
 | 
						||
        old_password (str): 旧密码
 | 
						||
        new_password (str): 新密码
 | 
						||
 | 
						||
    返回:
 | 
						||
        bool: 修改成功返回True,失败返回False
 | 
						||
    """
 | 
						||
    try:
 | 
						||
        # 先查找用户
 | 
						||
        response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_search",
 | 
						||
            auth=AUTH,
 | 
						||
            json={
 | 
						||
                "query": {
 | 
						||
                    "term": {
 | 
						||
                        "user_id": user_id
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        )
 | 
						||
        response.raise_for_status()
 | 
						||
        results = response.json()["hits"]["hits"]
 | 
						||
 | 
						||
        if not results:
 | 
						||
            print(f"用户 {user_id} 不存在")
 | 
						||
            return False
 | 
						||
 | 
						||
        user_data = results[0]["_source"]
 | 
						||
        doc_id = results[0]["_id"]
 | 
						||
 | 
						||
        # 验证旧密码
 | 
						||
        if user_data.get("password") != old_password:
 | 
						||
            print("旧密码错误")
 | 
						||
            return False
 | 
						||
 | 
						||
        # 更新密码
 | 
						||
        user_data["password"] = new_password
 | 
						||
 | 
						||
        # 更新文档
 | 
						||
        update_response = requests.post(
 | 
						||
            f"{ES_URL}/{users_index_name}/_doc/{doc_id}",
 | 
						||
            auth=AUTH,
 | 
						||
            json=user_data,
 | 
						||
            headers={"Content-Type": "application/json"}
 | 
						||
        )
 | 
						||
        update_response.raise_for_status()
 | 
						||
 | 
						||
        print(f"用户 {user_id} 密码修改成功")
 | 
						||
        return True
 | 
						||
 | 
						||
    except requests.exceptions.HTTPError as e:
 | 
						||
        print(f"修改密码失败: {e.response.text}")
 | 
						||
        return False
 |