RoboDine 시스템 아키텍처 설계와 구현
프로젝트 개요
RoboDine은 2025년 4월부터 6월까지 진행된 ROS2와 AI 기술을 활용한 레스토랑 자동화 시스템 프로젝트입니다. 6명의 팀원이 협력하여 주문부터 조리, 서빙까지의 전체 프로세스를 자동화하는 통합 시스템을 개발했습니다. 저는 이 프로젝트에서 시스템 아키텍처 설계, 통신 체계 구축, 웹 기반 UI 개발을 담당했습니다.
기술적 목표
- 완전 자동화: 인간의 개입 없이 레스토랑 운영 자동화
- 실시간 모니터링: 모든 시스템 구성요소의 실시간 상태 추적
- 확장 가능한 아키텍처: 추가 로봇과 기능의 손쉬운 통합
- 안정적인 서비스: 24시간 무중단 운영 가능한 시스템
RoboDine 시스템 구성 요소
프로젝트는 다음과 같은 주요 모듈들로 구성되어 있습니다:
- 🍳 COOKBOT: 6축 로봇팔 기반 조리 시스템 (myCobot)
- 🤖 ALBABOT: 자율주행 서빙 로봇들
- 🎛️ RoboDine Service: FastAPI 기반 중앙 제어 시스템
- 📱 키오스크 & 대시보드: React 기반 사용자 인터페이스
- 🤖 CookGPT: YOLOv8 기반 비전 시스템
- 📡 통신 인프라: WebSocket, TCP, UDP, RTSP 멀티 프로토콜
시스템 아키텍처 설계
계층화된 아키텍처
복잡한 로봇 시스템을 효과적으로 관리하기 위해 계층화된 아키텍처를 채택했습니다:
┌─────────────────────────────────────┐
│ Application Layer │ ← 비즈니스 로직, 사용자 인터페이스
├─────────────────────────────────────┤
│ Orchestration Layer │ ← 작업 스케줄링, 로봇 간 협업
├─────────────────────────────────────┤
│ Communication Layer │ ← 실시간 통신, 상태 동기화
├─────────────────────────────────────┤
│ Hardware Abstraction │ ← 로봇 제어, 센서 데이터 처리
└─────────────────────────────────────┘
핵심 기술적 도전과 해결책
1. 상태 동기화 문제
다중 로봇 환경에서 가장 큰 도전 중 하나는 모든 로봇과 시스템 간의 상태를 일관성 있게 유지하는 것이었습니다.
해결 방안: 중앙집중식 상태 관리
class SystemStateManager:
def __init__(self):
self.global_state = {
"robots": {}, # 로봇 상태
"tables": {}, # 테이블 상태
"orders": {}, # 주문 상태
"inventory": {} # 재고 상태
}
self.state_lock = asyncio.Lock()
async def update_robot_state(self, robot_id: str, new_state: dict):
"""로봇 상태 업데이트 및 관련 시스템에 브로드캐스트"""
async with self.state_lock:
old_state = self.global_state["robots"].get(robot_id, {})
self.global_state["robots"][robot_id] = {
**old_state,
**new_state,
"last_updated": datetime.now().isoformat()
}
# 상태 변경 이벤트 발생
await self.broadcast_state_change("robots", robot_id, new_state)
async def get_available_robots(self, robot_type: str = None):
"""사용 가능한 로봇 목록 조회"""
available_robots = []
for robot_id, state in self.global_state["robots"].items():
if state.get("status") == "idle":
if robot_type is None or state.get("type") == robot_type:
available_robots.append(robot_id)
return available_robots
2. 작업 스케줄링과 우선순위 관리
여러 주문이 동시에 들어오고, 각 로봇이 서로 다른 작업을 수행해야 하는 상황에서 효율적인 스케줄링이 필요했습니다.
우선순위 기반 작업 큐
import heapq
from dataclasses import dataclass
from enum import Enum
class TaskPriority(Enum):
EMERGENCY = 1 # 비상 상황 (청소 등)
HIGH = 2 # 긴급 주문
NORMAL = 3 # 일반 주문
LOW = 4 # 예방 정비 등
@dataclass
class Task:
id: str
priority: TaskPriority
robot_type_required: str
estimated_duration: int # 분 단위
dependencies: List[str] = None
created_at: datetime = None
def __lt__(self, other):
return self.priority.value < other.priority.value
class TaskScheduler:
def __init__(self):
self.task_queue = []
self.active_tasks = {}
self.completed_tasks = {}
async def add_task(self, task: Task):
"""작업을 큐에 추가"""
heapq.heappush(self.task_queue, task)
await self.try_assign_tasks()
async def try_assign_tasks(self):
"""사용 가능한 로봇에 작업 할당"""
while self.task_queue:
task = heapq.heappop(self.task_queue)
# 의존성 체크
if not await self.check_dependencies(task):
heapq.heappush(self.task_queue, task)
break
# 적절한 로봇 찾기
suitable_robot = await self.find_suitable_robot(task)
if suitable_robot:
await self.assign_task_to_robot(task, suitable_robot)
else:
# 적절한 로봇이 없으면 다시 큐에 추가
heapq.heappush(self.task_queue, task)
break
ROS2 기반 통신 아키텍처
로봇 간 통신과 제어를 위해 ROS2를 활용했습니다.
토픽 설계
# ROS2 토픽 구조
ROS2_TOPICS = {
# 로봇 상태 관련
"/robot/{robot_id}/status": "로봇 상태 정보",
"/robot/{robot_id}/battery": "배터리 정보",
"/robot/{robot_id}/location": "위치 정보",
# 작업 관련
"/tasks/assignments": "작업 할당",
"/tasks/updates": "작업 진행 상황",
"/tasks/completion": "작업 완료 알림",
# 환경 정보
"/environment/obstacles": "장애물 정보",
"/environment/occupancy": "공간 점유 상태",
# 시스템 제어
"/system/emergency": "비상 상황",
"/system/commands": "시스템 명령"
}
ROS2-HTTP 브리지 구현
웹 기반 인터페이스와 ROS2 시스템 간의 통신을 위한 브리지를 구현했습니다:
class ROS2HTTPBridge:
def __init__(self):
rclpy.init()
self.node = rclpy.create_node('http_bridge')
self.publishers = {}
self.subscribers = {}
self.message_queue = asyncio.Queue()
async def publish_to_ros(self, topic: str, message: dict):
"""HTTP 요청을 ROS2 토픽으로 발행"""
if topic not in self.publishers:
# 동적으로 publisher 생성
msg_type = self.get_message_type(topic)
self.publishers[topic] = self.node.create_publisher(
msg_type, topic, 10
)
ros_message = self.convert_dict_to_ros_message(message, topic)
self.publishers[topic].publish(ros_message)
def ros_callback(self, topic: str):
"""ROS2 메시지를 HTTP로 전달하기 위한 콜백"""
def callback(msg):
http_message = self.convert_ros_to_dict(msg)
asyncio.create_task(
self.message_queue.put((topic, http_message))
)
return callback
async def get_ros_messages(self):
"""ROS2 메시지를 HTTP 응답으로 변환"""
while True:
topic, message = await self.message_queue.get()
yield {
"topic": topic,
"data": message,
"timestamp": datetime.now().isoformat()
}
사용자 인터페이스 설계
키오스크 GUI 시스템
고객이 사용하는 키오스크 인터페이스는 직관적이고 접근성이 높도록 설계했습니다:
// React 기반 키오스크 인터페이스
interface KioskState {
currentOrder: Order;
availableMenus: MenuItem[];
systemStatus: 'online' | 'offline' | 'maintenance';
estimatedWaitTime: number;
}
const KioskComponent: React.FC = () => {
const [state, setState] = useState<KioskState>({
currentOrder: { items: [], total: 0 },
availableMenus: [],
systemStatus: 'online',
estimatedWaitTime: 0
});
// WebSocket을 통한 실시간 상태 업데이트
useEffect(() => {
const ws = new WebSocket('ws://localhost:8000/ws/orders');
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'wait_time_update') {
setState(prev => ({
...prev,
estimatedWaitTime: message.data.estimatedTime
}));
}
};
return () => ws.close();
}, []);
return (
<div className="kiosk-interface">
<StatusBar
systemStatus={state.systemStatus}
waitTime={state.estimatedWaitTime}
/>
<MenuGrid menus={state.availableMenus} />
<OrderSummary order={state.currentOrder} />
<PaymentSection />
</div>
);
};
관제 시스템 대시보드
로봇 상태와 시스템 전체를 모니터링하는 대시보드입니다:
// 실시간 모니터링 대시보드
class MonitoringDashboard {
constructor() {
this.websockets = new Map();
this.initializeConnections();
}
initializeConnections() {
const topics = ['robots', 'orders', 'system'];
topics.forEach(topic => {
const ws = new WebSocket(`ws://localhost:8000/ws/${topic}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.updateUI(topic, data);
};
this.websockets.set(topic, ws);
});
}
updateUI(topic, data) {
switch(topic) {
case 'robots':
this.updateRobotStatus(data);
break;
case 'orders':
this.updateOrderQueue(data);
break;
case 'system':
this.updateSystemMetrics(data);
break;
}
}
updateRobotStatus(robotData) {
const robotElements = document.querySelectorAll('.robot-card');
robotElements.forEach(element => {
const robotId = element.dataset.robotId;
if (robotData[robotId]) {
this.renderRobotCard(element, robotData[robotId]);
}
});
}
}
성능 최적화와 확장성
비동기 처리 최적화
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncTaskProcessor:
def __init__(self, max_workers=10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_semaphore = asyncio.Semaphore(max_workers)
async def process_heavy_task(self, task_data):
"""CPU 집약적 작업을 별도 스레드에서 처리"""
async with self.task_semaphore:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
self.executor,
self.heavy_computation,
task_data
)
return result
def heavy_computation(self, data):
"""실제 연산 작업 (6D 포즈 추정 등)"""
# 시간이 오래 걸리는 작업들
result = perform_computer_vision_task(data)
return result
캐싱 전략
from functools import lru_cache
import redis
class CacheManager:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.local_cache = {}
@lru_cache(maxsize=1000)
def get_robot_capabilities(self, robot_id: str):
"""로봇 능력 정보 캐싱 (변경 빈도가 낮음)"""
return self.fetch_robot_capabilities(robot_id)
async def get_cached_data(self, key: str, fetch_func, ttl: int = 300):
"""Redis 캐시 with fallback"""
# Redis에서 먼저 확인
cached_data = self.redis_client.get(key)
if cached_data:
return json.loads(cached_data)
# 캐시 미스 시 데이터 가져오기
fresh_data = await fetch_func()
self.redis_client.setex(key, ttl, json.dumps(fresh_data))
return fresh_data
테스트와 검증
통합 테스트 시나리오
import pytest
import asyncio
class IntegrationTestSuite:
@pytest.mark.asyncio
async def test_full_order_workflow(self):
"""전체 주문 처리 워크플로우 테스트"""
# 1. 주문 생성
order_data = {
"items": [{"menu_id": "salad_001", "quantity": 1}],
"table_number": 5
}
order_id = await self.create_order(order_data)
assert order_id is not None
# 2. 로봇 할당 확인
await asyncio.sleep(2) # 스케줄링 대기
assigned_robots = await self.get_assigned_robots(order_id)
assert len(assigned_robots) > 0
# 3. 조리 과정 시뮬레이션
cookbot_id = assigned_robots[0]
await self.simulate_cooking_process(cookbot_id, order_id)
# 4. 서빙 과정 시뮬레이션
albabot_id = assigned_robots[1]
await self.simulate_serving_process(albabot_id, order_id)
# 5. 최종 상태 확인
final_status = await self.get_order_status(order_id)
assert final_status == "completed"
async def simulate_cooking_process(self, robot_id: str, order_id: str):
"""조리 과정 시뮬레이션"""
steps = [
"ingredient_preparation",
"cooking_start",
"cooking_complete",
"plating"
]
for step in steps:
await self.update_robot_status(robot_id, {
"current_task": step,
"order_id": order_id
})
await asyncio.sleep(1)
배운 교훈과 개선 방향
1. 시스템 복잡성 관리
다중 로봇 시스템의 복잡성은 예상보다 훨씬 높았습니다. 각 컴포넌트가 독립적으로는 잘 작동하지만, 전체 시스템으로 통합했을 때 예상치 못한 상호작용이 발생했습니다.
해결책: 점진적 통합과 철저한 로깅 시스템을 구축했습니다.
2. 실시간성 vs 안정성
실시간 응답성을 높이려고 하면 시스템의 안정성이 떨어지는 트레이드오프가 있었습니다.
해결책: 적응적 타임아웃과 degraded mode 운영을 도입했습니다.
3. 사용자 경험의 중요성
기술적으로 완벽해도 사용자가 이해하기 어려우면 의미가 없다는 것을 깨달았습니다.
해결책: 사용자 피드백을 반영한 반복적인 UI/UX 개선을 진행했습니다.
결론
다중 로봇 시스템 설계는 단순히 로봇 개발에 그치지 않고, 시스템 엔지니어링, 네트워크, 사용자 경험 등 다양한 분야의 종합적인 접근이 필요한 복합적인 프로젝트였습니다.
프로젝트를 통해 로봇공학의 미래가 단순히 개별 로봇의 성능 향상에 있는 것이 아니라, 로봇들이 어떻게 협력하고 인간과 상호작용하느냐에 있다는 것을 깨달았습니다.
앞으로는 AI를 활용한 더 지능적인 작업 스케줄링과 예측 기반 시스템 최적화에 집중해보고 싶습니다.
이 글이 로봇 시스템 개발에 관심 있는 분들께 도움이 되었기를 바랍니다. 프로젝트에 대한 더 자세한 내용이나 기술적 질문이 있으시면 언제든 댓글로 남겨주세요.
Enjoy Reading This Article?
Here are some more articles you might like to read next: