AI 시맨틱 캐싱으로 LLM 비용 최적화하기 - 당근마켓 성공 사례로 보는 실전 적용법

당근마켓의 AI 시맨틱 캐싱 적용 사례를 통해 LLM 비용을 80% 절감하는 방법과 실제 구현 전략을 알아보자.

들어가며

최근 당근마켓이 AI 시맨틱 캐싱(Semantic Caching) 기술을 도입하여 LLM 운영 비용을 80% 이상 절감했다는 소식이 개발자 커뮤니티에서 큰 화제가 되었다. ChatGPT, Claude와 같은 대형 언어 모델(LLM)이 일반화되면서 많은 기업들이 AI 서비스를 구축하고 있지만, 동시에 높은 API 비용 문제에 직면하고 있다. 시맨틱 캐싱은 이러한 문제를 해결할 수 있는 핵심 기술로 주목받고 있다. 본 글에서는 AI 시맨틱 캐싱의 개념부터 실제 구현 방법까지 상세히 알아보겠다.

AI 시맨틱 캐싱이란?

전통적인 캐싱과의 차이점

전통적인 캐싱은 정확히 동일한 쿼리(Exact Match)에 대해서만 캐시된 결과를 반환한다. 예를 들어, “오늘 날씨는 어때요?”라는 질문에 대한 답변은 캐시되지만, “오늘 날씨가 어떤가요?”라는 의미적으로 동일한 질문에 대해서는 새로운 API 호출이 발생한다.

반면 시맨틱 캐싱은 의미적으로 유사한 질문들을 벡터 임베딩(Vector Embedding)을 통해 분석하고, 설정된 유사도 임계값 이상의 질문들에 대해서는 캐시된 결과를 반환한다. 이를 통해 훨씬 더 효율적인 캐싱이 가능하다.

# 전통적 캐싱
cache = {
    "오늘 날씨는 어때요?": "오늘은 맑고 기온이 25도입니다."
}

# 시맨틱 캐싱 (의사코드)
semantic_cache = {
    vector_embedding("오늘 날씨는 어때요?"): "오늘은 맑고 기온이 25도입니다."
}
# "오늘 날씨가 어떤가요?"도 유사도 0.95로 판단되어 캐시 히트

시맨틱 캐싱의 작동 원리

시맨틱 캐싱은 다음과 같은 단계로 작동한다:

  1. 임베딩 생성: 사용자 질문을 벡터 임베딩으로 변환
  2. 유사도 계산: 기존 캐시된 질문들과의 코사인 유사도 계산
  3. 임계값 비교: 설정된 임계값(보통 0.85~0.95) 이상의 유사도를 가진 질문 검색
  4. 결과 반환: 유사한 질문이 있으면 캐시된 답변 반환, 없으면 LLM API 호출

당근마켓의 시맨틱 캐싱 적용 사례

도입 배경과 과제

당근마켓은 사용자들의 다양한 문의사항을 처리하기 위해 AI 챗봇을 도입했지만, 월 LLM API 비용이 급격히 증가하는 문제에 직면했다. 특히 다음과 같은 특징들이 비용 증가의 주요 원인이었다:

구현 전략과 기술 스택

당근마켓은 다음과 같은 기술 스택으로 시맨틱 캐싱을 구현했다:

# 당근마켓 시맨틱 캐싱 구현 예시 (추정)
import openai
import redis
import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticCache:
    def __init__(self, similarity_threshold=0.9):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        self.similarity_threshold = similarity_threshold
    
    def get_embedding(self, text):
        return self.model.encode(text)
    
    def cosine_similarity(self, vec1, vec2):
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    def search_similar_query(self, query_embedding):
        # Redis에서 모든 캐시된 임베딩 조회
        cached_embeddings = self.redis_client.hgetall("embeddings")
        
        for key, cached_embedding in cached_embeddings.items():
            cached_vec = np.frombuffer(cached_embedding, dtype=np.float32)
            similarity = self.cosine_similarity(query_embedding, cached_vec)
            
            if similarity >= self.similarity_threshold:
                return key.decode('utf-8')
        
        return None
    
    def get_cached_response(self, query):
        query_embedding = self.get_embedding(query)
        similar_key = self.search_similar_query(query_embedding)
        
        if similar_key:
            return self.redis_client.hget("responses", similar_key)
        
        return None
    
    def cache_response(self, query, response):
        query_embedding = self.get_embedding(query)
        
        # 임베딩과 응답 저장
        self.redis_client.hset("embeddings", query, query_embedding.tobytes())
        self.redis_client.hset("responses", query, response)

성과와 최적화 결과

당근마켓의 시맨틱 캐싱 도입 결과:

시맨틱 캐싱 구현 방법

1단계: 임베딩 모델 선택

시맨틱 캐싱의 성능은 사용하는 임베딩 모델에 크게 의존한다. 다음과 같은 옵션들을 고려할 수 있다:

경량 모델 (빠른 처리 속도):

고성능 모델 (높은 정확도):

# 임베딩 모델 비교 예시
from sentence_transformers import SentenceTransformer

# 경량 모델
model_light = SentenceTransformer('all-MiniLM-L6-v2')

# 고성능 모델
model_heavy = SentenceTransformer('all-mpnet-base-v2')

# 성능 테스트
queries = [
    "배송비는 얼마인가요?",
    "배송비가 어떻게 되나요?",
    "배송 비용이 궁금합니다"
]

for query in queries:
    embedding_light = model_light.encode(query)
    embedding_heavy = model_heavy.encode(query)
    print(f"Query: {query}")
    print(f"Light model dimension: {len(embedding_light)}")
    print(f"Heavy model dimension: {len(embedding_heavy)}")

2단계: 벡터 데이터베이스 구축

시맨틱 캐싱을 위해서는 벡터 검색을 효율적으로 수행할 수 있는 데이터베이스가 필요하다. 주요 옵션들:

Redis with RediSearch:

import redis
from redis.commands.search.field import VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Redis 벡터 인덱스 생성
r = redis.Redis(host='localhost', port=6379, db=0)

schema = [
    VectorField(
        "embedding",
        "FLAT",
        {
            "TYPE": "FLOAT32",
            "DIM": 384,  # 임베딩 차원
            "DISTANCE_METRIC": "COSINE"
        }
    )
]

r.ft("semantic_cache_idx").create_index(
    schema,
    definition=IndexDefinition(prefix=["cache:"], index_type=IndexType.HASH)
)

Pinecone (관리형 서비스):

import pinecone

pinecone.init(api_key="your-api-key", environment="us-east1-gcp")

# 인덱스 생성
index = pinecone.Index("semantic-cache")

# 벡터 저장
index.upsert(
    vectors=[
        {
            "id": "query1",
            "values": embedding_vector,
            "metadata": {"query": "배송비는 얼마인가요?", "response": "배송비는 3,000원입니다."}
        }
    ]
)

3단계: 캐시 히트율 최적화

캐시 히트율을 높이기 위한 전략들:

1. 적절한 유사도 임계값 설정:

def optimize_threshold(test_queries, ground_truth):
    thresholds = [0.8, 0.85, 0.9, 0.95]
    best_threshold = 0.9
    best_score = 0
    
    for threshold in thresholds:
        hit_rate = calculate_hit_rate(test_queries, threshold)
        accuracy = calculate_accuracy(test_queries, ground_truth, threshold)
        
        # F1 스코어로 최적 임계값 결정
        f1_score = 2 * (hit_rate * accuracy) / (hit_rate + accuracy)
        
        if f1_score > best_score:
            best_score = f1_score
            best_threshold = threshold
    
    return best_threshold

2. 질문 전처리 및 정규화:

import re

def preprocess_query(query):
    # 불필요한 문자 제거
    query = re.sub(r'[^\w\s]', '', query)
    
    # 소문자 변환
    query = query.lower()
    
    # 공백 정규화
    query = re.sub(r'\s+', ' ', query).strip()
    
    return query

고급 최적화 기법

계층적 캐싱 (Hierarchical Caching)

서로 다른 임계값을 가진 여러 캐시 레이어를 구성하여 정확도와 성능을 모두 최적화하는 방법:

class HierarchicalSemanticCache:
    def __init__(self):
        self.exact_cache = {}  # 정확 매칭 캐시
        self.high_similarity_cache = SemanticCache(threshold=0.95)  # 높은 유사도
        self.medium_similarity_cache = SemanticCache(threshold=0.85)  # 중간 유사도
    
    def get_response(self, query):
        # 1단계: 정확 매칭 확인
        if query in self.exact_cache:
            return self.exact_cache[query]
        
        # 2단계: 높은 유사도 확인
        response = self.high_similarity_cache.get_cached_response(query)
        if response:
            return response
        
        # 3단계: 중간 유사도 확인
        response = self.medium_similarity_cache.get_cached_response(query)
        if response:
            return response
        
        # 4단계: LLM API 호출
        return self.call_llm_api(query)

동적 임계값 조정

사용 패턴에 따라 임계값을 동적으로 조정하는 방법:

class AdaptiveSemanticCache:
    def __init__(self):
        self.base_threshold = 0.9
        self.false_positive_rate = 0.0
        self.cache_hit_rate = 0.0
        self.adjustment_factor = 0.01
    
    def adjust_threshold(self):
        # False Positive가 높으면 임계값 증가
        if self.false_positive_rate > 0.1:
            self.base_threshold += self.adjustment_factor
        
        # 캐시 히트율이 낮으면 임계값 감소
        elif self.cache_hit_rate < 0.5:
            self.base_threshold -= self.adjustment_factor
        
        # 임계값 범위 제한
        self.base_threshold = max(0.8, min(0.98, self.base_threshold))

도입 시 고려사항과 베스트 프랙티스

성능 모니터링과 메트릭

시맨틱 캐싱 시스템의 성능을 지속적으로 모니터링하기 위한 핵심 메트릭들:

class CacheMetrics:
    def __init__(self):
        self.total_queries = 0
        self.cache_hits = 0
        self.false_positives = 0
        self.response_times = []
    
    def calculate_metrics(self):
        return {
            'hit_rate': self.cache_hits / self.total_queries,
            'false_positive_rate': self.false_positives / self.cache_hits,
            'avg_response_time': sum(self.response_times) / len(self.response_times),
            'cost_savings': self.cache_hits * 0.002  # API 비용 절약액
        }

주요 모니터링 메트릭:

데이터 품질 관리

시맨틱 캐싱의 효과를 높이기 위한 데이터 품질 관리 방법:

def quality_check(query, cached_response, actual_response):
    # 응답 품질 검증
    similarity_score = calculate_response_similarity(cached_response, actual_response)
    
    if similarity_score < 0.8:  # 품질 임계값
        # 캐시에서 제거 또는 업데이트
        remove_from_cache(query)
        log_quality_issue(query, similarity_score)
    
    return similarity_score

캐시 무효화 전략

시간이 지남에 따라 부정확해질 수 있는 캐시 데이터를 관리하는 전략:

class CacheInvalidation:
    def __init__(self):
        self.ttl_cache = {}  # Time To Live 캐시
        self.access_count = {}  # 접근 횟수 추적
    
    def should_invalidate(self, query):
        # 시간 기반 무효화
        if self.is_expired(query):
            return True
        
        # 사용 빈도 기반 무효화
        if self.access_count.get(query, 0) > 1000:
            return True
        
        return False
    
    def is_expired(self, query):
        return time.time() > self.ttl_cache.get(query, 0)

실제 구현 예제

완전한 시맨틱 캐싱 시스템

다음은 실제 프로덕션 환경에서 사용할 수 있는 완전한 시맨틱 캐싱 시스템 예제이다:

import asyncio
import hashlib
import json
import time
from typing import Optional, Dict, Any
from dataclasses import dataclass
from sentence_transformers import SentenceTransformer
import redis
import numpy as np

@dataclass
class CacheEntry:
    query: str
    response: str
    embedding: np.ndarray
    timestamp: float
    access_count: int = 0

class ProductionSemanticCache:
    def __init__(
        self,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        similarity_threshold: float = 0.9,
        ttl: int = 3600,  # 1시간
        max_cache_size: int = 10000
    ):
        self.model = SentenceTransformer(model_name)
        self.similarity_threshold = similarity_threshold
        self.ttl = ttl
        self.max_cache_size = max_cache_size
        
        # Redis 연결
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )
        
        # 메트릭 추적
        self.metrics = {
            'total_queries': 0,
            'cache_hits': 0,
            'false_positives': 0,
            'response_times': []
        }
    
    async def get_response(self, query: str) -> Optional[str]:
        start_time = time.time()
        self.metrics['total_queries'] += 1
        
        try:
            # 1. 캐시 검색
            cached_response = await self._search_cache(query)
            
            if cached_response:
                self.metrics['cache_hits'] += 1
                response_time = time.time() - start_time
                self.metrics['response_times'].append(response_time)
                return cached_response
            
            # 2. LLM API 호출 (실제 구현에서는 OpenAI API 등)
            response = await self._call_llm_api(query)
            
            # 3. 캐시 저장
            await self._store_in_cache(query, response)
            
            response_time = time.time() - start_time
            self.metrics['response_times'].append(response_time)
            return response
            
        except Exception as e:
            print(f"Error in get_response: {e}")
            return None
    
    async def _search_cache(self, query: str) -> Optional[str]:
        query_embedding = self.model.encode(query)
        
        # Redis에서 모든 캐시 엔트리 검색
        cache_keys = self.redis_client.keys("cache:*")
        
        best_similarity = 0
        best_response = None
        
        for key in cache_keys:
            cached_data = self.redis_client.hgetall(key)
            
            if not cached_data:
                continue
            
            # 임베딩 복원
            cached_embedding = np.frombuffer(
                bytes.fromhex(cached_data['embedding']),
                dtype=np.float32
            )
            
            # 유사도 계산
            similarity = self._cosine_similarity(query_embedding, cached_embedding)
            
            if similarity > best_similarity and similarity >= self.similarity_threshold:
                best_similarity = similarity
                best_response = cached_data['response']
                
                # 접근 횟수 증가
                self.redis_client.hincrby(key, 'access_count', 1)
        
        return best_response
    
    async def _store_in_cache(self, query: str, response: str):
        # 캐시 크기 확인 및 정리
        if len(self.redis_client.keys("cache:*")) >= self.max_cache_size:
            await self._evict_old_entries()
        
        # 새 엔트리 저장
        query_hash = hashlib.md5(query.encode()).hexdigest()
        cache_key = f"cache:{query_hash}"
        
        embedding = self.model.encode(query)
        
        cache_data = {
            'query': query,
            'response': response,
            'embedding': embedding.tobytes().hex(),
            'timestamp': time.time(),
            'access_count': 0
        }
        
        self.redis_client.hset(cache_key, mapping=cache_data)
        self.redis_client.expire(cache_key, self.ttl)
    
    async def _evict_old_entries(self):
        # LRU 정책으로 오래된 엔트리 제거
        cache_keys = self.redis_client.keys("cache:*")
        
        if not cache_keys:
            return
        
        # 접근 횟수와 타임스탬프를 기준으로 정렬
        key_scores = []
        for key in cache_keys:
            cached_data = self.redis_client.hgetall(key)
            if cached_data:
                score = (
                    float(cached_data.get('access_count', 0)) * 0.7 +
                    float(cached_data.get('timestamp', 0)) * 0.3
                )
                key_scores.append((key, score))
        
        # 점수가 낮은 순으로 정렬 (제거 대상)
        key_scores.sort(key=lambda x: x[1])
        
        # 상위 20% 제거
        remove_count = max(1, len(key_scores) // 5)
        for key, _ in key_scores[:remove_count]:
            self.redis_client.delete(key)
    
    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    async def _call_llm_api(self, query: str) -> str:
        # 실제 LLM API 호출 로직
        # 여기서는 예시로 단순한 응답 반환
        await asyncio.sleep(0.1)  # API 호출 시뮬레이션
        return f"Generated response for: {query}"
    
    def get_metrics(self) -> Dict[str, Any]:
        if self.metrics['total_queries'] == 0:
            return {"message": "No queries processed yet"}
        
        avg_response_time = (
            sum(self.metrics['response_times']) / len(self.metrics['response_times'])
            if self.metrics['response_times'] else 0
        )
        
        return {
            'total_queries': self.metrics['total_queries'],
            'cache_hit_rate': self.metrics['cache_hits'] / self.metrics['total_queries'],
            'avg_response_time': avg_response_time,
            'cost_savings_estimate': self.metrics['cache_hits'] * 0.002  # API 비용 절약
        }

# 사용 예제
async def main():
    cache = ProductionSemanticCache(
        similarity_threshold=0.9,
        ttl=3600,
        max_cache_size=1000
    )
    
    # 테스트 쿼리들
    queries = [
        "배송비는 얼마인가요?",
        "배송비가 어떻게 되나요?",
        "배송 비용이 궁금합니다",
        "환불 정책은 어떻게 되나요?",
        "환불은 어떻게 하나요?"
    ]
    
    for query in queries:
        response = await cache.get_response(query)
        print(f"Query: {query}")
        print(f"Response: {response}")
        print("---")
    
    # 메트릭 확인
    metrics = cache.get_metrics()
    print(f"Cache Metrics: {metrics}")

if __name__ == "__main__":
    asyncio.run(main())

마무리

AI 시맨틱 캐싱은 LLM 비용 최적화를 위한 핵심 기술로, 당근마켓과 같은 실제 서비스에서 이미 큰 성과를 거두고 있다. 전통적인 캐싱 방식의 한계를 극복하고 의미적 유사성을 활용하여 더 효율적인 캐시 시스템을 구축할 수 있다.

성공적인 시맨틱 캐싱 도입을 위해서는 적절한 임베딩 모델 선택, 임계값 최적화, 지속적인 모니터링이 필요하다. 특히 False Positive를 최소화하면서 캐시 히트율을 최대화하는 균형점을 찾는 것이 중요하다.

앞으로 더 많은 기업들이 AI 서비스를 도입하면서 시맨틱 캐싱의 중요성은 계속 증가할 것으로 예상된다. 지금부터 시맨틱 캐싱 기술을 학습하고 적용해보는 것은 비용 효율적인 AI 서비스 구축을 위한 필수 역량이 될 것이다.

참고