# 分类:N05_python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
import time
from datetime import datetime
class Elasticsearch6Importer:
    """Bulk-import JSON / NDJSON / CSV data into an Elasticsearch 6.x cluster."""

    def __init__(self, hosts, http_auth=None):
        """
        Initialize a client for an Elasticsearch 6.7 cluster.

        Args:
            hosts: list of host URLs, passed straight to the Elasticsearch client.
            http_auth: optional (username, password) tuple for HTTP basic auth.
        """
        self.es = Elasticsearch(
            hosts=hosts,
            http_auth=http_auth,
            timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )

    def test_connection(self):
        """Return True if the cluster answers an info() call, False otherwise."""
        try:
            info = self.es.info()
            print(f"成功连接到 Elasticsearch {info['version']['number']}")
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False

    def _build_action(self, index_name, doc, id_field):
        """Build one bulk-action dict for *doc*; use doc[id_field] as _id when present."""
        # NOTE(review): some ES 6.x clusters/clients also require a "_type" key
        # (e.g. "_doc") on bulk actions — confirm against the target cluster.
        action = {
            "_index": index_name,
            "_source": doc
        }
        if id_field and id_field in doc:
            action["_id"] = str(doc[id_field])
        return action

    def _import_documents(self, index_name, data, id_field=None, batch_size=1000):
        """
        Bulk-import an in-memory list of document dicts in batches.

        Returns:
            (success_count, failed_count) tuple.
        """
        total_docs = len(data)
        print(f"总共 {total_docs} 个文档需要导入")
        success_count = 0
        failed_count = 0
        start_time = time.time()
        for i in range(0, total_docs, batch_size):
            batch = data[i:i + batch_size]
            actions = [self._build_action(index_name, doc, id_field) for doc in batch]
            try:
                success, failed = bulk(self.es, actions, raise_on_error=False)
                batch_failed = len(failed) if failed else 0
                success_count += success
                failed_count += batch_failed
                # BUGFIX: the original printed the *cumulative* failure count
                # here instead of this batch's failure count.
                print(f"批次 {i//batch_size + 1}: 成功 {success} 个, 失败 {batch_failed} 个")
                if failed:
                    for fail in failed:
                        print(f"失败文档: {fail}")
                # Brief pause so we do not hammer the cluster between batches.
                time.sleep(0.1)
            except Exception as e:
                # A transport-level failure loses the whole batch.
                print(f"批次导入失败: {e}")
                failed_count += len(batch)
        elapsed = time.time() - start_time
        print(f"\n导入完成!")
        print(f"成功: {success_count}")
        print(f"失败: {failed_count}")
        print(f"耗时: {elapsed:.2f} 秒")
        # Guard against division by zero on near-instant (e.g. empty) imports.
        print(f"速度: {success_count / max(elapsed, 1e-9):.2f} 文档/秒")
        return success_count, failed_count

    def import_json_file(self, index_name, json_file, id_field=None, batch_size=1000):
        """
        Import documents from a JSON file containing a single top-level array.

        Args:
            index_name: target index name.
            json_file: path to the JSON file.
            id_field: optional field whose value becomes the document _id.
            batch_size: documents per bulk request.

        Returns:
            Number of successfully imported documents; False if the file is
            not a JSON array; 0 on any other error.
        """
        try:
            print(f"开始导入数据到索引: {index_name}")
            print(f"数据文件: {json_file}")
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                print("错误: JSON文件应该是一个数组")
                return False
            success_count, _ = self._import_documents(index_name, data, id_field, batch_size)
            return success_count
        except Exception as e:
            print(f"导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

    def import_ndjson_file(self, index_name, ndjson_file, id_field=None, batch_size=1000):
        """
        Import an NDJSON file (one JSON object per line), streaming in batches.

        Args:
            index_name: target index name.
            ndjson_file: path to the NDJSON file.
            id_field: optional field whose value becomes the document _id.
            batch_size: documents per bulk request.

        Returns:
            Number of successfully imported documents, or 0 on fatal error.
        """
        try:
            print(f"开始导入NDJSON数据到索引: {index_name}")
            print(f"数据文件: {ndjson_file}")
            success_count = 0
            failed_count = 0
            batch_actions = []
            start_time = time.time()
            line_count = 0
            with open(ndjson_file, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue  # skip blank lines
                    try:
                        doc = json.loads(line)
                    except json.JSONDecodeError as e:
                        # A malformed line only loses that one document.
                        print(f"第 {line_num} 行JSON解析失败: {e}")
                        failed_count += 1
                        continue
                    batch_actions.append(self._build_action(index_name, doc, id_field))
                    line_count += 1
                    # Flush a full batch.
                    if len(batch_actions) >= batch_size:
                        success, failed = bulk(self.es, batch_actions, raise_on_error=False)
                        success_count += success
                        failed_count += len(failed) if failed else 0
                        batch_actions = []
                        if line_num % (batch_size * 10) == 0:
                            print(f"已处理 {line_num} 行, 成功导入 {success_count} 个文档")
            # Flush the final partial batch.
            if batch_actions:
                success, failed = bulk(self.es, batch_actions, raise_on_error=False)
                success_count += success
                failed_count += len(failed) if failed else 0
            elapsed = time.time() - start_time
            print(f"\nNDJSON导入完成!")
            print(f"总行数: {line_count}")
            print(f"成功: {success_count}")
            print(f"失败: {failed_count}")
            print(f"耗时: {elapsed:.2f} 秒")
            return success_count
        except Exception as e:
            print(f"NDJSON导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

    def import_csv_file(self, index_name, csv_file, id_field=None, batch_size=1000, delimiter=','):
        """
        Import documents from a CSV file (requires pandas).

        Args:
            index_name: target index name.
            csv_file: path to the CSV file.
            id_field: optional column whose value becomes the document _id.
            batch_size: documents per bulk request.
            delimiter: CSV field delimiter.

        Returns:
            Number of successfully imported documents, or 0 on error.
        """
        try:
            import pandas as pd
            print(f"开始导入CSV数据到索引: {index_name}")
            print(f"数据文件: {csv_file}")
            df = pd.read_csv(csv_file, delimiter=delimiter, encoding='utf-8')
            # Convert NaN to None so the documents serialize cleanly to JSON.
            df = df.where(pd.notnull(df), None)
            data = df.to_dict('records')
            # BUGFIX: the original passed this in-memory list to
            # import_json_file, which expects a file *path* and calls open()
            # on its argument — the CSV path could never succeed.
            success_count, _ = self._import_documents(index_name, data, id_field, batch_size)
            return success_count
        except Exception as e:
            print(f"CSV导入失败: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0
# Usage example
if __name__ == "__main__":
    # Build the importer; pass http_auth=('username', 'password') if the
    # cluster requires authentication.
    importer = Elasticsearch6Importer(hosts=['http://es.wpfnet.cn:80'])

    # Only attempt an import once the cluster is reachable.
    if importer.test_connection():
        # Import a JSON array file, keyed on each document's "id" field.
        importer.import_json_file(
            index_name="my_index",
            json_file="H:/qtool.json",
            id_field="id",
            batch_size=500
        )
        # NDJSON variant:
        # importer.import_ndjson_file(
        #     index_name="logs",
        #     ndjson_file="logs.ndjson",
        #     batch_size=1000
        # )
        # CSV variant:
        # importer.import_csv_file(
        #     index_name="users",
        #     csv_file="users.csv",
        #     id_field="user_id",
        #     batch_size=500
        # )
# test_import.py
from elasticsearch import Elasticsearch
import json
def test_es_connection():
    """Verify the local cluster responds; print its version and cluster name."""
    client = Elasticsearch(['http://localhost:9200'])
    try:
        details = client.info()
        print(f"Elasticsearch版本: {details['version']['number']}")
        print(f"集群名称: {details['cluster_name']}")
        return True
    except Exception as exc:
        print(f"连接失败: {exc}")
        return False
def test_single_document():
    """Index one hand-built document to confirm single-document writes work."""
    client = Elasticsearch(['http://localhost:9200'])
    sample = {
        "name": "测试文档",
        "timestamp": "2024-01-01T12:00:00",
        "value": 123.45,
        "tags": ["test", "import"]
    }
    try:
        # ES 6.x requires an explicit doc_type on index calls.
        result = client.index(
            index="test_index",
            doc_type="_doc",
            body=sample
        )
        print(f"单个文档导入成功: {result}")
        return True
    except Exception as exc:
        print(f"单个文档导入失败: {exc}")
        return False
def test_bulk_import():
    """Round-trip a small data set through ES6BulkImporter and report success."""
    from ES6BulkImporter import ES6BulkImporter
    import os

    # Four tiny documents spanning three categories.
    docs = [
        {"id": 1, "name": "文档1", "category": "A"},
        {"id": 2, "name": "文档2", "category": "B"},
        {"id": 3, "name": "文档3", "category": "A"},
        {"id": 4, "name": "文档4", "category": "C"}
    ]

    # Stage the documents in a scratch file for the importer to read.
    scratch = "test_data.json"
    with open(scratch, 'w', encoding='utf-8') as fh:
        json.dump(docs, fh, ensure_ascii=False)

    importer = ES6BulkImporter(['http://es.wpfnet.cn:80/'])
    success = importer.import_from_json_file(
        index_name="test_bulk_index",
        json_file_path=scratch,
        doc_type="test_type",
        id_field="id",
        batch_size=2
    )
    print(f"批量导入测试: 成功 {success} 个文档")

    # Remove the scratch file.
    if os.path.exists(scratch):
        os.remove(scratch)
    return success > 0
if __name__ == "__main__":
    print("测试 Elasticsearch 6.7 导入功能")
    print("=" * 50)

    # 1) Connectivity — abort immediately when the cluster is unreachable.
    if not test_es_connection():
        print("❌ 连接测试失败")
        exit(1)
    print("✅ 连接测试通过")
    print()

    # 2) Single-document indexing.
    if test_single_document():
        print("✅ 单个文档导入测试通过")
    else:
        print("❌ 单个文档导入测试失败")
    print()

    # 3) Bulk import (disabled by default).
    # if test_bulk_import():
    #     print("✅ 批量导入测试通过")
    # else:
    #     print("❌ 批量导入测试失败")
    print("\n测试完成!")