FastAPI로 구축한 RoboDine 백엔드 시스템

들어가며

RoboDine 프로젝트에서 가장 중요한 구성 요소 중 하나는 모든 시스템을 연결하는 중앙 백엔드 서버였습니다. 저는 FastAPI를 사용하여 WebSocket, TCP, UDP, RTSP 등 다양한 프로토콜을 지원하는 통합 서버를 구축했습니다. 이 글에서는 실제 구현 과정과 겪었던 기술적 도전들을 상세히 공유하겠습니다.

FastAPI 선택 이유

기술적 장점

  • 고성능: ASGI 기반의 비동기 처리로 높은 동시성 지원
  • 자동 문서화: OpenAPI 스펙 기반 자동 API 문서 생성
  • 타입 힌트: Python 타입 힌트를 활용한 데이터 검증
  • WebSocket 지원: 실시간 통신을 위한 내장 WebSocket 지원

프로젝트 요구사항 적합성

# requirements.txt에서 정의한 핵심 dependencies
fastapi==0.68.0
uvicorn==0.15.0
websockets==10.0
python-multipart==0.0.5
sqlmodel==0.0.6
opencv-python==4.5.3.56
ultralytics==8.0.0

백엔드 아키텍처 설계

실제 구현된 모듈 구조

robodine_service/backend/
├── app/
│   ├── __init__.py
│   ├── core/                  # 핵심 설정 및 유틸리티
│   ├── models/                # 데이터 모델
│   │   ├── robot.py
│   │   ├── order.py
│   │   ├── table.py
│   │   └── ...
│   ├── routes/                # API 라우터들
│   │   ├── websockets.py      # WebSocket 통신
│   │   ├── robot.py          # 로봇 제어 API
│   │   ├── orders.py         # 주문 관리 API
│   │   ├── streaming.py      # 비디오 스트리밍
│   │   ├── tables.py         # 테이블 관리
│   │   ├── customers.py      # 고객 관리
│   │   ├── inventories.py    # 재고 관리
│   │   ├── menu.py           # 메뉴 관리
│   │   ├── events.py         # 이벤트 로깅
│   │   ├── cookbot.py        # 조리로봇 제어
│   │   ├── albabot.py        # 서빙로봇 제어
│   │   └── auth.py           # 인증
│   └── services/              # 비즈니스 로직
│       ├── streaming_service.py
│       ├── ros_service.py
│       ├── cleaning_service.py
│       └── emergency_service.py
└── run.py                     # 메인 실행 파일

실제 WebSocket 구현

실제 프로젝트에서 구현한 WebSocket 시스템은 토픽 기반의 정교한 구조를 가지고 있습니다:

# routes/websockets.py에서 실제 구현된 WebSocket 관리자
class ConnectionManager:
    def __init__(self):
        # 토픽별 활성 연결 저장
        self.active_connections: Dict[str, List[WebSocket]] = {
            "robots": [],
            "tables": [],
            "events": [],
            "orders": [],
            "status": [],
            "systemlogs": [],
            "customers": [],
            "inventory": [],
            "video_streams": [],
            "notifications": [],
            "commands": [],
            "chat": [],
            "menu": []
        }
        self.max_connections_per_topic = 100
        
    async def connect(self, websocket: WebSocket, topic: str):
        """토픽별 연결 관리 및 제한"""
        if len(self.active_connections.get(topic, [])) >= self.max_connections_per_topic:
            await websocket.close(code=1008, reason="Too many connections")
            return False
            
        await websocket.accept()
        self.active_connections[topic].append(websocket)
        logger.info(f"New WebSocket connection for topic: {topic}")
        return True
        
    async def broadcast_update(self, data: Any, topic: str):
        """토픽별 데이터 브로드캐스팅"""
        message = {"type": "update", "topic": topic, "data": data}
        await self.broadcast(message, topic)

토픽 기반 WebSocket 엔드포인트

@router.websocket("/ws/{topic}")
async def websocket_topic_endpoint(websocket: WebSocket, topic: str):
    """통합 WebSocket 엔드포인트 - 토픽별 구독"""
    valid_topics = [
        "robots", "tables", "events", "orders", "status", 
        "systemlogs", "customers", "inventory", "video_streams", 
        "notifications", "commands", "chat", "menu"
    ]
    
    if topic not in valid_topics:
        await websocket.close(code=1003)
        return
        
    connected = await manager.connect(websocket, topic)
    if not connected:
        return
        
    try:
        # 초기 데이터 전송 (robots 토픽의 경우)
        if topic == "robots":
            # 현재 로봇 상태를 즉시 전송
            current_robots = await get_current_robots_status()
            await websocket.send_text(json.dumps({
                "type": "init",
                "topic": "robots", 
                "data": current_robots
            }))
            
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            
            # 클라이언트 메시지 처리
            if message.get("type") == "ping":
                await websocket.send_text(json.dumps({"type": "pong"}))
                
    except WebSocketDisconnect:
        manager.disconnect(websocket, topic)

멀티 프로토콜 통신 시스템

실제 구현된 통신 프로토콜

실제 프로젝트에서는 다음과 같은 통신 방식을 구현했습니다:

프로토콜 용도 포트 구현 위치
WebSocket 실시간 클라이언트 통신 8000/ws /routes/websockets.py
HTTP/REST 기본 CRUD 작업 8000 /routes/*.py
RTSP 비디오 스트리밍 8554 /routes/streaming.py
ROS2 로봇 간 통신 - /services/ros_service.py

실시간 데이터 브로드캐스팅 시스템

# 실제 구현된 브로드캐스팅 함수들
async def broadcast_robots_update(robots_data):
    """로봇 상태 업데이트 브로드캐스팅"""
    await manager.broadcast_update(robots_data, "robots")

async def broadcast_orders_update(orders_data):
    """주문 상태 업데이트 브로드캐스팅"""
    await manager.broadcast_update(orders_data, "orders")

async def broadcast_events_update(events_data):
    """시스템 이벤트 브로드캐스팅"""
    await manager.broadcast_update(events_data, "events")

# 자동 핑-퐁 시스템
async def send_ping_to_all(self):
    """모든 연결에 30초마다 핑 전송"""
    while not self.shutting_down:
        ping_message = {
            "type": "ping",
            "topic": "status",
            "data": {"timestamp": asyncio.get_event_loop().time()}
        }
        
        for topic, connections in self.active_connections.items():
            for conn in connections[:]:
                try:
                    await conn.send_text(json.dumps(ping_message))
                except Exception:
                    self.disconnect(conn, topic)
                    
        await asyncio.sleep(30)  # 30초 간격

로봇 제어 API 구현

조리 로봇(COOKBOT) 제어

# routes/cookbot.py에서 실제 구현
from fastapi import APIRouter, HTTPException
from ..services.ros_service import ROSService

router = APIRouter()
ros_service = ROSService()

@router.post("/cookbot/{robot_id}/start_cooking")
async def start_cooking(robot_id: str, recipe_data: dict):
    """조리 시작 명령"""
    try:
        result = await ros_service.send_cooking_command(robot_id, recipe_data)
        await broadcast_robots_update({
            "robot_id": robot_id,
            "status": "cooking",
            "recipe": recipe_data
        })
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/cookbot/{robot_id}/emergency_stop")
async def emergency_stop(robot_id: str):
    """긴급 정지"""
    await ros_service.emergency_stop(robot_id)
    await broadcast_robots_update({
        "robot_id": robot_id,
        "status": "emergency_stopped"
    })
    return {"status": "stopped"}

서빙 로봇(ALBABOT) 제어

# routes/albabot.py에서 실제 구현
@router.post("/albabot/{robot_id}/move_to_table")
async def move_to_table(robot_id: str, table_id: str):
    """테이블로 이동 명령"""
    try:
        # ROS2 Navigation2를 통한 이동 명령
        navigation_result = await ros_service.navigate_to_table(robot_id, table_id)
        
        # 실시간 위치 업데이트
        await broadcast_robots_update({
            "robot_id": robot_id,
            "status": "moving",
            "destination": table_id,
            "eta": navigation_result.get("eta")
        })
        
        return {"status": "moving", "destination": table_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

비디오 스트리밍 시스템

RTSP 스트리밍 구현

# routes/streaming.py에서 실제 구현 (일부)
import cv2
import asyncio
from fastapi.responses import StreamingResponse

@router.get("/video/live/{camera_id}")
async def get_live_video_stream(camera_id: str):
    """실시간 비디오 스트림"""
    async def generate_frames():
        cap = cv2.VideoCapture(f"rtsp://camera_{camera_id}")
        
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                    
                # 프레임 압축 및 전송
                _, buffer = cv2.imencode('.jpg', frame, 
                    [cv2.IMWRITE_JPEG_QUALITY, 70])
                
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + 
                       buffer.tobytes() + b'\r\n')
                       
                await asyncio.sleep(0.033)  # ~30 FPS
        finally:
            cap.release()
    
    return StreamingResponse(
        generate_frames(), 
        media_type="multipart/x-mixed-replace; boundary=frame"
    )

통합 서버 실행 시스템

모든 프로토콜을 하나의 프로세스에서 관리하는 메인 실행 파일:

# run.py
import asyncio
import uvicorn
from app.main import app
from services.tcp_server import tcp_server
from services.udp_receiver import udp_receiver
import threading

class RoboDineServer:
    def __init__(self):
        self.tasks = []
        
    async def start_all_services(self):
        """모든 서비스 시작"""
        print("Starting RoboDine Backend Services...")
        
        # TCP 서버 시작
        tcp_task = asyncio.create_task(tcp_server.start_server())
        self.tasks.append(tcp_task)
        
        # UDP 수신기 시작
        udp_task = asyncio.create_task(udp_receiver.start_receiver())
        self.tasks.append(udp_task)
        
        print("All background services started")
        
        # FastAPI 서버는 메인 스레드에서 실행
        uvicorn.run(
            "app.main:app",
            host="0.0.0.0",
            port=8000,
            reload=False,
            access_log=True
        )
    
    async def shutdown(self):
        """모든 서비스 종료"""
        print("Shutting down services...")
        
        for task in self.tasks:
            task.cancel()
            
        udp_receiver.stop()
        
        await asyncio.gather(*self.tasks, return_exceptions=True)

if __name__ == "__main__":
    server = RoboDineServer()
    try:
        asyncio.run(server.start_all_services())
    except KeyboardInterrupt:
        print("Server stopped by user")
    finally:
        asyncio.run(server.shutdown())

개발 과정에서의 도전과 해결

1. 멀티 프로토콜 통합의 복잡성

문제: 하나의 애플리케이션에서 WebSocket, TCP, UDP를 모두 처리하면서 발생하는 이벤트 루프 충돌

해결: 각 프로토콜을 독립적인 asyncio 태스크로 분리하고, 공통 메시지 큐를 통한 통신

2. 실시간 성능 요구사항

문제: 로봇 제어 명령의 지연이 시스템 전체 성능에 미치는 영향

해결:

  • TCP 연결의 Keep-Alive 설정
  • UDP를 통한 비전 데이터의 무손실 전송
  • WebSocket 메시지 우선순위 처리

3. 메모리 및 리소스 관리

문제: 장시간 운영 시 메모리 누수와 연결 누적

해결:

  • 주기적인 비활성 연결 정리
  • 가비지 컬렉션 최적화
  • 리소스 모니터링 대시보드 구현

결론

FastAPI를 사용한 RoboDine 백엔드 개발은 매우 도전적이면서도 흥미로운 경험이었습니다. 멀티 프로토콜 통신, 실시간 데이터 처리, 로봇 제어 등 다양한 요구사항을 하나의 시스템에서 처리하면서 많은 것을 배웠습니다.

특히 비동기 프로그래밍의 중요성과 복잡한 시스템에서의 에러 처리, 성능 최적화에 대한 깊은 이해를 얻을 수 있었습니다.

다음 포스트에서는 이 백엔드와 연동되는 React 프론트엔드 개발 과정을 다루겠습니다.


RoboDine 백엔드 시스템은 프로젝트의 핵심 인프라 역할을 성공적으로 수행했습니다. 실제 코드와 더 자세한 구현 내용은 GitHub 저장소에서 확인하실 수 있습니다.




Enjoy Reading This Article?

Here are some more articles you might like to read next:

  • RoboDine 프로젝트 완료 - 2개월간의 로봇 레스토랑 개발 여정
  • RoboDine 데이터베이스 설계 - SQLModel과 관계형 데이터 모델링
  • RoboDine 키오스크 개발 - React 기반 고객 주문 인터페이스
  • RoboDine 운영자 대시보드 개발 - React 기반 실시간 관제 시스템
  • RoboDine 프로젝트의 컴퓨터 비전 기술 구현