개요

이 가이드에서는 문서를 입력받아 처리하고 벡터 데이터베이스에 저장하기까지의 완전한 파이프라인을 구축합니다.

전체 파이프라인 흐름

완전한 파이프라인 구현

import requests
from typing import List, Dict, Any
import os
from pathlib import Path

class DocumentPipeline:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.clovastudio.go.kr/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def process_document(self, file_path: str, 
                        apply_pii_masking: bool = False,
                        chunk_size: int = 512,
                        chunk_overlap: int = 50) -> Dict[str, Any]:
        """
        문서 처리 파이프라인
        
        Args:
            file_path: 처리할 문서 경로
            apply_pii_masking: PII 마스킹 적용 여부
            chunk_size: 청크 크기
            chunk_overlap: 청크 간 오버랩
        
        Returns:
            처리 결과 (청크, 임베딩 등)
        """
        print(f"📄 문서 처리 시작: {file_path}")
        
        # 1. 문서 파싱
        print("1️⃣ 문서 파싱 중...")
        text = self._parse_document(file_path)
        print(f"   ✅ 파싱 완료 (길이: {len(text)} 문자)")
        
        # 2. PII 마스킹 (선택적)
        if apply_pii_masking:
            print("2️⃣ PII 마스킹 중...")
            text = self._mask_pii(text)
            print(f"   ✅ PII 마스킹 완료")
        
        # 3. 텍스트 청킹
        print("3️⃣ 텍스트 청킹 중...")
        chunks = self._chunk_text(text, chunk_size, chunk_overlap)
        print(f"   ✅ 청킹 완료 ({len(chunks)}개 청크)")
        
        # 4. 임베딩 생성
        print("4️⃣ 임베딩 생성 중...")
        embeddings = self._create_embeddings(chunks)
        print(f"   ✅ 임베딩 완료")
        
        result = {
            "file_name": os.path.basename(file_path),
            "original_text": text,
            "chunks": chunks,
            "embeddings": embeddings,
            "metadata": {
                "num_chunks": len(chunks),
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
                "pii_masked": apply_pii_masking
            }
        }
        
        print("✨ 문서 처리 완료!")
        return result
    
    def _parse_document(self, file_path: str) -> str:
        """문서 파싱"""
        with open(file_path, 'rb') as f:
            files = {'file': f}
            headers = {"Authorization": f"Bearer {self.api_key}"}
            
            # 파일 확장자에 따라 적절한 파서 선택
            ext = Path(file_path).suffix.lower()
            endpoint = "rag42"  # 기본값
            
            response = requests.post(
                f"{self.base_url}/tools/parse/{endpoint}",
                headers=headers,
                files=files
            )
            response.raise_for_status()
            
            return response.json()['text']
    
    def _mask_pii(self, text: str) -> str:
        """PII 정보 마스킹"""
        data = {
            "text": text,
            "mask_types": ["email", "phone", "ssn", "credit_card"]
        }
        
        response = requests.post(
            f"{self.base_url}/tools/pii-mask",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        
        return response.json()['masked_text']
    
    def _chunk_text(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """텍스트를 청크로 분할"""
        data = {
            "text": text,
            "chunk_size": chunk_size,
            "overlap": overlap,
            "method": "semantic"  # semantic, fixed, sentence 등
        }
        
        response = requests.post(
            f"{self.base_url}/tools/chunk",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        
        return response.json()['chunks']
    
    def _create_embeddings(self, texts: List[str]) -> List[List[float]]:
        """텍스트 임베딩 생성"""
        data = {
            "texts": texts,
            "model": "clova-embedding"
        }
        
        response = requests.post(
            f"{self.base_url}/tools/embeddings",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        
        return response.json()['embeddings']

# 사용 예제
if __name__ == "__main__":
    pipeline = DocumentPipeline("YOUR_API_KEY")
    
    result = pipeline.process_document(
        file_path="./document.pdf",
        apply_pii_masking=True,
        chunk_size=512,
        chunk_overlap=50
    )
    
    print(f"\n처리 결과:")
    print(f"- 파일: {result['file_name']}")
    print(f"- 청크 수: {result['metadata']['num_chunks']}")
    print(f"- PII 마스킹: {result['metadata']['pii_masked']}")

배치 처리

여러 문서를 효율적으로 처리하는 방법
import concurrent.futures
from tqdm import tqdm

class BatchDocumentProcessor:
    def __init__(self, api_key: str, max_workers: int = 5):
        self.pipeline = DocumentPipeline(api_key)
        self.max_workers = max_workers
    
    def process_directory(self, directory: str, 
                         file_extensions: List[str] = ['.pdf', '.docx', '.txt'],
                         **kwargs) -> List[Dict[str, Any]]:
        """디렉토리 내 모든 문서 처리"""
        # 처리할 파일 목록 생성
        files_to_process = []
        for ext in file_extensions:
            files_to_process.extend(Path(directory).glob(f"**/*{ext}"))
        
        print(f"📁 {len(files_to_process)}개 파일 발견")
        
        # 병렬 처리
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.pipeline.process_document, str(f), **kwargs): f 
                for f in files_to_process
            }
            
            for future in tqdm(concurrent.futures.as_completed(futures), 
                             total=len(futures), 
                             desc="문서 처리 중"):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    file_path = futures[future]
                    print(f"❌ {file_path} 처리 실패: {e}")
        
        return results
    
    def save_results(self, results: List[Dict[str, Any]], output_dir: str):
        """처리 결과를 파일로 저장"""
        import json
        
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        for result in results:
            file_name = result['file_name']
            output_file = output_path / f"{file_name}.json"
            
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
        
        print(f"💾 결과 저장 완료: {output_dir}")

# 사용 예제
processor = BatchDocumentProcessor("YOUR_API_KEY", max_workers=5)
results = processor.process_directory(
    directory="./documents",
    file_extensions=['.pdf', '.docx'],
    apply_pii_masking=True,
    chunk_size=512
)
processor.save_results(results, "./processed_documents")

RAG42와 통합

처리된 문서를 RAG42 컬렉션에 직접 업로드
class RAGIntegratedPipeline:
    def __init__(self, api_key: str):
        self.pipeline = DocumentPipeline(api_key)
        self.api_key = api_key
        self.base_url = "https://api.clovastudio.go.kr/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def process_and_upload(self, file_path: str, collection_id: str, 
                          metadata: Dict[str, Any] = None) -> str:
        """문서 처리 후 RAG42에 업로드"""
        # 1. 문서 처리
        result = self.pipeline.process_document(file_path)
        
        # 2. RAG42에 업로드
        with open(file_path, 'rb') as f:
            files = {'file': f}
            data = {
                'collection_id': collection_id,
            }
            
            if metadata:
                data['metadata'] = metadata
            else:
                data['metadata'] = {
                    'processed': True,
                    'num_chunks': result['metadata']['num_chunks']
                }
            
            headers = {"Authorization": f"Bearer {self.api_key}"}
            
            response = requests.post(
                f"{self.base_url}/rag42/documents",
                headers=headers,
                files=files,
                data=data
            )
            response.raise_for_status()
            
            doc_id = response.json()['id']
            print(f"📤 RAG42에 업로드 완료: {doc_id}")
            
            return doc_id
    
    def process_directory_to_rag(self, directory: str, collection_id: str):
        """디렉토리 내 모든 문서를 처리하여 RAG42에 업로드"""
        files = list(Path(directory).glob("**/*.pdf"))
        
        for file_path in tqdm(files, desc="업로드 중"):
            try:
                self.process_and_upload(
                    str(file_path),
                    collection_id,
                    metadata={
                        'source': str(file_path),
                        'filename': file_path.name
                    }
                )
            except Exception as e:
                print(f"❌ {file_path} 업로드 실패: {e}")

# 사용 예제
rag_pipeline = RAGIntegratedPipeline("YOUR_API_KEY")
rag_pipeline.process_directory_to_rag("./documents", "collection_id_here")

고급 청킹 전략

class AdvancedChunker:
    """고급 청킹 전략 구현"""
    
    @staticmethod
    def semantic_chunking(text: str, max_chunk_size: int = 512) -> List[str]:
        """의미 단위로 청킹 (문단, 섹션 등)"""
        # 문단으로 분할
        paragraphs = text.split('\n\n')
        
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            if len(current_chunk) + len(para) < max_chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks
    
    @staticmethod
    def sentence_window_chunking(text: str, window_size: int = 3) -> List[Dict[str, str]]:
        """문장 윈도우 청킹 (문장 + 주변 컨텍스트)"""
        import re
        
        # 문장 분리
        sentences = re.split(r'[.!?]\s+', text)
        
        chunks = []
        for i, sentence in enumerate(sentences):
            # 주변 문장 포함
            start = max(0, i - window_size)
            end = min(len(sentences), i + window_size + 1)
            
            context = ' '.join(sentences[start:end])
            
            chunks.append({
                'main': sentence,
                'context': context
            })
        
        return chunks

모니터링 및 로깅

import logging
from datetime import datetime

class MonitoredPipeline(DocumentPipeline):
    """모니터링 기능이 추가된 파이프라인"""
    
    def __init__(self, api_key: str):
        super().__init__(api_key)
        self.setup_logging()
        self.stats = {
            'processed': 0,
            'failed': 0,
            'total_chunks': 0
        }
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def process_document(self, file_path: str, **kwargs):
        start_time = datetime.now()
        
        try:
            self.logger.info(f"처리 시작: {file_path}")
            result = super().process_document(file_path, **kwargs)
            
            # 통계 업데이트
            self.stats['processed'] += 1
            self.stats['total_chunks'] += len(result['chunks'])
            
            elapsed = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"처리 완료: {file_path} ({elapsed:.2f}초)")
            
            return result
            
        except Exception as e:
            self.stats['failed'] += 1
            self.logger.error(f"처리 실패: {file_path} - {str(e)}")
            raise
    
    def get_stats(self):
        """처리 통계 반환"""
        return self.stats

다음 단계

프로덕션 환경에서는 처리 실패 시 재시도 로직과 에러 알림 시스템을 추가하는 것이 좋습니다.