Skip to content

Commit

Permalink
try and fix flake8
Browse files Browse the repository at this point in the history
Signed-off-by: Kenji Brameld <kenjibrameld@gmail.com>
  • Loading branch information
ijnek committed Dec 27, 2023
1 parent e914537 commit 68d1748
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 30 deletions.
12 changes: 5 additions & 7 deletions r2r_spl/r2r_spl/r2r_spl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import construct
import socket
from threading import Thread

import construct
from gc_spl_interfaces.msg import RCGCD15
import rclpy
from rclpy.node import Node

from r2r_spl.serialization import Serialization

from gc_spl_interfaces.msg import RCGCD15

MAX_ALLOWED_MSG_SIZE = 128


Expand Down Expand Up @@ -107,7 +105,7 @@ def _loop(self):
while rclpy.ok():
try:
data, _ = self._sock.recvfrom(1024)
self.get_logger().debug('received: "%s"' % data)
self.get_logger().debug(f'received: {data}')

# Convert data to ROS msg
try:
Expand Down Expand Up @@ -148,10 +146,10 @@ def _rcgcd_callback(self, msg):
team_found = True

if not self._budget_reached and team.message_budget < 10:
self.get_logger().info("Budget almost reached, not sending anymore messages")
self.get_logger().info('Budget almost reached, not sending anymore messages')
self._budget_reached = True
elif self._budget_reached and team.message_budget > 10:
self.get_logger().info("Extra budget available, sending messages again")
self.get_logger().info('Extra budget available, sending messages again')
self._budget_reached = False

if not team_found:
Expand Down
16 changes: 6 additions & 10 deletions r2r_spl/test/test_r2r_spl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@

import socket
import time
import unittest

from gc_spl_interfaces.msg import RCGCD15
from r2r_spl.r2r_spl import R2RSPL
from r2r_spl.serialization import Serialization

from r2r_spl_test_interfaces.msg import ArrayTypes, BasicTypes
import rclpy
from rclpy.parameter import Parameter

from r2r_spl_test_interfaces.msg import ArrayTypes, BasicTypes

from gc_spl_interfaces.msg import RCGCD15

import unittest


class TestR2RSPL(unittest.TestCase):
"""Tests against R2RSPL."""
Expand Down Expand Up @@ -153,7 +149,7 @@ def test_sending(self):
try:
_ = sock.recv(1024)
except TimeoutError:
self.fail("TimeoutError, did not receive expected UDP packet")
self.fail('TimeoutError, did not receive expected UDP packet')

# Close socket
sock.close()
Expand Down Expand Up @@ -276,7 +272,7 @@ def test_message_budget(self):
try:
_ = sock.recv(1024)
except TimeoutError:
self.fail("TimeoutError, did not receive expected UDP packet")
self.fail('TimeoutError, did not receive expected UDP packet')

# Close socket
sock.close()
Expand All @@ -296,7 +292,7 @@ def test_msg_size_exceeding_128_bytes(self):
sock.settimeout(0.1)

# Publish ArrayTypes to r2r_spl_node
publisher.publish(ArrayTypes(data_string="a" * 129))
publisher.publish(ArrayTypes(data_string='a' * 129))

# Wait before spinning for the msg arrive in r2r_spl_node's subscription
time.sleep(0.1)
Expand Down
24 changes: 11 additions & 13 deletions r2r_spl/test/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from r2r_spl_test_interfaces.msg import ArrayTypes, BasicTypes, NestedTypes
from r2r_spl.serialization import Serialization

import numpy.testing
import unittest

import numpy.testing
from r2r_spl.serialization import Serialization
from r2r_spl_test_interfaces.msg import ArrayTypes, BasicTypes, NestedTypes


class TestSerialization(unittest.TestCase):
Expand Down Expand Up @@ -81,10 +79,10 @@ def test_array_types(self):
msg_instance.data_bool_unbounded_dynamic = [True, False, True, False, True]
msg_instance.data_int8_bounded_dynamic = [70, 80]
msg_instance.data_bool_bounded_dynamic = [False]
msg_instance.data_string = "Hello World"
msg_instance.data_string_bounded = "Hi"
msg_instance.data_wstring = "ハローワールド"
msg_instance.data_wstring_bounded = "やあ"
msg_instance.data_string = 'Hello World'
msg_instance.data_string_bounded = 'Hi'
msg_instance.data_wstring = 'ハローワールド'
msg_instance.data_wstring_bounded = 'やあ'

# Serialize
serialized = serialization.serialize(msg_instance)
Expand All @@ -104,10 +102,10 @@ def test_array_types(self):
deserialized.data_bool_unbounded_dynamic, [True, False, True, False, True])
numpy.testing.assert_array_equal(deserialized.data_int8_bounded_dynamic, [70, 80])
self.assertEqual(deserialized.data_bool_bounded_dynamic, [False])
self.assertEqual(deserialized.data_string, "Hello World")
self.assertEqual(deserialized.data_string_bounded, "Hi")
self.assertEqual(deserialized.data_wstring, "ハローワールド")
self.assertEqual(deserialized.data_wstring_bounded, "やあ")
self.assertEqual(deserialized.data_string, 'Hello World')
self.assertEqual(deserialized.data_string_bounded, 'Hi')
self.assertEqual(deserialized.data_wstring, 'ハローワールド')
self.assertEqual(deserialized.data_wstring_bounded, 'やあ')

def test_nested_types(self):
"""Test serialization and deserialization of nested types."""
Expand Down

0 comments on commit 68d1748

Please sign in to comment.