1. 실습환경 준비
1-1. 실습에 필요한 라이브러리 import
import warnings
from urllib3.exceptions import InsecureRequestWarning
from urllib3 import disable_warnings
from elasticsearch import Elasticsearch
from langchain_community.embeddings import HuggingFaceEmbeddings
warnings.filterwarnings('ignore')
disable_warnings(InsecureRequestWarning)
1-2. ElasticSearch 연결
# ElasticSearch 연결
es_client=Elasticsearch(hosts=['https://localhost:9200'], basic_auth=['elastic', 'elastic'], verify_certs=False)
es_client.info()
2. Vector 데이터 생성
2-1. Vector Embedding 함수 선언
# Embedding
def emb_func(text) :
model_name = "intfloat/multilingual-e5-large"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': True}
hf = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs
)
return hf.embed_query(text)
2-2. 인덱스 생성
# 인덱스 매핑
setting_mapping = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
},
"mappings": {
"properties": {
"movie_name": {"type": "keyword"}, # 단어 자체를 저장하도록 keyword 타입으로 지정
# keyword 유형은 토큰 단위로 데이터 저장됨
"movie_desc": {"type": "text", "analyzer": "nori"}, # 필드의 타입은 text로 하여 한국어 analyzer인 nori를 사용.문장을 형태소 단위로 분할
# 긴 설명이니까, text 유형이되 nori 한글 형태소 처리기 사용하여 토큰 저장되도록 함
"vector": { # Vectorize 부분
"type": "dense_vector", # Dense vertor
"dims": 1024, # Vector Index 의 차원. 1024차원. 임베딩 모델이 지원하는 차원과 동일하게 설정
"index": "true", # 벡터 인덱스를 생성하여 검색 가능하도록 설정
"similarity": "cosine", # 거리 계산 방식 : Cosine 방식
"index_options": {
"type": "hnsw", #근사 최근접 이웃 탐색을 위한 HNSW 인덱스 사용
"m": 16, #각 노드가 연결할 최대 이웃 수 (그래프 연결 밀도)
"ef_construction": 100 #인덱스 구축 시 탐색 깊이 (값이 클수록 품질을 올라가나, 속도는 느려짐)
}
}
}
}
}
indname = 'movie_index_1'
# 인덱스가 존재하는 경우 삭제
if es_client.indices.exists(index=indname):
es_client.indices.delete(index=indname)
print(f"기존 인덱스 '{indname}' 삭제 완료")
else:
print(f"인덱스 '{indname}' 없음")
# 인덱스 생성
es_client.indices.create(index=indname, body=setting_mapping)
print(f"인덱스 '{indname}' 생성 완료")
# 인덱스 생성 여부 확인
response = es_client.cat.indices(
index=indname,
h=['index'] # 출력하는 열을 선택
)
print(response)
# 인덱스 내 document 개수 조회
response = es_client.count(index=indname)
print(response['count'])
2-3. Document 데이터 생성
# CSV 파일을 읽어서 영화 데이터를 벡터DB에 적재
# movie_desc은 vector 필드에 임베딩하여 적재하도록 ret['vector'] = emb_func(emb_text) 사전 정의한 임베딩 함수 호출
import csv
with open('/home/ec2-user/AI_Project/Document/movie.csv', 'r', encoding='utf-8') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
ret = {}
ret['movie_name'] = row[0]
ret['movie_desc'] = row[1]
emb_text = row[1]
ret['vector'] = emb_func(emb_text)
es_client.index(index=indname, body= ret)
# 인덱스 내 document 개수 조회
response = es_client.count(index=indname)
print(response['count'])
# 인덱스 내 document 확인
response = es_client.search(
index=indname,
size=100 # 최대 10,000개 문서 조회
)
for idx, hit in enumerate(response["hits"]["hits"]):
print(hit["_source"])
3. Vector Search
3-1. Vector Search (Similarity Search-KNN)
# 질문 리스트
qtext='우주를 배경으로 하는 영화 추천해줘'
# qtext='연인과 같이 보면 좋을 영화'
# 벡터 질의(Similarity Search-KNN) 실행
script_query = {
"knn": {
"field": "vector",
"query_vector": emb_func(qtext),
"num_candidates": 30 # 벡터 유사도를 계산할 후보 벡터 수. 수가 크면 정확도가 올라가나 성능을 낮아질수 있음
}
}
response = es_client.search(
index=indname,
query=script_query,
size=10, # 최종 Return 수. 10건만 추출
source_includes=["movie_name", "movie_desc"] # 추출할 필드
)
for idx, hit in enumerate(response["hits"]["hits"]):
print('[' + str(idx) + ':' + str(hit["_score"]) + '] ' + hit["_source"]['movie_name'] + "-" + hit["_source"]['movie_desc'] )
3-2. Hybrid Search (Keyword + Vector) - 가중치 조절 가능
# Hybrid 질의(Keyword+Vector) 함수 작성 (가중치 조절 가능)
def hybrid_search(query_text, query_vector, vector_weight=1.0, keyword_weight=1.0):
search_query = {
"size": 10, ## 최종 Return 수. 10건만 추출
"query": { # query.match 또는 query.multi_match는 bm25 방식의 keyword 검색을 의미한다.
"multi_match": {
"query": query_text,
"fields": ["movie_desc"],
"boost": keyword_weight # 키워드 검색 가중치
}
},
"knn": { # knn 형식은 vector 검색을 의미한다.
"field": "vector",
"query_vector": query_vector,
"k": 50, # 유사도가 높은 상위 대상 중 반환할 개수
"num_candidates": 50, # 벡터 유사도를 계산할 후보 벡터 수
"boost": vector_weight #백터 검색 가중치
}
}
response = es_client.search(index=indname, body=search_query)
return response
# Hybrid 질의 실행
qtext='우주를 배경으로 하는 영화 추천해줘'
query_vector = emb_func(qtext) # 입력 벡터
keyword_weight = 10
vector_weight = 1.0
#keyword_weight = 1.0
#vector_weight = 100.0
#keyword_weight = 1.0
#vector_weight = 5.0
response = hybrid_search(qtext, query_vector, vector_weight, keyword_weight)
for idx, hit in enumerate(response["hits"]["hits"]):
print('[' + str(idx) + ':' + str(hit["_score"]) + '] ' + hit["_source"]['movie_name'] + "-" + hit["_source"]['movie_desc'] )
4. 기타 복잡한 데이터 Case 예시
indname = 'order_index_1'
# 인덱스 매핑 후 인덱스 삭제 및 생성을 함수로 구현
def create_es_index():
setting_mapping = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
},
"mappings": {
"properties": {
"order_id": { "type": "long" },
"customer_id": { "type": "text", "analyzer": "nori" },
"customer_name": { "type": "text", "analyzer": "nori"},
"age": { "type": "keyword"},
"sex": { "type": "keyword" },
"location": { "type": "text", "analyzer": "nori"},
"item_name": { "type": "text", "analyzer": "nori"},
"item_category": { "type": "keyword" },
"total_price": { "type": "keyword"},
"vector": { # Vectorize 부분
"type": "dense_vector",
"dims": 1024, # Vector Index 의 차원
"index": "true",
"similarity": "cosine",
"index_options": {
"type": "hnsw",
"m": 16,
"ef_construction": 100
}
}
}
}
}
# 인덱스가 존재하는 경우 삭제
if es_client.indices.exists(index=indname):
es_client.indices.delete(index=indname)
print(f"기존 인덱스 '{indname}' 삭제 완료")
else:
print(f"인덱스 '{indname}' 없음")
# 인덱스 생성
es_client.indices.create(index=indname, body=setting_mapping)
print(f"인덱스 '{indname}' 생성 완료")
아래와 같이 전체 컬럼들을 조합하여 문맥화 시켜서, vector 컬럼에 업데이트 할 수도 있다.
# vector 데이터로 update
def transform_data_with_vectors():
script_query = {
"query": {
"match_all": {} ## 조건 없이 전체 데이터 요청
},
"size": 1000 ## 1000건 반환
}
response = es_client.search(index=indname, body=script_query)
#print(response['hits']['total'])
for idx, hit in enumerate(response["hits"]["hits"]):
doc_id = hit["_id"]
emb_text = str(hit["_source"]['age']) + "대, 성별은 " + hit["_source"]['sex'] + " " + hit["_source"]['location'] + "거주, " + hit["_source"]['item_category'] + "카테고리 - " + hit["_source"]['item_name'] + " 상품 구매"
update_body = {
"doc" : {
"vector": emb_func(emb_text)
}
}
es_client.update(index=indname, id=doc_id, body=update_body)
참고 자체 실습 시, blog 글 크롤링 코드
import requests
from bs4 import BeautifulSoup
import pandas as pd
results = []
for post_number in range(212, 207, -1):
url = f'https://jatechpedia.tistory.com/{post_number}'
response = requests.get(url)
if response.status_code == 200:
soup = BeautifulSoup(response.content, 'html.parser')
try:
title = soup.find('meta', property='og:title')['content']
url = soup.find('meta', property='og:url')['content']
content = soup.find('meta', property='og:description')['content']
created_at = soup.find('meta', property='og:regDate')['content']
# 🔥 문자열 대신 dict로 저장
results.append({
"title": title,
"url": url,
"created_at": created_at,
"content": content
})
except TypeError:
print(f"Post {post_number}: 데이터 없음")
# 🔥 DataFrame으로 변환 (CSV처럼 다룰 수 있음)
blog_data = pd.DataFrame(results)
# 출력
print(blog_data)




최근댓글