FastAPI로 구축한 RoboDine 백엔드 시스템
들어가며
RoboDine 프로젝트에서 가장 중요한 구성 요소 중 하나는 모든 시스템을 연결하는 중앙 백엔드 서버였습니다. 저는 FastAPI를 사용하여 WebSocket, TCP, UDP, RTSP 등 다양한 프로토콜을 지원하는 통합 서버를 구축했습니다. 이 글에서는 실제 구현 과정과 겪었던 기술적 도전들을 상세히 공유하겠습니다.
FastAPI 선택 이유
기술적 장점
- 고성능: ASGI 기반의 비동기 처리로 높은 동시성 지원
- 자동 문서화: OpenAPI 스펙 기반 자동 API 문서 생성
- 타입 힌트: Python 타입 힌트를 활용한 데이터 검증
- WebSocket 지원: 실시간 통신을 위한 내장 WebSocket 지원
프로젝트 요구사항 적합성
# requirements.txt에서 정의한 핵심 dependencies
fastapi==0.68.0
uvicorn==0.15.0
websockets==10.0
python-multipart==0.0.5
sqlmodel==0.0.6
opencv-python==4.5.3.56
ultralytics==8.0.0
백엔드 아키텍처 설계
실제 구현된 모듈 구조
robodine_service/backend/
├── app/
│ ├── __init__.py
│ ├── core/ # 핵심 설정 및 유틸리티
│ ├── models/ # 데이터 모델
│ │ ├── robot.py
│ │ ├── order.py
│ │ ├── table.py
│ │ └── ...
│ ├── routes/ # API 라우터들
│ │ ├── websockets.py # WebSocket 통신
│ │ ├── robot.py # 로봇 제어 API
│ │ ├── orders.py # 주문 관리 API
│ │ ├── streaming.py # 비디오 스트리밍
│ │ ├── tables.py # 테이블 관리
│ │ ├── customers.py # 고객 관리
│ │ ├── inventories.py # 재고 관리
│ │ ├── menu.py # 메뉴 관리
│ │ ├── events.py # 이벤트 로깅
│ │ ├── cookbot.py # 조리로봇 제어
│ │ ├── albabot.py # 서빙로봇 제어
│ │ └── auth.py # 인증
│ └── services/ # 비즈니스 로직
│ ├── streaming_service.py
│ ├── ros_service.py
│ ├── cleaning_service.py
│ └── emergency_service.py
└── run.py # 메인 실행 파일
실제 WebSocket 구현
실제 프로젝트에서 구현한 WebSocket 시스템은 토픽 기반의 정교한 구조를 가지고 있습니다:
# routes/websockets.py에서 실제 구현된 WebSocket 관리자
class ConnectionManager:
def __init__(self):
# 토픽별 활성 연결 저장
self.active_connections: Dict[str, List[WebSocket]] = {
"robots": [],
"tables": [],
"events": [],
"orders": [],
"status": [],
"systemlogs": [],
"customers": [],
"inventory": [],
"video_streams": [],
"notifications": [],
"commands": [],
"chat": [],
"menu": []
}
self.max_connections_per_topic = 100
async def connect(self, websocket: WebSocket, topic: str):
"""토픽별 연결 관리 및 제한"""
if len(self.active_connections.get(topic, [])) >= self.max_connections_per_topic:
await websocket.close(code=1008, reason="Too many connections")
return False
await websocket.accept()
self.active_connections[topic].append(websocket)
logger.info(f"New WebSocket connection for topic: {topic}")
return True
async def broadcast_update(self, data: Any, topic: str):
"""토픽별 데이터 브로드캐스팅"""
message = {"type": "update", "topic": topic, "data": data}
await self.broadcast(message, topic)
토픽 기반 WebSocket 엔드포인트
@router.websocket("/ws/{topic}")
async def websocket_topic_endpoint(websocket: WebSocket, topic: str):
"""통합 WebSocket 엔드포인트 - 토픽별 구독"""
valid_topics = [
"robots", "tables", "events", "orders", "status",
"systemlogs", "customers", "inventory", "video_streams",
"notifications", "commands", "chat", "menu"
]
if topic not in valid_topics:
await websocket.close(code=1003)
return
connected = await manager.connect(websocket, topic)
if not connected:
return
try:
# 초기 데이터 전송 (robots 토픽의 경우)
if topic == "robots":
# 현재 로봇 상태를 즉시 전송
current_robots = await get_current_robots_status()
await websocket.send_text(json.dumps({
"type": "init",
"topic": "robots",
"data": current_robots
}))
while True:
data = await websocket.receive_text()
message = json.loads(data)
# 클라이언트 메시지 처리
if message.get("type") == "ping":
await websocket.send_text(json.dumps({"type": "pong"}))
except WebSocketDisconnect:
manager.disconnect(websocket, topic)
멀티 프로토콜 통신 시스템
실제 구현된 통신 프로토콜
실제 프로젝트에서는 다음과 같은 통신 방식을 구현했습니다:
프로토콜 | 용도 | 포트 | 구현 위치 |
---|---|---|---|
WebSocket | 실시간 클라이언트 통신 | 8000/ws | /routes/websockets.py |
HTTP/REST | 기본 CRUD 작업 | 8000 | 각 /routes/*.py |
RTSP | 비디오 스트리밍 | 8554 | /routes/streaming.py |
ROS2 | 로봇 간 통신 | - | /services/ros_service.py |
실시간 데이터 브로드캐스팅 시스템
# 실제 구현된 브로드캐스팅 함수들
async def broadcast_robots_update(robots_data):
"""로봇 상태 업데이트 브로드캐스팅"""
await manager.broadcast_update(robots_data, "robots")
async def broadcast_orders_update(orders_data):
"""주문 상태 업데이트 브로드캐스팅"""
await manager.broadcast_update(orders_data, "orders")
async def broadcast_events_update(events_data):
"""시스템 이벤트 브로드캐스팅"""
await manager.broadcast_update(events_data, "events")
# 자동 핑-퐁 시스템
async def send_ping_to_all(self):
"""모든 연결에 30초마다 핑 전송"""
while not self.shutting_down:
ping_message = {
"type": "ping",
"topic": "status",
"data": {"timestamp": asyncio.get_event_loop().time()}
}
for topic, connections in self.active_connections.items():
for conn in connections[:]:
try:
await conn.send_text(json.dumps(ping_message))
except Exception:
self.disconnect(conn, topic)
await asyncio.sleep(30) # 30초 간격
로봇 제어 API 구현
조리 로봇(COOKBOT) 제어
# routes/cookbot.py에서 실제 구현
from fastapi import APIRouter, HTTPException
from ..services.ros_service import ROSService
router = APIRouter()
ros_service = ROSService()
@router.post("/cookbot/{robot_id}/start_cooking")
async def start_cooking(robot_id: str, recipe_data: dict):
"""조리 시작 명령"""
try:
result = await ros_service.send_cooking_command(robot_id, recipe_data)
await broadcast_robots_update({
"robot_id": robot_id,
"status": "cooking",
"recipe": recipe_data
})
return {"status": "success", "data": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cookbot/{robot_id}/emergency_stop")
async def emergency_stop(robot_id: str):
"""긴급 정지"""
await ros_service.emergency_stop(robot_id)
await broadcast_robots_update({
"robot_id": robot_id,
"status": "emergency_stopped"
})
return {"status": "stopped"}
서빙 로봇(ALBABOT) 제어
# routes/albabot.py에서 실제 구현
@router.post("/albabot/{robot_id}/move_to_table")
async def move_to_table(robot_id: str, table_id: str):
"""테이블로 이동 명령"""
try:
# ROS2 Navigation2를 통한 이동 명령
navigation_result = await ros_service.navigate_to_table(robot_id, table_id)
# 실시간 위치 업데이트
await broadcast_robots_update({
"robot_id": robot_id,
"status": "moving",
"destination": table_id,
"eta": navigation_result.get("eta")
})
return {"status": "moving", "destination": table_id}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
비디오 스트리밍 시스템
RTSP 스트리밍 구현
# routes/streaming.py에서 실제 구현 (일부)
import cv2
import asyncio
from fastapi.responses import StreamingResponse
@router.get("/video/live/{camera_id}")
async def get_live_video_stream(camera_id: str):
"""실시간 비디오 스트림"""
async def generate_frames():
cap = cv2.VideoCapture(f"rtsp://camera_{camera_id}")
try:
while True:
ret, frame = cap.read()
if not ret:
break
# 프레임 압축 및 전송
_, buffer = cv2.imencode('.jpg', frame,
[cv2.IMWRITE_JPEG_QUALITY, 70])
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' +
buffer.tobytes() + b'\r\n')
await asyncio.sleep(0.033) # ~30 FPS
finally:
cap.release()
return StreamingResponse(
generate_frames(),
media_type="multipart/x-mixed-replace; boundary=frame"
)
통합 서버 실행 시스템
모든 프로토콜을 하나의 프로세스에서 관리하는 메인 실행 파일:
# run.py
import asyncio
import uvicorn
from app.main import app
from services.tcp_server import tcp_server
from services.udp_receiver import udp_receiver
import threading
class RoboDineServer:
def __init__(self):
self.tasks = []
async def start_all_services(self):
"""모든 서비스 시작"""
print("Starting RoboDine Backend Services...")
# TCP 서버 시작
tcp_task = asyncio.create_task(tcp_server.start_server())
self.tasks.append(tcp_task)
# UDP 수신기 시작
udp_task = asyncio.create_task(udp_receiver.start_receiver())
self.tasks.append(udp_task)
print("All background services started")
# FastAPI 서버는 메인 스레드에서 실행
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
reload=False,
access_log=True
)
async def shutdown(self):
"""모든 서비스 종료"""
print("Shutting down services...")
for task in self.tasks:
task.cancel()
udp_receiver.stop()
await asyncio.gather(*self.tasks, return_exceptions=True)
if __name__ == "__main__":
server = RoboDineServer()
try:
asyncio.run(server.start_all_services())
except KeyboardInterrupt:
print("Server stopped by user")
finally:
asyncio.run(server.shutdown())
개발 과정에서의 도전과 해결
1. 멀티 프로토콜 통합의 복잡성
문제: 하나의 애플리케이션에서 WebSocket, TCP, UDP를 모두 처리하면서 발생하는 이벤트 루프 충돌
해결: 각 프로토콜을 독립적인 asyncio 태스크로 분리하고, 공통 메시지 큐를 통한 통신
2. 실시간 성능 요구사항
문제: 로봇 제어 명령의 지연이 시스템 전체 성능에 미치는 영향
해결:
- TCP 연결의 Keep-Alive 설정
- UDP를 통한 비전 데이터의 무손실 전송
- WebSocket 메시지 우선순위 처리
3. 메모리 및 리소스 관리
문제: 장시간 운영 시 메모리 누수와 연결 누적
해결:
- 주기적인 비활성 연결 정리
- 가비지 컬렉션 최적화
- 리소스 모니터링 대시보드 구현
결론
FastAPI를 사용한 RoboDine 백엔드 개발은 매우 도전적이면서도 흥미로운 경험이었습니다. 멀티 프로토콜 통신, 실시간 데이터 처리, 로봇 제어 등 다양한 요구사항을 하나의 시스템에서 처리하면서 많은 것을 배웠습니다.
특히 비동기 프로그래밍의 중요성과 복잡한 시스템에서의 에러 처리, 성능 최적화에 대한 깊은 이해를 얻을 수 있었습니다.
다음 포스트에서는 이 백엔드와 연동되는 React 프론트엔드 개발 과정을 다루겠습니다.
RoboDine 백엔드 시스템은 프로젝트의 핵심 인프라 역할을 성공적으로 수행했습니다. 실제 코드와 더 자세한 구현 내용은 GitHub 저장소에서 확인하실 수 있습니다.
Enjoy Reading This Article?
Here are some more articles you might like to read next: