231 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from elasticsearch import Elasticsearch
 | 
						||
import os
 | 
						||
import json
 | 
						||
import hashlib
 | 
						||
import requests
 | 
						||
import json
 | 
						||
 | 
						||
# Elasticsearch connection settings used by both the client below and the
# requests-based helpers in this module.
ES_URL = "http://localhost:9200"
AUTH = None  # set to ("username", "password") when authentication is required

# Client instance. Built from ES_URL (instead of a second hard-coded URL) so
# the client and the raw HTTP calls always target the same node.
es = Elasticsearch([ES_URL])

# Name of the index the helpers below operate on.
index_name = "wordsearch2"
 | 
						||
 | 
						||
def create_index_with_mapping():
    """Create the index with its field mapping unless it already exists.

    ``id`` and ``name`` are analysed text fields (``ik_max_word`` when
    indexing, ``ik_smart`` when searching), ``students`` and ``teacher`` are
    keyword fields and ``timestamp`` is a date field.  A message is printed
    saying whether the index was created or was already present.
    """
    if es.indices.exists(index=index_name):
        print(f"索引 {index_name} 已存在")
        return

    # Both text fields use the same analyser settings; each gets its own copy.
    analysed_text = {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
    }
    field_definitions = {
        "id": dict(analysed_text),
        "name": dict(analysed_text),
        "students": {"type": "keyword"},
        "teacher": {"type": "keyword"},
        "timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis",
        },
    }
    es.indices.create(
        index=index_name,
        body={"mappings": {"properties": field_definitions}},
    )
    print(f"创建索引 {index_name} 并设置映射")
 | 
						||
 | 
						||
 | 
						||
 | 
						||
def get_doc_id(data):
    """Derive a deterministic document ID from a record's key fields.

    The ``id``, ``name``, ``students`` and ``teacher`` values are formatted
    and concatenated in that order (no separator); the MD5 hex digest of the
    UTF-8 encoded result is returned, so records with equal key fields map to
    the same ID.

    Args:
        data (dict): Record containing the four key fields.

    Returns:
        str: Hexadecimal MD5 digest of the concatenated key fields.

    Raises:
        KeyError: If ``data`` is a dict that lacks one of the key fields.
    """
    key_fields = ("id", "name", "students", "teacher")
    fingerprint = "".join(f"{data[field]}" for field in key_fields)
    digest = hashlib.md5(fingerprint.encode("utf-8"))
    return digest.hexdigest()
 | 
						||
 | 
						||
 | 
						||
def insert_data(data):
    """Store one document in Elasticsearch.

    Thin wrapper: the record is passed unchanged to ``batch_write_data`` and
    that function's result is returned as-is.

    Args:
        data (dict): Document to store.

    Returns:
        bool: The success flag reported by ``batch_write_data``.
    """
    was_written = batch_write_data(data)
    return was_written
 | 
						||
 | 
						||
 | 
						||
def search_data(query):
    """Run a ``multi_match`` keyword search over all fields of the index.

    Args:
        query (str): Search keyword(s).

    Returns:
        list: The ``_source`` of every hit contained in the response.
    """
    search_body = {
        "query": {
            "multi_match": {"query": query, "fields": ["*"]},
        },
    }
    response = es.search(index=index_name, body=search_body)

    sources = []
    for hit in response["hits"]["hits"]:
        sources.append(hit["_source"])
    return sources
 | 
						||
 | 
						||
def search_all():
    """Return every document stored in the index.

    A plain ``match_all`` search without ``size`` only returns the first page
    of hits (10 by default), so the scroll-based ``scan`` helper is used to
    walk the complete result set instead.

    Returns:
        list: One dict per document, made of the document's ``_source``
        fields plus its ``_id``.
    """
    # Imported here so the function does not depend on a module-level import.
    from elasticsearch.helpers import scan

    hits = scan(es, index=index_name, query={"query": {"match_all": {}}})
    return [{"_id": hit["_id"], **hit["_source"]} for hit in hits]
 | 
						||
 | 
						||
def delete_by_id(doc_id):
    """Delete a single document from the index by its ID.

    Any exception raised by the delete call is reported on stdout and turned
    into a ``False`` result rather than propagated.

    Args:
        doc_id (str): ID of the document to delete.

    Returns:
        bool: ``True`` if the delete call completed, ``False`` if it raised.
    """
    try:
        es.delete(index=index_name, id=doc_id)
    except Exception as err:
        print("删除失败:", str(err))
        return False
    else:
        return True
 | 
						||
 | 
						||
def search_by_any_field(keyword):
    """Fuzzy-search all fields of the index and print the hits.

    Posts a ``multi_match`` query with ``fuzziness: AUTO`` over all fields to
    the index's ``_search`` endpoint, prints each hit's ID and source fields,
    and returns the raw hits.

    Args:
        keyword (str): Text to search for.

    Returns:
        list: The raw hit dicts from the response, or ``[]`` when the request
        fails (HTTP error status, connection error, timeout, ...).
    """
    search_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["*"],  # match against every field
                "fuzziness": "AUTO",  # tolerate small spelling mistakes
            }
        }
    }
    try:
        response = requests.post(
            f"{ES_URL}/{index_name}/_search",
            auth=AUTH,
            json=search_body,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Connection-level failures carry no response object, so fall back to
        # the exception text instead of dereferencing None.
        detail = e.response.text if e.response is not None else str(e)
        print(f"搜索失败: {detail}")
        return []

    results = response.json()["hits"]["hits"]
    print(f"\n模糊搜索 '{keyword}' 找到 {len(results)} 条结果:")

    for doc in results:
        print(f"\n文档ID: {doc['_id']}")
        if '_source' not in doc:
            print("无_source数据")
            continue

        source = doc['_source']
        # default=0 keeps an empty _source from raising ValueError in max().
        max_key_len = max((len(k) for k in source), default=0)
        for key, value in source.items():
            # Prefer a highlighted fragment when the hit has one.  The query
            # above does not request highlighting, so this normally falls
            # back to the raw field value.
            highlight = doc.get('highlight', {}).get(key, [value])[0]
            print(f"{key:>{max_key_len + 2}} : {highlight}")

    return results
 | 
						||
 | 
						||
def batch_write_data(data):
    """Write one document to the index and report whether it succeeded.

    Despite the name, a single document is posted per call to the index's
    ``_doc`` endpoint, so Elasticsearch assigns the document ID.

    Args:
        data (dict): JSON-serialisable document to store.

    Returns:
        bool: ``True`` when the document was stored, ``False`` when the
        request failed (HTTP error status, connection error, timeout, ...).
    """
    try:
        response = requests.post(
            f"{ES_URL}/{index_name}/_doc",
            json=data,
            auth=AUTH,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Connection-level failures carry no response object, so fall back to
        # the exception text instead of dereferencing None.
        detail = e.response.text if e.response is not None else str(e)
        print(f"文档写入失败: {detail}, 数据: {data}")
        return False

    doc_id = response.json()["_id"]
    print(f"文档写入成功,ID: {doc_id}, 内容: {data}")
    return True
 | 
						||
 | 
						||
def update_mapping():
    """Push the field mapping to the existing index and print the outcome.

    PUTs the definitions for ``id``, ``name``, ``students`` and ``teacher``
    to the index's ``_mapping`` endpoint.  On success the response is printed,
    then the mapping is fetched back and printed as well.  An HTTP error
    status on the PUT is reported by printing the response body.
    """
    mapping_url = f"{ES_URL}/{index_name}/_mapping"
    field_definitions = {
        "id": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_smart",
        },
        "name": {"type": "text", "analyzer": "ik_max_word"},
        "students": {"type": "keyword"},
        "teacher": {"type": "keyword"},
    }

    try:
        put_response = requests.put(
            mapping_url,
            auth=AUTH,
            json={"properties": field_definitions},
            headers={"Content-Type": "application/json"},
        )
        put_response.raise_for_status()
        print("索引映射更新成功")
        print(put_response.json())

        # Read the mapping back so the applied result can be inspected.
        current = requests.get(mapping_url, auth=AUTH)
        print("\n验证结果:")
        print(current.json())
    except requests.exceptions.HTTPError as err:
        print(f"请求失败: {err.response.text}")
 |