# Elasticsearch_docker_数据_导入导出数据模版
#
# 分类: N05_python
#
# 标签:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
import time
from datetime import datetime

class Elasticsearch6Importer:
    """Bulk data importer for Elasticsearch 6.x supporting JSON, NDJSON and CSV sources."""

    def __init__(self, hosts, http_auth=None):
        """
        Create a client for an Elasticsearch 6.7 cluster.

        Args:
            hosts: list of node URLs, e.g. ['http://localhost:9200']
            http_auth: optional (username, password) tuple for basic auth
        """
        self.es = Elasticsearch(
            hosts=hosts,
            http_auth=http_auth,
            timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )

    def test_connection(self):
        """Return True if the cluster responds to an info() request, else False."""
        try:
            info = self.es.info()
            print(f"成功连接到 Elasticsearch {info['version']['number']}")
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False

    def _bulk_import_docs(self, index_name, docs, id_field=None, batch_size=1000):
        """
        Bulk-index an in-memory list of dict documents.

        Args:
            index_name: target index name
            docs: list of documents (dicts)
            id_field: optional field whose value becomes the document _id
            batch_size: number of documents per bulk request

        Returns:
            Number of successfully indexed documents.
        """
        total_docs = len(docs)
        print(f"总共 {total_docs} 个文档需要导入")

        success_count = 0
        failed_count = 0
        start_time = time.time()

        for i in range(0, total_docs, batch_size):
            batch = docs[i:i + batch_size]
            actions = []

            for doc in batch:
                action = {
                    "_index": index_name,
                    "_source": doc
                }
                # Use the configured field as the document _id when present.
                if id_field and id_field in doc:
                    action["_id"] = str(doc[id_field])
                actions.append(action)

            try:
                success, failed = bulk(self.es, actions, raise_on_error=False)
                # FIX: report this batch's failure count; the previous code
                # printed the *cumulative* failed_count for every batch.
                batch_failed = len(failed) if failed else 0
                success_count += success
                failed_count += batch_failed

                print(f"批次 {i//batch_size + 1}: 成功 {success} 个, 失败 {batch_failed} 个")

                if failed:
                    for fail in failed:
                        print(f"失败文档: {fail}")

                # Small pause to avoid hammering the cluster.
                time.sleep(0.1)

            except Exception as e:
                print(f"批次导入失败: {e}")
                failed_count += len(batch)

        elapsed = time.time() - start_time

        print(f"\n导入完成!")
        print(f"成功: {success_count}")
        print(f"失败: {failed_count}")
        print(f"耗时: {elapsed:.2f} 秒")
        # FIX: guard against ZeroDivisionError for near-instant imports.
        if elapsed > 0:
            print(f"速度: {success_count/elapsed:.2f} 文档/秒")

        return success_count

    def import_json_file(self, index_name, json_file, id_field=None, batch_size=1000):
        """
        Import documents from a JSON file containing a single top-level array.

        Args:
            index_name: target index name
            json_file: path to the JSON file
            id_field: optional field used as the document _id
            batch_size: documents per bulk request

        Returns:
            Number of successfully indexed documents; False if the file is
            not a JSON array; 0 on any other error.
        """
        try:
            print(f"开始导入数据到索引: {index_name}")
            print(f"数据文件: {json_file}")

            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if not isinstance(data, list):
                print("错误: JSON文件应该是一个数组")
                return False

            return self._bulk_import_docs(index_name, data, id_field, batch_size)

        except Exception as e:
            print(f"导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

    def import_ndjson_file(self, index_name, ndjson_file, id_field=None, batch_size=1000):
        """
        Import an NDJSON file (one JSON object per line), streaming in batches.

        Args:
            index_name: target index name
            ndjson_file: path to the NDJSON file
            id_field: optional field used as the document _id
            batch_size: documents per bulk request

        Returns:
            Number of successfully indexed documents, or 0 on error.
        """
        try:
            print(f"开始导入NDJSON数据到索引: {index_name}")
            print(f"数据文件: {ndjson_file}")

            success_count = 0
            failed_count = 0
            batch_actions = []
            start_time = time.time()
            line_count = 0

            with open(ndjson_file, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue  # skip blank lines

                    try:
                        doc = json.loads(line)
                        action = {
                            "_index": index_name,
                            "_source": doc
                        }

                        if id_field and id_field in doc:
                            action["_id"] = str(doc[id_field])

                        batch_actions.append(action)
                        line_count += 1

                        # Flush once a full batch has accumulated.
                        if len(batch_actions) >= batch_size:
                            success, failed = bulk(self.es, batch_actions, raise_on_error=False)
                            success_count += success
                            failed_count += len(failed) if failed else 0
                            batch_actions = []

                            # Periodic progress report every batch_size * 10 lines.
                            if line_num % (batch_size * 10) == 0:
                                print(f"已处理 {line_num} 行, 成功导入 {success_count} 个文档")

                    except json.JSONDecodeError as e:
                        # A malformed line is skipped, not fatal.
                        print(f"第 {line_num} 行JSON解析失败: {e}")
                        failed_count += 1

            # Flush the final partial batch.
            if batch_actions:
                success, failed = bulk(self.es, batch_actions, raise_on_error=False)
                success_count += success
                failed_count += len(failed) if failed else 0

            elapsed = time.time() - start_time

            print(f"\nNDJSON导入完成!")
            print(f"总行数: {line_count}")
            print(f"成功: {success_count}")
            print(f"失败: {failed_count}")
            print(f"耗时: {elapsed:.2f} 秒")

            return success_count

        except Exception as e:
            print(f"NDJSON导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

    def import_csv_file(self, index_name, csv_file, id_field=None, batch_size=1000, delimiter=','):
        """
        Import documents from a CSV file (requires pandas).

        Args:
            index_name: target index name
            csv_file: path to the CSV file
            id_field: optional field used as the document _id
            batch_size: documents per bulk request
            delimiter: CSV field delimiter

        Returns:
            Number of successfully indexed documents, or 0 on error.
        """
        try:
            import pandas as pd
            print(f"开始导入CSV数据到索引: {index_name}")
            print(f"数据文件: {csv_file}")

            df = pd.read_csv(csv_file, delimiter=delimiter, encoding='utf-8')

            # Replace NaN with None so the documents serialize to valid JSON.
            df = df.where(pd.notnull(df), None)

            data = df.to_dict('records')

            # FIX: the previous version passed this in-memory list to
            # import_json_file, which expects a file *path* and would always
            # fail on open(); index the records directly instead.
            return self._bulk_import_docs(index_name, data, id_field, batch_size)

        except Exception as e:
            print(f"CSV导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

# Usage example: import a JSON file into a remote Elasticsearch 6.x cluster.
if __name__ == "__main__":
    # Build the importer (no authentication by default).
    importer = Elasticsearch6Importer(
        hosts=['http://es.wpfnet.cn:80'],
        # http_auth=('username', 'password')  # uncomment if auth is required
    )
    
    # Verify connectivity before attempting any import.
    if importer.test_connection():
        # Import a JSON array file, using each document's "id" field as _id.
        importer.import_json_file(
            index_name="my_index",
            json_file="H:/qtool.json",
            id_field="id",  # use the "id" field as the document _id
            batch_size=500
        )
        
        # Import an NDJSON file (one JSON object per line).
        # importer.import_ndjson_file(
        #     index_name="logs",
        #     ndjson_file="logs.ndjson",
        #     batch_size=1000
        # )
        
        # Import a CSV file (requires pandas).
        # importer.import_csv_file(
        #     index_name="users",
        #     csv_file="users.csv",
        #     id_field="user_id",
        #     batch_size=500
        # )
# test_import.py
from elasticsearch import Elasticsearch
import json

def test_es_connection():
    """Check that a local Elasticsearch node responds; print version and cluster name."""
    client = Elasticsearch(['http://localhost:9200'])

    try:
        cluster_info = client.info()
        print(f"Elasticsearch版本: {cluster_info['version']['number']}")
        print(f"集群名称: {cluster_info['cluster_name']}")
        return True
    except Exception as err:
        print(f"连接失败: {err}")
        return False

def test_single_document():
    """Index one hand-built document into test_index; return True on success."""
    client = Elasticsearch(['http://localhost:9200'])

    payload = {
        "name": "测试文档",
        "timestamp": "2024-01-01T12:00:00",
        "value": 123.45,
        "tags": ["test", "import"]
    }

    try:
        # ES 6.x still requires an explicit doc_type on index calls.
        result = client.index(
            index="test_index",
            doc_type="_doc",
            body=payload
        )
        print(f"单个文档导入成功: {result}")
        return True
    except Exception as err:
        print(f"单个文档导入失败: {err}")
        return False

def test_bulk_import():
    """Round-trip a tiny dataset through ES6BulkImporter via a temporary JSON file."""
    from ES6BulkImporter import ES6BulkImporter

    sample_docs = [
        {"id": 1, "name": "文档1", "category": "A"},
        {"id": 2, "name": "文档2", "category": "B"},
        {"id": 3, "name": "文档3", "category": "A"},
        {"id": 4, "name": "文档4", "category": "C"}
    ]

    # Persist the sample data so the importer can read it back from disk.
    tmp_path = "test_data.json"
    with open(tmp_path, 'w', encoding='utf-8') as handle:
        json.dump(sample_docs, handle, ensure_ascii=False)

    bulk_importer = ES6BulkImporter(['http://es.wpfnet.cn:80/'])

    imported = bulk_importer.import_from_json_file(
        index_name="test_bulk_index",
        json_file_path=tmp_path,
        doc_type="test_type",
        id_field="id",
        batch_size=2
    )

    print(f"批量导入测试: 成功 {imported} 个文档")

    # Remove the temporary fixture file.
    import os
    if os.path.exists(tmp_path):
        os.remove(tmp_path)

    return imported > 0

# Manual test runner: exercises the connection and import helpers in order.
if __name__ == "__main__":
    print("测试 Elasticsearch 6.7 导入功能")
    print("=" * 50)
    
    # Test 1: cluster connectivity — abort early if the node is unreachable.
    if test_es_connection():
        print("✅ 连接测试通过")
    else:
        print("❌ 连接测试失败")
        exit(1)
    
    print()
    
    # Test 2: single-document indexing.
    if test_single_document():
        print("✅ 单个文档导入测试通过")
    else:
        print("❌ 单个文档导入测试失败")
    
    print()
    
    # Test 3: bulk import (disabled — requires the separate ES6BulkImporter module).
    # if test_bulk_import():
    #     print("✅ 批量导入测试通过")
    # else:
    #     print("❌ 批量导入测试失败")
    
    print("\n测试完成!")


# 修改内容