RoboDine 시스템 아키텍처 설계와 구현

프로젝트 개요

RoboDine은 2025년 4월부터 6월까지 진행된 ROS2와 AI 기술을 활용한 레스토랑 자동화 시스템 프로젝트입니다. 6명의 팀원이 협력하여 주문부터 조리, 서빙까지의 전체 프로세스를 자동화하는 통합 시스템을 개발했습니다. 저는 이 프로젝트에서 시스템 아키텍처 설계, 통신 체계 구축, 웹 기반 UI 개발을 담당했습니다.

기술적 목표

  • 완전 자동화: 인간의 개입 없이 레스토랑 운영 자동화
  • 실시간 모니터링: 모든 시스템 구성요소의 실시간 상태 추적
  • 확장 가능한 아키텍처: 추가 로봇과 기능의 손쉬운 통합
  • 안정적인 서비스: 24시간 무중단 운영 가능한 시스템

RoboDine 시스템 구성 요소

프로젝트는 다음과 같은 주요 모듈들로 구성되어 있습니다:

  • 🍳 COOKBOT: 6축 로봇팔 기반 조리 시스템 (myCobot)
  • 🤖 ALBABOT: 자율주행 서빙 로봇들
  • 🎛️ RoboDine Service: FastAPI 기반 중앙 제어 시스템
  • 📱 키오스크 & 대시보드: React 기반 사용자 인터페이스
  • 🤖 CookGPT: YOLOv8 기반 비전 시스템
  • 📡 통신 인프라: WebSocket, TCP, UDP, RTSP 멀티 프로토콜

시스템 아키텍처 설계

계층화된 아키텍처

복잡한 로봇 시스템을 효과적으로 관리하기 위해 계층화된 아키텍처를 채택했습니다:

┌─────────────────────────────────────┐
│        Application Layer           │  ← 비즈니스 로직, 사용자 인터페이스
├─────────────────────────────────────┤
│        Orchestration Layer         │  ← 작업 스케줄링, 로봇 간 협업
├─────────────────────────────────────┤
│        Communication Layer         │  ← 실시간 통신, 상태 동기화
├─────────────────────────────────────┤
│        Hardware Abstraction        │  ← 로봇 제어, 센서 데이터 처리
└─────────────────────────────────────┘

핵심 기술적 도전과 해결책

1. 상태 동기화 문제

다중 로봇 환경에서 가장 큰 도전 중 하나는 모든 로봇과 시스템 간의 상태를 일관성 있게 유지하는 것이었습니다.

해결 방안: 중앙집중식 상태 관리

class SystemStateManager:
    def __init__(self):
        self.global_state = {
            "robots": {},      # 로봇 상태
            "tables": {},      # 테이블 상태  
            "orders": {},      # 주문 상태
            "inventory": {}    # 재고 상태
        }
        self.state_lock = asyncio.Lock()
        
    async def update_robot_state(self, robot_id: str, new_state: dict):
        """로봇 상태 업데이트 및 관련 시스템에 브로드캐스트"""
        async with self.state_lock:
            old_state = self.global_state["robots"].get(robot_id, {})
            self.global_state["robots"][robot_id] = {
                **old_state,
                **new_state,
                "last_updated": datetime.now().isoformat()
            }
            
            # 상태 변경 이벤트 발생
            await self.broadcast_state_change("robots", robot_id, new_state)
            
    async def get_available_robots(self, robot_type: str = None):
        """사용 가능한 로봇 목록 조회"""
        available_robots = []
        
        for robot_id, state in self.global_state["robots"].items():
            if state.get("status") == "idle":
                if robot_type is None or state.get("type") == robot_type:
                    available_robots.append(robot_id)
                    
        return available_robots

2. 작업 스케줄링과 우선순위 관리

여러 주문이 동시에 들어오고, 각 로봇이 서로 다른 작업을 수행해야 하는 상황에서 효율적인 스케줄링이 필요했습니다.

우선순위 기반 작업 큐

import heapq
from dataclasses import dataclass
from enum import Enum

class TaskPriority(Enum):
    EMERGENCY = 1      # 비상 상황 (청소 등)
    HIGH = 2           # 긴급 주문
    NORMAL = 3         # 일반 주문
    LOW = 4            # 예방 정비 등

@dataclass
class Task:
    id: str
    priority: TaskPriority
    robot_type_required: str
    estimated_duration: int  # 분 단위
    dependencies: List[str] = None
    created_at: datetime = None
    
    def __lt__(self, other):
        return self.priority.value < other.priority.value

class TaskScheduler:
    def __init__(self):
        self.task_queue = []
        self.active_tasks = {}
        self.completed_tasks = {}
        
    async def add_task(self, task: Task):
        """작업을 큐에 추가"""
        heapq.heappush(self.task_queue, task)
        await self.try_assign_tasks()
        
    async def try_assign_tasks(self):
        """사용 가능한 로봇에 작업 할당"""
        while self.task_queue:
            task = heapq.heappop(self.task_queue)
            
            # 의존성 체크
            if not await self.check_dependencies(task):
                heapq.heappush(self.task_queue, task)
                break
                
            # 적절한 로봇 찾기
            suitable_robot = await self.find_suitable_robot(task)
            if suitable_robot:
                await self.assign_task_to_robot(task, suitable_robot)
            else:
                # 적절한 로봇이 없으면 다시 큐에 추가
                heapq.heappush(self.task_queue, task)
                break

ROS2 기반 통신 아키텍처

로봇 간 통신과 제어를 위해 ROS2를 활용했습니다.

토픽 설계

# ROS2 토픽 구조
ROS2_TOPICS = {
    # 로봇 상태 관련
    "/robot/{robot_id}/status": "로봇 상태 정보",
    "/robot/{robot_id}/battery": "배터리 정보", 
    "/robot/{robot_id}/location": "위치 정보",
    
    # 작업 관련
    "/tasks/assignments": "작업 할당",
    "/tasks/updates": "작업 진행 상황",
    "/tasks/completion": "작업 완료 알림",
    
    # 환경 정보
    "/environment/obstacles": "장애물 정보",
    "/environment/occupancy": "공간 점유 상태",
    
    # 시스템 제어
    "/system/emergency": "비상 상황",
    "/system/commands": "시스템 명령"
}

ROS2-HTTP 브리지 구현

웹 기반 인터페이스와 ROS2 시스템 간의 통신을 위한 브리지를 구현했습니다:

class ROS2HTTPBridge:
    def __init__(self):
        rclpy.init()
        self.node = rclpy.create_node('http_bridge')
        self.publishers = {}
        self.subscribers = {}
        self.message_queue = asyncio.Queue()
        
    async def publish_to_ros(self, topic: str, message: dict):
        """HTTP 요청을 ROS2 토픽으로 발행"""
        if topic not in self.publishers:
            # 동적으로 publisher 생성
            msg_type = self.get_message_type(topic)
            self.publishers[topic] = self.node.create_publisher(
                msg_type, topic, 10
            )
        
        ros_message = self.convert_dict_to_ros_message(message, topic)
        self.publishers[topic].publish(ros_message)
        
    def ros_callback(self, topic: str):
        """ROS2 메시지를 HTTP로 전달하기 위한 콜백"""
        def callback(msg):
            http_message = self.convert_ros_to_dict(msg)
            asyncio.create_task(
                self.message_queue.put((topic, http_message))
            )
        return callback
        
    async def get_ros_messages(self):
        """ROS2 메시지를 HTTP 응답으로 변환"""
        while True:
            topic, message = await self.message_queue.get()
            yield {
                "topic": topic,
                "data": message,
                "timestamp": datetime.now().isoformat()
            }

사용자 인터페이스 설계

키오스크 GUI 시스템

고객이 사용하는 키오스크 인터페이스는 직관적이고 접근성이 높도록 설계했습니다:

// React 기반 키오스크 인터페이스
interface KioskState {
  currentOrder: Order;
  availableMenus: MenuItem[];
  systemStatus: 'online' | 'offline' | 'maintenance';
  estimatedWaitTime: number;
}

const KioskComponent: React.FC = () => {
  const [state, setState] = useState<KioskState>({
    currentOrder: { items: [], total: 0 },
    availableMenus: [],
    systemStatus: 'online',
    estimatedWaitTime: 0
  });

  // WebSocket을 통한 실시간 상태 업데이트
  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8000/ws/orders');
    
    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      if (message.type === 'wait_time_update') {
        setState(prev => ({
          ...prev,
          estimatedWaitTime: message.data.estimatedTime
        }));
      }
    };
    
    return () => ws.close();
  }, []);

  return (
    <div className="kiosk-interface">
      <StatusBar 
        systemStatus={state.systemStatus}
        waitTime={state.estimatedWaitTime}
      />
      <MenuGrid menus={state.availableMenus} />
      <OrderSummary order={state.currentOrder} />
      <PaymentSection />
    </div>
  );
};

관제 시스템 대시보드

로봇 상태와 시스템 전체를 모니터링하는 대시보드입니다:

// 실시간 모니터링 대시보드
class MonitoringDashboard {
  constructor() {
    this.websockets = new Map();
    this.initializeConnections();
  }

  initializeConnections() {
    const topics = ['robots', 'orders', 'system'];
    
    topics.forEach(topic => {
      const ws = new WebSocket(`ws://localhost:8000/ws/${topic}`);
      
      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        this.updateUI(topic, data);
      };
      
      this.websockets.set(topic, ws);
    });
  }

  updateUI(topic, data) {
    switch(topic) {
      case 'robots':
        this.updateRobotStatus(data);
        break;
      case 'orders':
        this.updateOrderQueue(data);
        break;
      case 'system':
        this.updateSystemMetrics(data);
        break;
    }
  }

  updateRobotStatus(robotData) {
    const robotElements = document.querySelectorAll('.robot-card');
    robotElements.forEach(element => {
      const robotId = element.dataset.robotId;
      if (robotData[robotId]) {
        this.renderRobotCard(element, robotData[robotId]);
      }
    });
  }
}

성능 최적화와 확장성

비동기 처리 최적화

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncTaskProcessor:
    def __init__(self, max_workers=10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.task_semaphore = asyncio.Semaphore(max_workers)
        
    async def process_heavy_task(self, task_data):
        """CPU 집약적 작업을 별도 스레드에서 처리"""
        async with self.task_semaphore:
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(
                self.executor, 
                self.heavy_computation, 
                task_data
            )
            return result
    
    def heavy_computation(self, data):
        """실제 연산 작업 (6D 포즈 추정 등)"""
        # 시간이 오래 걸리는 작업들
        result = perform_computer_vision_task(data)
        return result

캐싱 전략

from functools import lru_cache
import redis

class CacheManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.local_cache = {}
    
    @lru_cache(maxsize=1000)
    def get_robot_capabilities(self, robot_id: str):
        """로봇 능력 정보 캐싱 (변경 빈도가 낮음)"""
        return self.fetch_robot_capabilities(robot_id)
    
    async def get_cached_data(self, key: str, fetch_func, ttl: int = 300):
        """Redis 캐시 with fallback"""
        # Redis에서 먼저 확인
        cached_data = self.redis_client.get(key)
        if cached_data:
            return json.loads(cached_data)
        
        # 캐시 미스 시 데이터 가져오기
        fresh_data = await fetch_func()
        self.redis_client.setex(key, ttl, json.dumps(fresh_data))
        
        return fresh_data

테스트와 검증

통합 테스트 시나리오

import pytest
import asyncio

class IntegrationTestSuite:
    @pytest.mark.asyncio
    async def test_full_order_workflow(self):
        """전체 주문 처리 워크플로우 테스트"""
        
        # 1. 주문 생성
        order_data = {
            "items": [{"menu_id": "salad_001", "quantity": 1}],
            "table_number": 5
        }
        
        order_id = await self.create_order(order_data)
        assert order_id is not None
        
        # 2. 로봇 할당 확인
        await asyncio.sleep(2)  # 스케줄링 대기
        assigned_robots = await self.get_assigned_robots(order_id)
        assert len(assigned_robots) > 0
        
        # 3. 조리 과정 시뮬레이션
        cookbot_id = assigned_robots[0]
        await self.simulate_cooking_process(cookbot_id, order_id)
        
        # 4. 서빙 과정 시뮬레이션  
        albabot_id = assigned_robots[1]
        await self.simulate_serving_process(albabot_id, order_id)
        
        # 5. 최종 상태 확인
        final_status = await self.get_order_status(order_id)
        assert final_status == "completed"

    async def simulate_cooking_process(self, robot_id: str, order_id: str):
        """조리 과정 시뮬레이션"""
        steps = [
            "ingredient_preparation",
            "cooking_start", 
            "cooking_complete",
            "plating"
        ]
        
        for step in steps:
            await self.update_robot_status(robot_id, {
                "current_task": step,
                "order_id": order_id
            })
            await asyncio.sleep(1)

배운 교훈과 개선 방향

1. 시스템 복잡성 관리

다중 로봇 시스템의 복잡성은 예상보다 훨씬 높았습니다. 각 컴포넌트가 독립적으로는 잘 작동하지만, 전체 시스템으로 통합했을 때 예상치 못한 상호작용이 발생했습니다.

해결책: 점진적 통합과 철저한 로깅 시스템을 구축했습니다.

2. 실시간성 vs 안정성

실시간 응답성을 높이려고 하면 시스템의 안정성이 떨어지는 트레이드오프가 있었습니다.

해결책: 적응적 타임아웃과 degraded mode 운영을 도입했습니다.

3. 사용자 경험의 중요성

기술적으로 완벽해도 사용자가 이해하기 어려우면 의미가 없다는 것을 깨달았습니다.

해결책: 사용자 피드백을 반영한 반복적인 UI/UX 개선을 진행했습니다.

결론

다중 로봇 시스템 설계는 단순히 로봇 개발에 그치지 않고, 시스템 엔지니어링, 네트워크, 사용자 경험 등 다양한 분야의 종합적인 접근이 필요한 복합적인 프로젝트였습니다.

프로젝트를 통해 로봇공학의 미래가 단순히 개별 로봇의 성능 향상에 있는 것이 아니라, 로봇들이 어떻게 협력하고 인간과 상호작용하느냐에 있다는 것을 깨달았습니다.

앞으로는 AI를 활용한 더 지능적인 작업 스케줄링과 예측 기반 시스템 최적화에 집중해보고 싶습니다.


이 글이 로봇 시스템 개발에 관심 있는 분들께 도움이 되었기를 바랍니다. 프로젝트에 대한 더 자세한 내용이나 기술적 질문이 있으시면 언제든 댓글로 남겨주세요.




Enjoy Reading This Article?

Here are some more articles you might like to read next:

  • RoboDine 프로젝트 완료 - 2개월간의 로봇 레스토랑 개발 여정
  • RoboDine 데이터베이스 설계 - SQLModel과 관계형 데이터 모델링
  • RoboDine 키오스크 개발 - React 기반 고객 주문 인터페이스
  • FastAPI로 구축한 RoboDine 백엔드 시스템
  • RoboDine 운영자 대시보드 개발 - React 기반 실시간 관제 시스템