ElasticSearch 9.2.0

docker-compose.yml

version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:9.2.0
    container_name: elasticsearch
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"
      - "xpack.security.enrollment.enabled=false"
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "bootstrap.memory_lock=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elastic
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3

  kibana:
    image: docker.elastic.co/kibana/kibana:9.2.0
    container_name: kibana
    environment:
      - "ELASTICSEARCH_HOSTS=http://elasticsearch:9200"
      - "XPACK_SECURITY_ENABLED=false"
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy
    networks:
      - elastic
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:5601/api/status || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5

networks:
  elastic:
    driver: bridge

volumes:
  elasticsearch-data:
    driver: local

 

es_client.py

#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from datetime import datetime
import json
import time

class ElasticsearchManager:
    def __init__(self, hosts=["http://localhost:9200"]):
        self.es = Elasticsearch(
            hosts=hosts,
            verify_certs=False,
            request_timeout=30
        )
        self.connected = False
        self.connect()
    
    def connect(self):
        """连接到 Elasticsearch"""
        try:
            if self.es.ping():
                info = self.es.info()
                self.connected = True
                print("✅ 成功连接到 Elasticsearch")
                print(f"  集群名称: {info['cluster_name']}")
                print(f"  版本: {info['version']['number']}")
                return True
            else:
                print("❌ Elasticsearch 未响应")
                return False
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
    
    def wait_for_connection(self, max_retries=10, delay=5):
        """等待 Elasticsearch 服务就绪"""
        print("⏳ 等待 Elasticsearch 服务启动...")
        for i in range(max_retries):
            try:
                if self.es.ping():
                    self.connected = True
                    print("✅ Elasticsearch 已就绪")
                    return True
                else:
                    print(f"  尝试 {i+1}/{max_retries}...")
                    time.sleep(delay)
            except Exception as e:
                print(f"  尝试 {i+1}/{max_retries} 失败: {e}")
                time.sleep(delay)
        
        print("❌ Elasticsearch 启动超时")
        return False
    
    def create_index(self, index_name, mapping=None):
        """创建索引"""
        if not self.connected:
            print("❌ 请先建立连接")
            return False
        
        if self.es.indices.exists(index=index_name):
            print(f"📁 索引 '{index_name}' 已存在")
            return True
        
        # 默认映射
        if mapping is None:
            mapping = {
                "mappings": {
                    "properties": {
                        "title": {"type": "text", "analyzer": "standard"},
                        "content": {"type": "text", "analyzer": "standard"},
                        "author": {"type": "keyword"},
                        "tags": {"type": "keyword"},
                        "views": {"type": "integer"},
                        "created_at": {"type": "date"},
                        "published": {"type": "boolean"}
                    }
                },
                "settings": {
                    "number_of_shards": 1,
                    "number_of_replicas": 0
                }
            }
        
        try:
            self.es.indices.create(index=index_name, body=mapping)
            print(f"✅ 成功创建索引: {index_name}")
            return True
        except Exception as e:
            print(f"❌ 创建索引失败: {e}")
            return False
    
    def list_indices(self):
        """列出所有索引"""
        if not self.connected:
            print("❌ 请先建立连接")
            return {}
        
        try:
            indices = self.es.indices.get_alias(index="*")
            print("📋 现有索引:")
            for index_name in indices:
                print(f"  - {index_name}")
            return indices
        except Exception as e:
            print(f"❌ 获取索引列表失败: {e}")
            return {}
    
    def add_document(self, index_name, document):
        """添加文档"""
        if not self.connected:
            print("❌ 请先建立连接")
            return None
        
        try:
            result = self.es.index(index=index_name, body=document)
            print(f"✅ 文档添加成功, ID: {result['_id']}")
            return result['_id']
        except Exception as e:
            print(f"❌ 添加文档失败: {e}")
            return None
    
    def search_documents(self, index_name, query_body):
        """搜索文档"""
        if not self.connected:
            print("❌ 请先建立连接")
            return []
        
        try:
            result = self.es.search(index=index_name, body=query_body)
            hits = result['hits']['hits']
            total = result['hits']['total']['value']
            
            print(f"🔍 搜索到 {total} 个结果:")
            for hit in hits:
                print(f"  - ID: {hit['_id']}, 分数: {hit['_score']:.2f}")
                print(f"    标题: {hit['_source'].get('title', 'N/A')}")
                print(f"    作者: {hit['_source'].get('author', 'N/A')}")
                print("    " + "-" * 40)
            
            return hits
        except Exception as e:
            print(f"❌ 搜索失败: {e}")
            return []
    
    def demo_operations(self):
        """演示各种操作"""
        if not self.wait_for_connection():
            return
        
        index_name = "demo_articles"
        
        # 1. 创建索引
        self.create_index(index_name)
        
        # 2. 列出索引
        self.list_indices()
        
        # 3. 添加文档
        documents = [
            {
                "title": "Elasticsearch 入门指南",
                "content": "学习如何使用 Elasticsearch 进行全文搜索。",
                "author": "张三",
                "tags": ["教程", "搜索", "入门"],
                "views": 150,
                "created_at": datetime.now().isoformat(),
                "published": True
            },
            {
                "title": "Python 数据处理技巧",
                "content": "使用 Python 处理和分析数据的实用技巧。",
                "author": "李四",
                "tags": ["Python", "数据分析"],
                "views": 200,
                "created_at": datetime.now().isoformat(),
                "published": True
            },
            {
                "title": "Docker 容器化实践",
                "content": "Docker 在微服务架构中的应用实践。",
                "author": "王五",
                "tags": ["Docker", "DevOps", "容器"],
                "views": 180,
                "created_at": datetime.now().isoformat(),
                "published": False
            }
        ]
        
        print("\n📝 添加示例文档:")
        for doc in documents:
            self.add_document(index_name, doc)
        
        # 刷新索引
        self.es.indices.refresh(index=index_name)
        
        # 4. 搜索演示
        print("\n🔍 搜索演示:")
        
        # 简单匹配搜索
        simple_query = {
            "query": {
                "match": {
                    "content": "数据"
                }
            }
        }
        self.search_documents(index_name, simple_query)
        
        # 多字段搜索
        multi_match_query = {
            "query": {
                "multi_match": {
                    "query": "Python 技巧",
                    "fields": ["title", "content"]
                }
            }
        }
        self.search_documents(index_name, multi_match_query)
        
        # 布尔查询
        bool_query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"published": True}}
                    ],
                    "should": [
                        {"match": {"content": "搜索"}},
                        {"match": {"content": "数据"}}
                    ]
                }
            }
        }
        self.search_documents(index_name, bool_query)

def main():
    """主函数"""
    print("🚀 Elasticsearch Python 客户端演示")
    print("=" * 50)
    
    # 创建管理器实例
    es_mgr = ElasticsearchManager()
    
    # 运行演示
    es_mgr.demo_operations()
    
    print("\n🎉 演示完成!")
    print("💡 您可以在 Kibana 中查看数据: http://localhost:5601")

if __name__ == "__main__":
    main()