docker-compose.yml
# Single-node Elasticsearch + Kibana stack.
# X-Pack security is switched off below, so both published ports accept
# unauthenticated requests; keep this stack on a trusted machine/network.
#
# NOTE(review): Docker Compose v2 treats the top-level `version` key as
# obsolete; it is kept here for older docker-compose releases.
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:9.2.0
    container_name: elasticsearch
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"
      - "xpack.security.enrollment.enabled=false"
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      # Requires the unlimited memlock ulimit declared below.
      - "bootstrap.memory_lock=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elastic
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Failed probes during boot do not count towards `retries`, so a slow
      # first start cannot mark the node unhealthy and block Kibana.
      start_period: 60s

  kibana:
    image: docker.elastic.co/kibana/kibana:9.2.0
    container_name: kibana
    environment:
      - "ELASTICSEARCH_HOSTS=http://elasticsearch:9200"
      # NOTE(review): `xpack.security.enabled` was removed from Kibana in
      # 8.0 - confirm this variable is still needed/accepted by 9.x.
      - "XPACK_SECURITY_ENABLED=false"
    ports:
      - "5601:5601"
    depends_on:
      # Start Kibana only after the Elasticsearch health check passes.
      elasticsearch:
        condition: service_healthy
    networks:
      - elastic
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:5601/api/status || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 60s

networks:
  elastic:
    driver: bridge

volumes:
  elasticsearch-data:
    driver: local
es_client.py
#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from datetime import datetime
import json
import time
class ElasticsearchManager:
    """Small convenience wrapper around an ``Elasticsearch`` client.

    The index/document/search helpers report progress with ``print`` and
    signal failure through their return value instead of raising.

    Attributes:
        es: The underlying ``Elasticsearch`` client instance.
        connected: ``True`` after a successful ping; reset to ``False``
            whenever a later connection attempt fails.
    """

    def __init__(self, hosts=None):
        """Create the client and try to connect once.

        Args:
            hosts: List of Elasticsearch URLs. Defaults to
                ``["http://localhost:9200"]`` when omitted.
        """
        # Build the default per call instead of sharing one mutable list
        # object between every instance through the signature.
        if hosts is None:
            hosts = ["http://localhost:9200"]
        self.es = Elasticsearch(
            hosts=hosts,
            verify_certs=False,
            request_timeout=30
        )
        self.connected = False
        self.connect()

    def connect(self):
        """Ping the cluster once and print its name and version.

        Returns:
            bool: ``True`` if the ping and the info request both succeeded,
            ``False`` otherwise. ``self.connected`` always mirrors the result.
        """
        try:
            if self.es.ping():
                info = self.es.info()
                cluster_name = info['cluster_name']
                version_number = info['version']['number']
                self.connected = True
                print("✅ 成功连接到 Elasticsearch")
                print(f" 集群名称: {cluster_name}")
                print(f" 版本: {version_number}")
                return True
            print("❌ Elasticsearch 未响应")
        except Exception as e:
            print(f"❌ 连接失败: {e}")
        # Do not leave a stale True behind after a failed (re)connect.
        self.connected = False
        return False

    def wait_for_connection(self, max_retries=10, delay=5):
        """Poll the cluster until it answers a ping or the retries run out.

        Args:
            max_retries: Maximum number of ping attempts.
            delay: Seconds to sleep between two attempts.

        Returns:
            bool: ``True`` as soon as a ping succeeds, ``False`` on timeout.
        """
        print("⏳ 等待 Elasticsearch 服务启动...")
        for i in range(max_retries):
            try:
                if self.es.ping():
                    self.connected = True
                    print("✅ Elasticsearch 已就绪")
                    return True
                print(f" 尝试 {i+1}/{max_retries}...")
            except Exception as e:
                print(f" 尝试 {i+1}/{max_retries} 失败: {e}")
            # Sleep only between attempts; waiting after the final failure
            # would just postpone the timeout report.
            if i < max_retries - 1:
                time.sleep(delay)
        self.connected = False
        print("❌ Elasticsearch 启动超时")
        return False

    @staticmethod
    def _default_index_body():
        """Return the default mappings/settings used by ``create_index``."""
        return {
            "mappings": {
                "properties": {
                    "title": {"type": "text", "analyzer": "standard"},
                    "content": {"type": "text", "analyzer": "standard"},
                    "author": {"type": "keyword"},
                    "tags": {"type": "keyword"},
                    "views": {"type": "integer"},
                    "created_at": {"type": "date"},
                    "published": {"type": "boolean"}
                }
            },
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            }
        }

    def create_index(self, index_name, mapping=None):
        """Create ``index_name`` unless it already exists.

        Args:
            index_name: Name of the index to create.
            mapping: Index creation body (``mappings``/``settings``). The
                default article mapping is used when ``None``.

        Returns:
            bool: ``True`` if the index exists after the call, ``False`` if
            not connected or if the client raised.
        """
        if not self.connected:
            print("❌ 请先建立连接")
            return False
        # Fall back to the default mapping.
        if mapping is None:
            mapping = self._default_index_body()
        try:
            # The existence check sits inside the try so that a client error
            # is reported like every other failure instead of propagating.
            if self.es.indices.exists(index=index_name):
                print(f"📁 索引 '{index_name}' 已存在")
                return True
            self.es.indices.create(index=index_name, body=mapping)
            print(f"✅ 成功创建索引: {index_name}")
            return True
        except Exception as e:
            print(f"❌ 创建索引失败: {e}")
            return False

    def list_indices(self):
        """Print and return all indices reported by ``indices.get_alias``.

        Returns:
            The client's response keyed by index name, or ``{}`` if not
            connected or if the client raised.
        """
        if not self.connected:
            print("❌ 请先建立连接")
            return {}
        try:
            indices = self.es.indices.get_alias(index="*")
            print("📋 现有索引:")
            for index_name in indices:
                print(f" - {index_name}")
            return indices
        except Exception as e:
            print(f"❌ 获取索引列表失败: {e}")
            return {}

    def add_document(self, index_name, document):
        """Index a single document.

        Args:
            index_name: Target index.
            document: Document body to index.

        Returns:
            The ``_id`` from the client's response, or ``None`` if not
            connected or if the client raised.
        """
        if not self.connected:
            print("❌ 请先建立连接")
            return None
        try:
            result = self.es.index(index=index_name, body=document)
            print(f"✅ 文档添加成功, ID: {result['_id']}")
            return result['_id']
        except Exception as e:
            print(f"❌ 添加文档失败: {e}")
            return None

    def search_documents(self, index_name, query_body):
        """Run a search and print a short summary of each hit.

        Args:
            index_name: Index to search.
            query_body: Search request body.

        Returns:
            list: The raw hit dictionaries, or ``[]`` if not connected or if
            the client raised.
        """
        if not self.connected:
            print("❌ 请先建立连接")
            return []
        try:
            result = self.es.search(index=index_name, body=query_body)
            hits = result['hits']['hits']
            # `total` may be a {"value": ...} object, a plain number, or
            # missing altogether; fall back to the number of returned hits.
            total_info = result['hits'].get('total')
            if isinstance(total_info, dict):
                total = total_info.get('value', len(hits))
            elif total_info is None:
                total = len(hits)
            else:
                total = total_info
            print(f"🔍 搜索到 {total} 个结果:")
            for hit in hits:
                # A None score or absent _source must not abort the summary
                # and turn a successful search into an empty result.
                score = hit.get('_score')
                score_text = f"{score:.2f}" if score is not None else "N/A"
                source = hit.get('_source') or {}
                print(f" - ID: {hit['_id']}, 分数: {score_text}")
                print(f" 标题: {source.get('title', 'N/A')}")
                print(f" 作者: {source.get('author', 'N/A')}")
                print(" " + "-" * 40)
            return hits
        except Exception as e:
            print(f"❌ 搜索失败: {e}")
            return []

    @staticmethod
    def _sample_documents():
        """Return the three sample articles indexed by the demo."""
        return [
            {
                "title": "Elasticsearch 入门指南",
                "content": "学习如何使用 Elasticsearch 进行全文搜索。",
                "author": "张三",
                "tags": ["教程", "搜索", "入门"],
                "views": 150,
                "created_at": datetime.now().isoformat(),
                "published": True
            },
            {
                "title": "Python 数据处理技巧",
                "content": "使用 Python 处理和分析数据的实用技巧。",
                "author": "李四",
                "tags": ["Python", "数据分析"],
                "views": 200,
                "created_at": datetime.now().isoformat(),
                "published": True
            },
            {
                "title": "Docker 容器化实践",
                "content": "Docker 在微服务架构中的应用实践。",
                "author": "王五",
                "tags": ["Docker", "DevOps", "容器"],
                "views": 180,
                "created_at": datetime.now().isoformat(),
                "published": False
            }
        ]

    @staticmethod
    def _demo_queries():
        """Return the demo search bodies: match, multi_match, then bool."""
        # Simple match query.
        simple_query = {
            "query": {
                "match": {
                    "content": "数据"
                }
            }
        }
        # Multi-field query.
        multi_match_query = {
            "query": {
                "multi_match": {
                    "query": "Python 技巧",
                    "fields": ["title", "content"]
                }
            }
        }
        # Boolean query.
        bool_query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"published": True}}
                    ],
                    "should": [
                        {"match": {"content": "搜索"}},
                        {"match": {"content": "数据"}}
                    ]
                }
            }
        }
        return [simple_query, multi_match_query, bool_query]

    def demo_operations(self):
        """Run the demo: create index, list, add documents, refresh, search.

        Returns ``None``. Stops early if the cluster never becomes reachable
        or if the index refresh fails.
        """
        if not self.wait_for_connection():
            return
        index_name = "demo_articles"
        # 1. Create the index.
        self.create_index(index_name)
        # 2. List indices.
        self.list_indices()
        # 3. Add documents.
        print("\n📝 添加示例文档:")
        for doc in self._sample_documents():
            self.add_document(index_name, doc)
        # Refresh the index so the new documents are visible to the searches.
        try:
            self.es.indices.refresh(index=index_name)
        except Exception as e:
            print(f"❌ 刷新索引失败: {e}")
            return
        # 4. Search demo.
        print("\n🔍 搜索演示:")
        for query_body in self._demo_queries():
            self.search_documents(index_name, query_body)
def main():
    """Print the banner, run the Elasticsearch demo, then print the Kibana URL."""
    for banner_line in ("🚀 Elasticsearch Python 客户端演示", "=" * 50):
        print(banner_line)

    # Build the manager and run the whole demo workflow on it.
    manager = ElasticsearchManager()
    manager.demo_operations()

    for closing_line in (
        "\n🎉 演示完成!",
        "💡 您可以在 Kibana 中查看数据: http://localhost:5601",
    ):
        print(closing_line)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()