100K PV 웹사이트 운영 경험을 바탕으로 실측한 Threading, Multiprocessing, asyncio 성능 비교
📋 목차
들어가며 - 왜 병렬처리가 필요했을까?
안녕하세요. 저는 부동산 실거래가 정보를 제공하는 aptmeter.kr과 일 10만 페이지뷰를 기록하는 열차 시간표 사이트를 운영하고 있습니다. 오늘은 실제 프로젝트에서 겪었던 크롤링 성능 문제와 해결 과정을 공유하려고 합니다.
단일 크롤링의 한계
처음 aptmeter를 개발할 때는 단순한 for 루프로 데이터를 수집했습니다. 서울시 아파트 5,000개의 실거래가 데이터를 수집하는데 무려 8시간이 걸렸죠. 문제는 대부분의 시간이 네트워크 응답을 기다리는 시간이었다는 점입니다.
# 초기 버전 - 단일 크롤링
import requests
from bs4 import BeautifulSoup
import time
def crawl_single(urls):
results = []
start_time = time.time()
for url in urls:
response = requests.get(url)
data = BeautifulSoup(response.text, 'html.parser')
results.append(data)
time.sleep(0.1) # 서버 부담 방지
elapsed = time.time() - start_time
print(f"처리 시간: {elapsed:.2f}초")
return results
# 5,000개 URL 처리: 약 28,800초 (8시간)
왜 이렇게 느렸을까?
크롤링 작업의 시간을 분석해보니 다음과 같았습니다:
- 네트워크 대기: 90% (서버 응답 기다림)
- HTML 파싱: 8% (BeautifulSoup 처리)
- 데이터 저장: 2% (DB 쓰기)
문제는 명확했습니다. CPU는 놀고 있는데 네트워크 응답만 계속 기다리고 있었던 것이죠. 이것이 바로 I/O Bound 작업의 특징입니다.
병렬처리로 얻은 놀라운 개선
처리 시간
업데이트 주기
데이터 신선도
이 글에서는 제가 직접 테스트하고 측정한 세 가지 병렬처리 방식(Threading, Multiprocessing, asyncio)의 실제 성능을 비교하고, 여러분의 프로젝트에 맞는 최적의 방법을 선택할 수 있도록 도와드리겠습니다.
크롤링 병렬처리 3가지 방식 완벽 비교
Python에서 병렬처리를 구현하는 방법은 크게 세 가지입니다. 각각의 특징과 실제 성능을 상세히 비교해보겠습니다.
1. Threading (ThreadPoolExecutor) - 가장 쉬운 시작
개념과 작동 원리
Threading은 하나의 프로세스 안에서 여러 개의 스레드를 만들어 동시에 작업을 처리하는 방식입니다. 마치 한 사람이 여러 개의 창을 띄워놓고 번갈아가며 일하는 것과 비슷하죠.
• 하나의 프로세스 내에서 메모리 공유
• GIL(Global Interpreter Lock)로 인해 진정한 병렬 실행은 아님
• I/O 작업(네트워크, 파일)에 최적화
• 구현이 간단하고 메모리 효율적
실제 구현 코드
from concurrent.futures import ThreadPoolExecutor
import requests
from bs4 import BeautifulSoup
import time
def fetch_page(url):
"""단일 페이지를 크롤링하는 함수"""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
return {'url': url, 'title': soup.title.string if soup.title else 'No title'}
except Exception as e:
return {'url': url, 'error': str(e)}
def crawl_with_threading(urls, max_workers=5):
"""Threading 방식으로 크롤링"""
start_time = time.time()
results = []
# ThreadPoolExecutor 사용
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# map 함수로 모든 URL 처리
results = list(executor.map(fetch_page, urls))
elapsed = time.time() - start_time
print(f"Threading 처리 시간: {elapsed:.2f}초")
print(f"초당 처리량: {len(urls)/elapsed:.2f} pages/sec")
return results
# 사용 예시
urls = ['https://example.com/page1', 'https://example.com/page2', ...]
results = crawl_with_threading(urls, max_workers=5)
실측 성능 데이터 (1,000개 URL 기준)
| 측정 항목 | 단일 처리 | Threading (5 workers) | 개선율 |
|---|---|---|---|
| 처리 시간 | 580초 | 125초 | 4.6배 향상 |
| RAM 사용량 | 150MB | 250MB | +100MB |
| CPU 사용률 | 10% | 25% | +15%p |
| 초당 처리량 | 1.7 pages/s | 8.0 pages/s | 4.7배 향상 |
장단점 분석
• 구현이 매우 간단함 (코드 3-4줄 추가)
• 메모리 사용량이 적음 (공유 메모리 구조)
• 디버깅이 쉬움
• 작은 규모 프로젝트에 최적
• GIL로 인해 CPU 집약적 작업에는 부적합
• Multiprocessing보다 속도가 약간 느림
• 대규모 크롤링(10,000개 이상)에는 한계
2. Multiprocessing (ProcessPoolExecutor) - 진정한 병렬처리
개념과 작동 원리
Multiprocessing은 여러 개의 독립적인 프로세스를 생성하여 각각의 CPU 코어에서 실제로 동시에 작업을 수행합니다. 마치 여러 사람이 각자의 컴퓨터에서 독립적으로 일하는 것과 같습니다.
• 각 프로세스가 독립적인 메모리 공간 사용
• GIL 제약 없이 진정한 병렬 실행
• CPU 집약적 작업에도 효과적
• 메모리 사용량이 큼
실제 구현 코드
from concurrent.futures import ProcessPoolExecutor
import requests
from bs4 import BeautifulSoup
import time
def fetch_and_parse(url):
"""페이지를 가져와서 파싱하는 함수"""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
# 복잡한 파싱 작업 수행
data = {
'url': url,
'title': soup.title.string if soup.title else '',
'links': len(soup.find_all('a')),
'images': len(soup.find_all('img'))
}
return data
except Exception as e:
return {'url': url, 'error': str(e)}
def crawl_with_multiprocessing(urls, max_workers=5):
"""Multiprocessing 방식으로 크롤링"""
start_time = time.time()
# ProcessPoolExecutor 사용
with ProcessPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(fetch_and_parse, urls))
elapsed = time.time() - start_time
print(f"Multiprocessing 처리 시간: {elapsed:.2f}초")
print(f"초당 처리량: {len(urls)/elapsed:.2f} pages/sec")
return results
# 사용 예시
urls = ['https://example.com/page{}'.format(i) for i in range(1000)]
results = crawl_with_multiprocessing(urls, max_workers=5)
실측 성능 데이터 (1,000개 URL 기준)
| 측정 항목 | Threading (5) | Multiprocessing (5) | 비교 |
|---|---|---|---|
| 처리 시간 | 125초 | 110초 | 12% 더 빠름 |
| RAM 사용량 | 250MB | 850MB | 3.4배 더 많음 |
| CPU 사용률 | 25% | 65% | 2.6배 더 높음 |
| 초당 처리량 | 8.0 pages/s | 9.1 pages/s | 14% 향상 |
언제 사용해야 할까?
• HTML 파싱 작업이 복잡한 경우
• CPU 연산이 많이 필요한 경우
• 서버 리소스가 충분한 경우
• 최대 성능이 필요한 경우
• 메모리가 제한적인 환경에서는 부적합
• 프로세스 생성 오버헤드 고려 필요
• Windows에서는 if __name__ == '__main__' 필수
3. asyncio + aiohttp - 최신 비동기 방식
개념과 작동 원리
asyncio는 단일 스레드에서 비동기 I/O를 처리하는 방식입니다. 한 사람이 여러 일을 시작해놓고, 대기 시간 동안 다른 일을 처리하는 효율적인 멀티태스킹과 같습니다.
• 이벤트 루프 기반 비동기 처리
• 단일 스레드지만 수천 개 동시 연결 가능
• 메모리 효율이 가장 좋음
• 코드가 약간 복잡해질 수 있음
실제 구현 코드
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
async def fetch_page_async(session, url):
"""비동기로 페이지를 가져오는 함수"""
try:
async with session.get(url, timeout=10) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
return {
'url': url,
'title': soup.title.string if soup.title else '',
'status': response.status
}
except Exception as e:
return {'url': url, 'error': str(e)}
async def crawl_with_asyncio(urls, concurrent_limit=10):
"""asyncio 방식으로 크롤링"""
start_time = time.time()
# 동시 연결 수 제한을 위한 세마포어
semaphore = asyncio.Semaphore(concurrent_limit)
async def bounded_fetch(session, url):
async with semaphore:
return await fetch_page_async(session, url)
# aiohttp 세션 생성
async with aiohttp.ClientSession() as session:
tasks = [bounded_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start_time
print(f"asyncio 처리 시간: {elapsed:.2f}초")
print(f"초당 처리량: {len(urls)/elapsed:.2f} pages/sec")
return results
# 사용 예시
urls = ['https://example.com/page{}'.format(i) for i in range(1000)]
results = asyncio.run(crawl_with_asyncio(urls, concurrent_limit=10))
실측 성능 데이터 (1,000개 URL 기준)
| 측정 항목 | Threading (5) | asyncio (10) | 비교 |
|---|---|---|---|
| 처리 시간 | 125초 | 88초 | 42% 더 빠름 |
| RAM 사용량 | 250MB | 220MB | 12% 절약 |
| CPU 사용률 | 25% | 18% | 28% 절약 |
| 초당 처리량 | 8.0 pages/s | 11.4 pages/s | 43% 향상 |
왜 asyncio가 가장 빠를까?
asyncio는 다른 방식과 달리 네트워크 대기 시간을 정말 효율적으로 활용합니다:
- 오버헤드 최소화: 스레드/프로세스 전환 비용이 없음
- 메모리 효율: 단일 스레드로 수천 개 요청 처리
- 네트워크 최적화: aiohttp가 keep-alive 자동 관리
- 동시성: 10-50개를 동시에 처리해도 부담 없음
• 대규모 웹 크롤링 (1,000개 이상)
• API 호출이 많은 경우
• 메모리가 제한적인 환경
• 최신 Python 사용 가능 시 (3.7+)
MaxWorkers 최적값 찾기 - 실전 측정 데이터
병렬처리에서 가장 중요한 설정 중 하나가 바로 동시 작업 수(max_workers)입니다. 너무 적으면 성능 향상이 미미하고, 너무 많으면 오히려 역효과가 납니다.
실험 설정
- 테스트 대상: 1,000개의 웹페이지 크롤링
- 환경: 4코어 8스레드 CPU, 16GB RAM
- 네트워크: 100Mbps
- 측정 방식: Threading 기준, workers 1/3/5/10/20 비교
Workers 수에 따른 성능 변화
| Workers | 처리 시간 | RAM 사용 | CPU 사용 | 초당 처리 |
|---|---|---|---|---|
| 1 (단일) | 580초 | 150MB | 10% | 1.7 pages/s |
| 3 | 205초 | 200MB | 18% | 4.9 pages/s |
| 5 | 125초 | 250MB | 25% | 8.0 pages/s |
| 10 | 98초 | 350MB | 35% | 10.2 pages/s |
| 20 | 95초 | 550MB | 42% | 10.5 pages/s |
핵심 인사이트
1. 성능 향상의 한계점
Workers 10개까지는 성능이 크게 향상되지만, 20개로 늘려도 겨우 3% 개선됩니다. 이는 네트워크 대역폭이 병목이 되기 때문입니다.
2. 최적의 Workers 수 = 5-10개
• Workers 5: 성능/리소스 균형점
• Workers 10: 최대 성능 (리소스 여유 시)
• Workers 20+: 비효율적 (리소스만 낭비)
3. 환경별 권장 설정
• 개인 PC: 3-5개
• 서버 환경: 5-10개
• 클라우드 (고사양): 10-20개
동적 Workers 설정 코드
import os
import psutil
def get_optimal_workers():
"""시스템 리소스 기반으로 최적 workers 수 계산"""
# CPU 코어 수
cpu_count = os.cpu_count() or 4
# 사용 가능한 메모리 (GB)
available_memory = psutil.virtual_memory().available / (1024**3)
# CPU 기반 계산: 코어당 2개
cpu_based = cpu_count * 2
# 메모리 기반 계산: GB당 1개 worker
# (worker당 약 100-150MB 사용 가정)
memory_based = int(available_memory * 7) # 여유있게 설정
# 둘 중 작은 값 선택하되, 최소 3, 최대 20
optimal = min(cpu_based, memory_based)
optimal = max(3, min(optimal, 20))
print(f"CPU 코어: {cpu_count}")
print(f"사용 가능 메모리: {available_memory:.1f}GB")
print(f"권장 workers: {optimal}")
return optimal
# 사용 예시
max_workers = get_optimal_workers()
crawl_with_threading(urls, max_workers=max_workers)
따라하기 쉬운 단계별 튜토리얼
실제로 크롤링 프로젝트에 병렬처리를 적용하는 전체 과정을 단계별로 설명하겠습니다.
Step 1: 환경 설정
# 필요한 패키지 설치
pip install requests beautifulsoup4 aiohttp psutil
# 또는 requirements.txt 생성
cat > requirements.txt << EOF
requests==2.31.0
beautifulsoup4==4.12.2
aiohttp==3.9.1
psutil==5.9.6
lxml==4.9.3
EOF
pip install -r requirements.txt
Step 2: 기본 크롤러 작성
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from typing import List, Dict
class SimpleCrawler:
"""간단한 병렬 크롤러"""
def __init__(self, max_workers=5, timeout=10):
self.max_workers = max_workers
self.timeout = timeout
self.session = requests.Session()
def fetch_url(self, url: str) -> Dict:
"""단일 URL 크롤링"""
try:
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
return {
'url': url,
'status': response.status_code,
'title': soup.title.string if soup.title else '',
'links_count': len(soup.find_all('a')),
'success': True
}
except Exception as e:
return {
'url': url,
'error': str(e),
'success': False
}
def crawl(self, urls: List[str]) -> List[Dict]:
"""여러 URL을 병렬로 크롤링"""
start_time = time.time()
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = list(executor.map(self.fetch_url, urls))
elapsed = time.time() - start_time
success_count = sum(1 for r in results if r.get('success'))
print(f"\n크롤링 완료!")
print(f"총 처리: {len(urls)}개")
print(f"성공: {success_count}개")
print(f"실패: {len(urls) - success_count}개")
print(f"소요 시간: {elapsed:.2f}초")
print(f"처리 속도: {len(urls)/elapsed:.2f} pages/sec")
return results
# 사용 예시
if __name__ == '__main__':
urls = [
'https://example.com',
'https://python.org',
'https://github.com',
# ... 더 많은 URL
]
crawler = SimpleCrawler(max_workers=5)
results = crawler.crawl(urls)
# 결과 저장
import json
with open('results.json', 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
Step 3: 성능 모니터링 추가
import psutil
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class PerformanceMetrics:
"""성능 측정 데이터 클래스"""
start_memory: float
end_memory: float
peak_memory: float
start_cpu: float
avg_cpu: float
elapsed_time: float
pages_per_second: float
def print_report(self):
"""성능 리포트 출력"""
print("\n" + "="*50)
print("성능 측정 결과")
print("="*50)
print(f"처리 시간: {self.elapsed_time:.2f}초")
print(f"초당 처리량: {self.pages_per_second:.2f} pages/sec")
print(f"메모리 사용: {self.start_memory:.0f}MB → {self.end_memory:.0f}MB")
print(f"메모리 증가: +{self.end_memory - self.start_memory:.0f}MB")
print(f"최대 메모리: {self.peak_memory:.0f}MB")
print(f"평균 CPU: {self.avg_cpu:.1f}%")
print("="*50)
class MonitoredCrawler(SimpleCrawler):
"""성능 모니터링 기능이 추가된 크롤러"""
def _get_memory_usage(self) -> float:
"""현재 메모리 사용량 (MB)"""
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024
def crawl_with_monitoring(self, urls: List[str]) -> tuple:
"""모니터링과 함께 크롤링"""
# 시작 시점 측정
start_memory = self._get_memory_usage()
start_cpu = psutil.cpu_percent(interval=1)
start_time = time.time()
peak_memory = start_memory
cpu_samples = [start_cpu]
# 크롤링 실행
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = []
for result in executor.map(self.fetch_url, urls):
results.append(result)
# 중간 메모리 측정
current_memory = self._get_memory_usage()
peak_memory = max(peak_memory, current_memory)
cpu_samples.append(psutil.cpu_percent())
# 종료 시점 측정
end_time = time.time()
end_memory = self._get_memory_usage()
elapsed = end_time - start_time
# 성능 메트릭 생성
metrics = PerformanceMetrics(
start_memory=start_memory,
end_memory=end_memory,
peak_memory=peak_memory,
start_cpu=start_cpu,
avg_cpu=sum(cpu_samples) / len(cpu_samples),
elapsed_time=elapsed,
pages_per_second=len(urls) / elapsed
)
return results, metrics
# 사용 예시
crawler = MonitoredCrawler(max_workers=5)
results, metrics = crawler.crawl_with_monitoring(urls)
metrics.print_report()
Step 4: 에러 처리 및 재시도 로직
from typing import Callable
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def retry_on_failure(max_retries=3, delay=1):
"""재시도 데코레이터"""
def decorator(func: Callable):
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"최종 실패 after {max_retries} attempts: {e}")
raise
logger.warning(f"시도 {attempt + 1} 실패, {delay}초 후 재시도: {e}")
time.sleep(delay)
return None
return wrapper
return decorator
class RobustCrawler(MonitoredCrawler):
"""안정성이 강화된 크롤러"""
@retry_on_failure(max_retries=3, delay=2)
def fetch_url(self, url: str) -> Dict:
"""재시도 로직이 포함된 크롤링"""
try:
response = self.session.get(
url,
timeout=self.timeout,
headers={'User-Agent': 'Mozilla/5.0'} # User-Agent 추가
)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml') # lxml 파서 사용
return {
'url': url,
'status': response.status_code,
'title': soup.title.string if soup.title else '',
'content_length': len(response.text),
'success': True,
'timestamp': time.time()
}
except requests.Timeout:
logger.error(f"타임아웃: {url}")
return {'url': url, 'error': 'Timeout', 'success': False}
except requests.HTTPError as e:
logger.error(f"HTTP 오류 {e.response.status_code}: {url}")
return {'url': url, 'error': f'HTTP {e.response.status_code}', 'success': False}
except Exception as e:
logger.error(f"예상치 못한 오류: {url} - {e}")
return {'url': url, 'error': str(e), 'success': False}
성능 모니터링과 트러블슈팅
자주 겪는 문제와 해결법
문제 1: 메모리 부족 오류 (MemoryError)
• "MemoryError" 발생
• 시스템이 느려지거나 멈춤
• 크롤링 중간에 프로세스 종료
원인
• max_workers가 너무 많음
• 크롤링한 데이터를 메모리에 계속 보관
• 대용량 페이지 처리
해결 방법
def crawl_in_batches(urls, batch_size=100, max_workers=5):
"""배치 단위로 처리하여 메모리 관리"""
all_results = []
for i in range(0, len(urls), batch_size):
batch = urls[i:i+batch_size]
logger.info(f"배치 {i//batch_size + 1} 처리 중... ({len(batch)}개)")
# 배치 처리
crawler = SimpleCrawler(max_workers=max_workers)
batch_results = crawler.crawl(batch)
# 결과 저장 및 메모리 해제
save_results(batch_results, f'batch_{i//batch_size}.json')
all_results.extend(batch_results)
# 명시적 가비지 컬렉션
import gc
gc.collect()
logger.info(f"현재 메모리: {psutil.Process().memory_info().rss / 1024 / 1024:.0f}MB")
return all_results
문제 2: CPU 과부하
• CPU 사용률 100%
• 시스템 전체가 느려짐
• 다른 프로그램 실행 불가
해결 방법
import os
# CPU 친화도 설정 (특정 코어만 사용)
if os.name == 'posix': # Linux/Mac
# 처음 2개 코어만 사용
os.sched_setaffinity(0, {0, 1})
# 또는 nice 값 설정 (우선순위 낮춤)
if os.name == 'posix':
os.nice(10) # 낮은 우선순위
# Workers 수 제한
max_workers = min(5, os.cpu_count() - 1) # 최소 1개 코어는 시스템용으로 남김
문제 3: 타임아웃 빈발
• "Connection timeout" 에러 다수
• 성공률이 낮음
• 특정 사이트만 실패
해결 방법
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_robust_session():
"""재시도 로직이 있는 세션 생성"""
session = requests.Session()
# 재시도 전략 설정
retry_strategy = Retry(
total=3, # 총 3번 재시도
backoff_factor=1, # 1초, 2초, 4초 대기
status_forcelist=[429, 500, 502, 503, 504], # 재시도할 HTTP 상태
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # 연결 풀 크기
pool_maxsize=10
)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 기본 헤더 설정
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
return session
# 사용 예시
class ImprovedCrawler(SimpleCrawler):
def __init__(self, max_workers=5, timeout=15):
super().__init__(max_workers, timeout)
self.session = create_robust_session()
문제 4: IP 차단 / Rate Limiting
• "429 Too Many Requests" 응답
• "403 Forbidden" 응답
• 중간부터 모든 요청 실패
해결 방법
import time
import random
from threading import Semaphore
class RateLimitedCrawler(SimpleCrawler):
"""요청 속도 제한이 있는 크롤러"""
def __init__(self, max_workers=5, timeout=10, requests_per_second=2):
super().__init__(max_workers, timeout)
self.min_delay = 1.0 / requests_per_second
self.semaphore = Semaphore(max_workers)
def fetch_url(self, url: str) -> Dict:
"""속도 제한을 준수하며 크롤링"""
with self.semaphore:
# 랜덤 지연 추가 (사람처럼 보이게)
delay = self.min_delay + random.uniform(0, 0.5)
time.sleep(delay)
return super().fetch_url(url)
# 또는 프록시 로테이션
class ProxyCrawler(SimpleCrawler):
"""프록시를 순환하며 사용하는 크롤러"""
def __init__(self, max_workers=5, timeout=10, proxies=None):
super().__init__(max_workers, timeout)
self.proxies = proxies or []
self.proxy_index = 0
def get_next_proxy(self):
"""다음 프록시 반환"""
if not self.proxies:
return None
proxy = self.proxies[self.proxy_index]
self.proxy_index = (self.proxy_index + 1) % len(self.proxies)
return {'http': proxy, 'https': proxy}
def fetch_url(self, url: str) -> Dict:
"""프록시를 사용하여 크롤링"""
proxy = self.get_next_proxy()
try:
response = self.session.get(url, timeout=self.timeout, proxies=proxy)
# ... 나머지 처리
except Exception as e:
return {'url': url, 'error': str(e), 'success': False}
실시간 모니터링 대시보드
from collections import deque
import threading
class CrawlerMonitor:
"""실시간 크롤링 모니터"""
def __init__(self, window_size=100):
self.success_count = 0
self.error_count = 0
self.recent_times = deque(maxlen=window_size)
self.lock = threading.Lock()
self.start_time = time.time()
def record_success(self, elapsed_time):
"""성공 기록"""
with self.lock:
self.success_count += 1
self.recent_times.append(elapsed_time)
def record_error(self):
"""에러 기록"""
with self.lock:
self.error_count += 1
def get_stats(self):
"""현재 통계 반환"""
with self.lock:
total = self.success_count + self.error_count
success_rate = (self.success_count / total * 100) if total > 0 else 0
avg_time = sum(self.recent_times) / len(self.recent_times) if self.recent_times else 0
elapsed = time.time() - self.start_time
rate = total / elapsed if elapsed > 0 else 0
return {
'total': total,
'success': self.success_count,
'errors': self.error_count,
'success_rate': success_rate,
'avg_response_time': avg_time,
'requests_per_second': rate,
'elapsed_time': elapsed
}
def print_stats(self):
"""통계 출력"""
stats = self.get_stats()
print(f"\r진행: {stats['total']}개 | "
f"성공: {stats['success']} ({stats['success_rate']:.1f}%) | "
f"에러: {stats['errors']} | "
f"속도: {stats['requests_per_second']:.1f} req/s | "
f"평균: {stats['avg_response_time']:.2f}s",
end='', flush=True)
# 사용 예시
class MonitoredCrawler(SimpleCrawler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.monitor = CrawlerMonitor()
def fetch_url(self, url: str) -> Dict:
start = time.time()
result = super().fetch_url(url)
elapsed = time.time() - start
if result.get('success'):
self.monitor.record_success(elapsed)
else:
self.monitor.record_error()
# 100개마다 통계 출력
if (self.monitor.success_count + self.monitor.error_count) % 100 == 0:
self.monitor.print_stats()
return result
상황별 최적 방식 선택 가이드
지금까지 세 가지 병렬처리 방식을 자세히 살펴봤습니다. 이제 여러분의 상황에 맞는 최적의 방식을 선택하는 방법을 정리하겠습니다.
선택 플로우차트
1. 소규모 크롤링 (100-1,000개)
→ Threading 추천
• 구현 간단, 디버깅 쉬움
• 충분한 성능
• 리소스 효율적
2. 중규모 크롤링 (1,000-10,000개)
→ asyncio 추천
• 최고의 성능/리소스 비율
• 대규모 확장 가능
• 메모리 효율적
3. 대규모 크롤링 (10,000개 이상)
→ asyncio + 배치 처리
• 메모리 관리 필수
• 분산 처리 고려
• 모니터링 필수
4. CPU 집약적 작업 (복잡한 파싱)
→ Multiprocessing 추천
• 실제 병렬 연산
• GIL 제약 없음
• 서버 환경 권장
최종 비교표
| 기준 | Threading | Multiprocessing | asyncio |
|---|---|---|---|
| 난이도 | ⭐ 쉬움 | ⭐⭐ 보통 | ⭐⭐⭐ 어려움 |
| 성능 | ⭐⭐⭐ 좋음 | ⭐⭐⭐⭐ 매우 좋음 | ⭐⭐⭐⭐⭐ 최고 |
| 메모리 효율 | ⭐⭐⭐⭐ 효율적 | ⭐⭐ 비효율적 | ⭐⭐⭐⭐⭐ 매우 효율적 |
| CPU 효율 | ⭐⭐ 제한적 | ⭐⭐⭐⭐⭐ 최고 | ⭐⭐⭐ 좋음 |
| 확장성 | ⭐⭐⭐ 중간 | ⭐⭐⭐ 중간 | ⭐⭐⭐⭐⭐ 매우 높음 |
| 디버깅 | ⭐⭐⭐⭐ 쉬움 | ⭐⭐ 어려움 | ⭐⭐⭐ 보통 |
실전 프로젝트 적용 사례
• 선택: asyncio + aiohttp
• 이유: 5,000개 아파트 데이터를 40분만에 수집
• 결과: 8시간 → 40분 (12배 향상)
• Workers: 10개 (concurrent_limit=10)
✅ 열차 시간표 사이트
• 선택: Threading (ThreadPoolExecutor)
• 이유: 공공 API 호출 위주, 간단한 구조
• 결과: 충분한 성능, 유지보수 쉬움
• Workers: 5개
✅ 병원 정보 사이트 (AdSense 준비 중)
• 선택: asyncio
• 이유: Gemini API로 콘텐츠 생성 시 대량 요청
• 예상: 1,000개 페이지 콘텐츠 30분 생성
• Workers: concurrent_limit=15
마치며
크롤링 병렬처리는 단순히 속도만 개선하는 것이 아닙니다. 제한된 리소스로 최대의 효율을 내고, 안정적으로 대량의 데이터를 수집하며, 사용자에게 더 신선한 정보를 제공할 수 있게 합니다.
제 경험상 가장 중요한 것은:
- 단계적 접근: Threading으로 시작해서 필요시 asyncio로 전환
- 측정 기반 최적화: 추측이 아닌 실제 데이터로 결정
- 안정성 우선: 속도보다 에러 처리가 중요
- 점진적 확장: workers 수를 서서히 늘리며 테스트
이 가이드가 여러분의 크롤링 프로젝트에 도움이 되길 바랍니다. 궁금한 점이나 추가로 다뤘으면 하는 내용이 있다면 댓글로 알려주세요!
• Python asyncio 공식 문서: docs.python.org/3/library/asyncio
• aiohttp 튜토리얼: docs.aiohttp.org
• Scrapy 프레임워크: 대규모 크롤링에 특화
• Selenium + 병렬처리: 동적 페이지 크롤링
이 글이 도움이 되셨다면 공유해주세요! 더 많은 분들이 효율적인 크롤링을 할 수 있도록 도와주시면 감사하겠습니다. 😊