brica1.ros のソースコード
# -*- coding: utf-8 -*-
"""
ros.py
=====
This module containes classes for ROS integration.
"""
__all__ = ["ROSAdapter"]
from abc import ABCMeta, abstractmethod
import copy
import numpy
# BriCA imports
from unit import *
# ROS imporets
import rospy
from std_msgs.msg import Int16MultiArray, MultiArrayDimension
[ドキュメント]class ROSAdapter(Unit):
"""
`ROSAdapter` is a BriCA `Unit` which is intented to provide a bridge over the
ROS Publisher/Subscriber and BriCA `Agent`.
"""
__metaclass__ = ABCMeta
def __init__(self, name="BriCA1 Node"):
""" Create a new `ROSAdapter` instance.
Args:
None.
Returns:
ROSAdapter: a new `ROSAdapter` instance.
"""
super(ROSAdapter, self).__init__()
self.inputs = {}
self.states = {}
self.results = {}
rospy.init_node(name, anonymous=True)
[ドキュメント] def setup_subscriber(self, topic, msg_type, id, length, converter):
""" Setup a ROS subscriber
Args:
topic (str): a topic to subscribe to.
msg_type (msg): incoming message type.
id (str): a string ID.
length (int): an initial length of the value vector.
Returns:
None.
"""
self.make_out_port(id, length)
def callback(data):
self.get_out_port(id).buffer = converter(data)
rospy.Subscriber(topic, msg_type, callback)
[ドキュメント] def setup_publisher(self, topic, id, length):
""" Setup a ROS subscriber
Args:
topic (str): a topic to subscribe to.
id (str): a string ID.
length (int): an initial length of the value vector.
Returns:
None.
"""
self.make_in_port(id, length)
pub = rospy.Publisher(topic, Int16MultiArray, queue_size=10)
def callback(data):
msg = Int16MultiArray()
msg.data = data
msg.layout.dim = [MultiArrayDimension("data", 1, length)]
pub.publish(msg)
self.get_in_port(id).register_callback(callback)
[ドキュメント] def connect(self, target, from_id, to_id):
""" Connect an out-port of another `Unit` to an in-port.
Args:
target (Unit): a `Unit` to connect to.
from_id (str): an out-port of the target `Unit`.
to_id(str): an in-port of this `Unit`.
Returns:
None.
"""
super(ROSAdapter, self).connect(target, from_id, to_id)
from_port = target.get_out_port(from_id)
to_port = self.get_in_port(to_id)
def callback(data):
to_port.sync()
to_port.invoke_callbacks()
from_port.register_callback(callback)