1. 실습환경 준비

1-1. 실습에 필요한 라이브러리 import

import warnings
from urllib3.exceptions import InsecureRequestWarning
from urllib3 import disable_warnings
from elasticsearch import Elasticsearch
from langchain_community.embeddings import HuggingFaceEmbeddings
warnings.filterwarnings('ignore')
disable_warnings(InsecureRequestWarning)

 

1-2. ElasticSearch 연결

# ElasticSearch 연결
es_client=Elasticsearch(hosts=['https://localhost:9200'], basic_auth=['elastic', 'elastic'], verify_certs=False)
es_client.info()

2. Vector 데이터 생성

2-1. Vector Embedding 함수 선언

# Embedding
def emb_func(text) :
    model_name = "intfloat/multilingual-e5-large"
    model_kwargs = {'device': 'cpu'}
    encode_kwargs = {'normalize_embeddings': True}
        
    hf = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs
    )
    
    return hf.embed_query(text)

 

2-2. 인덱스 생성

# 인덱스 매핑
setting_mapping = {
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
   },
  "mappings": {
    "properties": {
        "movie_name": {"type": "keyword"}, # 단어 자체를 저장하도록 keyword 타입으로 지정
                      # keyword 유형은 토큰 단위로 데이터 저장됨
        "movie_desc": {"type": "text", "analyzer": "nori"},  # 필드의 타입은 text로 하여 한국어 analyzer인 nori를 사용.문장을 형태소 단위로 분할
                      # 긴 설명이니까, text 유형이되 nori 한글 형태소 처리기 사용하여 토큰 저장되도록 함
        "vector": { # Vectorize 부분
            "type": "dense_vector",  # Dense vertor
            "dims": 1024,   # Vector Index 의 차원. 1024차원. 임베딩 모델이 지원하는 차원과 동일하게 설정
            "index": "true",  # 벡터 인덱스를 생성하여 검색 가능하도록 설정
            "similarity": "cosine",  # 거리 계산 방식 : Cosine 방식
            "index_options": {   
              "type": "hnsw", #근사 최근접 이웃 탐색을 위한 HNSW 인덱스 사용
              "m": 16,  #각 노드가 연결할 최대 이웃 수 (그래프 연결 밀도)
              "ef_construction": 100  #인덱스 구축 시 탐색 깊이 (값이 클수록 품질을 올라가나, 속도는 느려짐)
            }
        }
    }
  }
}

indname = 'movie_index_1'

# 인덱스가 존재하는 경우 삭제
if es_client.indices.exists(index=indname):
    es_client.indices.delete(index=indname)
    print(f"기존 인덱스 '{indname}' 삭제 완료")
else:
    print(f"인덱스 '{indname}' 없음")

# 인덱스 생성
es_client.indices.create(index=indname, body=setting_mapping)
print(f"인덱스 '{indname}' 생성 완료")

# 인덱스 생성 여부 확인
response = es_client.cat.indices(
    index=indname,
    h=['index'] # 출력하는 열을 선택
)
print(response)

# 인덱스 내 document 개수 조회
response = es_client.count(index=indname)
print(response['count'])

 

2-3. Document 데이터 생성

# CSV 파일을 읽어서 영화 데이터를 벡터DB에 적재
# movie_desc은 vector 필드에 임베딩하여 적재하도록 ret['vector'] = emb_func(emb_text) 사전 정의한 임베딩 함수 호출

import csv

with open('/home/ec2-user/AI_Project/Document/movie.csv', 'r', encoding='utf-8') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        ret = {}
        ret['movie_name'] = row[0] 
        ret['movie_desc'] = row[1] 
        
        emb_text = row[1] 
        ret['vector'] = emb_func(emb_text)
        es_client.index(index=indname, body= ret)
        
        
# 인덱스 내 document 개수 조회
response = es_client.count(index=indname)
print(response['count'])

# 인덱스 내 document 확인
response = es_client.search(
    index=indname,
    size=100 # 최대 10,000개 문서 조회
)
for idx, hit in enumerate(response["hits"]["hits"]):
    print(hit["_source"])

 

3-1. Vector Search (Similarity Search-KNN)

# 질문 리스트
qtext='우주를 배경으로 하는 영화 추천해줘'
# qtext='연인과 같이 보면 좋을 영화'

# 벡터 질의(Similarity Search-KNN) 실행
script_query = {
    "knn": {
      "field": "vector",
      "query_vector": emb_func(qtext),
      "num_candidates": 30  # 벡터 유사도를 계산할 후보 벡터 수. 수가 크면 정확도가 올라가나 성능을 낮아질수 있음
    }
  }
response = es_client.search(
        index=indname,
        query=script_query,
        size=10,  # 최종 Return 수. 10건만 추출
        source_includes=["movie_name", "movie_desc"]  # 추출할 필드
    )

for idx, hit in enumerate(response["hits"]["hits"]):
    print('[' + str(idx) + ':' + str(hit["_score"]) + '] ' +  hit["_source"]['movie_name'] + "-" +  hit["_source"]['movie_desc'] )

 

3-2. Hybrid Search (Keyword + Vector) - 가중치 조절 가능

# Hybrid 질의(Keyword+Vector) 함수 작성 (가중치 조절 가능) 
def hybrid_search(query_text, query_vector, vector_weight=1.0, keyword_weight=1.0):
    search_query = {
        "size": 10,  ## 최종 Return 수. 10건만 추출
        "query": { # query.match 또는 query.multi_match는 bm25 방식의 keyword 검색을 의미한다.
            "multi_match": {
              "query": query_text,
              "fields": ["movie_desc"],
               "boost": keyword_weight # 키워드 검색 가중치
            }
          },
        "knn": { # knn 형식은 vector 검색을 의미한다.
                "field": "vector",
                "query_vector": query_vector,
                "k": 50,    # 유사도가 높은 상위 대상 중 반환할 개수
                "num_candidates": 50,   # 벡터 유사도를 계산할 후보 벡터 수
                "boost": vector_weight #백터 검색 가중치
            }
    }

    
    response = es_client.search(index=indname, body=search_query)
    return response

# Hybrid 질의 실행
qtext='우주를 배경으로 하는 영화 추천해줘'
query_vector = emb_func(qtext) # 입력 벡터
keyword_weight = 10
vector_weight = 1.0

#keyword_weight = 1.0
#vector_weight = 100.0

#keyword_weight = 1.0
#vector_weight = 5.0
   
response = hybrid_search(qtext, query_vector, vector_weight, keyword_weight)
    
for idx, hit in enumerate(response["hits"]["hits"]):
    print('[' + str(idx) + ':' + str(hit["_score"]) + '] ' +  hit["_source"]['movie_name'] + "-" +  hit["_source"]['movie_desc'] )

 

4. 기타 복잡한 데이터 Case 예시

indname = 'order_index_1'

# 인덱스 매핑 후 인덱스 삭제 및 생성을 함수로 구현
def create_es_index():
    setting_mapping = {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
       }, 
      "mappings": {
        "properties": {
            "order_id": { "type": "long" },
            "customer_id": { "type": "text", "analyzer": "nori" },
            "customer_name": { "type": "text", "analyzer": "nori"},
            "age": { "type": "keyword"},
            "sex": { "type": "keyword" },
            "location": { "type": "text", "analyzer": "nori"}, 
            "item_name": { "type": "text", "analyzer": "nori"}, 
            "item_category": { "type": "keyword" },
            "total_price": { "type": "keyword"}, 
            "vector": { # Vectorize 부분
                "type": "dense_vector",
                "dims": 1024, # Vector Index 의 차원
                "index": "true",
                "similarity": "cosine",
                "index_options": {
                  "type": "hnsw",
                  "m": 16, 
                  "ef_construction": 100 
                    }
            }
        }
      }
    }
    
    # 인덱스가 존재하는 경우 삭제
    if es_client.indices.exists(index=indname):
        es_client.indices.delete(index=indname)
        print(f"기존 인덱스 '{indname}' 삭제 완료")
    else:
        print(f"인덱스 '{indname}' 없음")
    
    # 인덱스 생성
    es_client.indices.create(index=indname, body=setting_mapping)
    print(f"인덱스 '{indname}' 생성 완료")

 

아래와 같이 전체 컬럼들을 조합하여 문맥화 시켜서, vector 컬럼에 업데이트 할 수도 있다.

# vector 데이터로 update
def transform_data_with_vectors():
    script_query = {
        "query": {
          "match_all": {}  ## 조건 없이 전체 데이터 요청
        },
        "size": 1000  ## 1000건 반환
      }    
    response = es_client.search(index=indname, body=script_query)
    #print(response['hits']['total'])    
    
    for idx, hit in enumerate(response["hits"]["hits"]):
        doc_id = hit["_id"]
           
        emb_text = str(hit["_source"]['age']) + "대, 성별은 " +  hit["_source"]['sex'] + " " + hit["_source"]['location'] + "거주, " + hit["_source"]['item_category'] + "카테고리 - " + hit["_source"]['item_name'] + " 상품 구매"     
        update_body = {
            "doc" : {
                "vector": emb_func(emb_text)
            }
        }
        
        es_client.update(index=indname, id=doc_id, body=update_body)

 

참고 자체 실습 시, blog 글 크롤링 코드

import requests
from bs4 import BeautifulSoup
import pandas as pd

results = []

for post_number in range(212, 207, -1):
    url = f'https://jatechpedia.tistory.com/{post_number}'
    response = requests.get(url)

    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')

        try:
            title = soup.find('meta', property='og:title')['content']
            url = soup.find('meta', property='og:url')['content']
            content = soup.find('meta', property='og:description')['content']
            created_at = soup.find('meta', property='og:regDate')['content']

            # 🔥 문자열 대신 dict로 저장
            results.append({
                "title": title,
                "url": url,
                "created_at": created_at,
                "content": content
            })

        except TypeError:
            print(f"Post {post_number}: 데이터 없음")

# 🔥 DataFrame으로 변환 (CSV처럼 다룰 수 있음)
blog_data = pd.DataFrame(results)

# 출력
print(blog_data)
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기