From 3e02ab3cd72bcca6c2fe1160fa044ab687f86191 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Fri, 13 Mar 2015 20:19:46 +0100 Subject: fixed some lint errors and removed arming --- .../demo_tests/mavros_offboard_attctl_test.py | 49 ++++++++-------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/integrationtests/demo_tests/mavros_offboard_attctl_test.py b/integrationtests/demo_tests/mavros_offboard_attctl_test.py index 92cdf1e0a..4c850315b 100755 --- a/integrationtests/demo_tests/mavros_offboard_attctl_test.py +++ b/integrationtests/demo_tests/mavros_offboard_attctl_test.py @@ -68,45 +68,27 @@ class MavrosOffboardAttctlTest(unittest.TestCase): rospy.wait_for_service('iris/mavros/cmd/arming', 30) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) - self.pubAtt = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10) - self.pubThr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10) - self.cmdArm = rospy.ServiceProxy("iris/mavros/cmd/arming", CommandBool) + self.pub_att = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10) + self.pub_thr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10) self.rate = rospy.Rate(10) # 10hz - self.rateSec = rospy.Rate(1) - self.hasPos = False - self.controlMode = vehicle_control_mode() + self.has_pos = False + self.control_mode = vehicle_control_mode() + self.local_position = PoseStamped() # # General callback functions used in tests # def position_callback(self, data): - self.hasPos = True - self.localPosition = data + self.has_pos = True + self.local_position = data def vehicle_control_mode_callback(self, data): - self.controlMode = data - - - # - # Helper methods - # - def arm(self): - return self.cmdArm(value=True) + self.control_mode = data # # Test offboard position control # def test_attctl(self): - # FIXME: this must go ASAP when arming is implemented - manIn = ManualInput() - manIn.arm() - manIn.offboard_attctl() - - self.assertTrue(self.arm(), "Could not arm") - self.rateSec.sleep() - self.rateSec.sleep() - self.assertTrue(self.controlMode.flag_armed, "flag_armed is not set after 2 seconds") - # set some attitude and thrust att = PoseStamped() att.header = Header() @@ -121,19 +103,22 @@ class MavrosOffboardAttctlTest(unittest.TestCase): # does it cross expected boundaries in X seconds? count = 0 timeout = 120 - while(count < timeout): + while count < timeout: # update timestamp for each published SP att.header.stamp = rospy.Time.now() - self.pubAtt.publish(att) - self.pubThr.publish(throttle) + self.pub_att.publish(att) + self.pub_thr.publish(throttle) - if (self.localPosition.pose.position.x > 5 - and self.localPosition.pose.position.z > 5 - and self.localPosition.pose.position.y < -5): + if (self.local_position.pose.position.x > 5 + and self.local_position.pose.position.z > 5 + and self.local_position.pose.position.y < -5): break count = count + 1 self.rate.sleep() + self.assertTrue(self.control_mode.flag_armed, "flag_armed is not set") + self.assertTrue(self.control_mode.flag_control_attitude_enabled, "flag_control_attitude_enabled is not set") + self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") self.assertTrue(count < timeout, "took too long to cross boundaries") -- cgit v1.2.3 From 8ada8dc648e730c6dd4efefada1f77e8dd732e68 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Fri, 13 Mar 2015 20:36:45 +0100 Subject: trying to fix timing issue --- integrationtests/demo_tests/mavros_offboard_attctl_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integrationtests/demo_tests/mavros_offboard_attctl_test.py b/integrationtests/demo_tests/mavros_offboard_attctl_test.py index 4c850315b..73cfe5a81 100755 --- a/integrationtests/demo_tests/mavros_offboard_attctl_test.py +++ b/integrationtests/demo_tests/mavros_offboard_attctl_test.py @@ -70,7 +70,7 @@ class MavrosOffboardAttctlTest(unittest.TestCase): rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) self.pub_att = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10) self.pub_thr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10) - self.rate = rospy.Rate(10) # 10hz + self.rate = rospy.Rate(20) # 10hz self.has_pos = False self.control_mode = vehicle_control_mode() self.local_position = PoseStamped() @@ -107,14 +107,15 @@ class MavrosOffboardAttctlTest(unittest.TestCase): # update timestamp for each published SP att.header.stamp = rospy.Time.now() self.pub_att.publish(att) + self.rate.sleep() self.pub_thr.publish(throttle) + self.rate.sleep() if (self.local_position.pose.position.x > 5 and self.local_position.pose.position.z > 5 and self.local_position.pose.position.y < -5): break count = count + 1 - self.rate.sleep() self.assertTrue(self.control_mode.flag_armed, "flag_armed is not set") self.assertTrue(self.control_mode.flag_control_attitude_enabled, "flag_control_attitude_enabled is not set") -- cgit v1.2.3 From 2bbad206961ec4bf1e78ac8b6689634fa3e482b8 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 09:51:38 +0100 Subject: fixed some warnings and updated comments --- .../demo_tests/mavros_offboard_attctl_test.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/integrationtests/demo_tests/mavros_offboard_attctl_test.py b/integrationtests/demo_tests/mavros_offboard_attctl_test.py index 73cfe5a81..ecad9df4d 100755 --- a/integrationtests/demo_tests/mavros_offboard_attctl_test.py +++ b/integrationtests/demo_tests/mavros_offboard_attctl_test.py @@ -37,29 +37,20 @@ # PKG = 'px4' -import sys import unittest import rospy -import math - -from numpy import linalg -import numpy as np from px4.msg import vehicle_control_mode from std_msgs.msg import Header from std_msgs.msg import Float64 from geometry_msgs.msg import PoseStamped, Quaternion from tf.transformations import quaternion_from_euler -from mavros.srv import CommandBool - -from manual_input import ManualInput # -# Tests flying a path in offboard control by sending position setpoints +# Tests flying a path in offboard control by sending attitude and thrust setpoints # over MAVROS. # -# For the test to be successful it needs to reach all setpoints in a certain time. -# FIXME: add flight path assertion (needs transformation from ROS frame to NED) +# For the test to be successful it needs to cross a certain boundary in time. # class MavrosOffboardAttctlTest(unittest.TestCase): @@ -106,14 +97,15 @@ class MavrosOffboardAttctlTest(unittest.TestCase): while count < timeout: # update timestamp for each published SP att.header.stamp = rospy.Time.now() + self.pub_att.publish(att) - self.rate.sleep() + self.rate.sleep() # I'm guessing this is necessary to prevent timing issues self.pub_thr.publish(throttle) self.rate.sleep() if (self.local_position.pose.position.x > 5 - and self.local_position.pose.position.z > 5 - and self.local_position.pose.position.y < -5): + and self.local_position.pose.position.z > 5 + and self.local_position.pose.position.y < -5): break count = count + 1 -- cgit v1.2.3 From 7018ff298ea3174d6764a1e35196521ee31f4dd3 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 12:07:35 +0100 Subject: removing manual input and unused code from posctl test --- .../demo_tests/mavros_offboard_posctl_test.py | 71 +++++++++------------- 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/integrationtests/demo_tests/mavros_offboard_posctl_test.py b/integrationtests/demo_tests/mavros_offboard_posctl_test.py index 6d26015e7..568c2cbd8 100755 --- a/integrationtests/demo_tests/mavros_offboard_posctl_test.py +++ b/integrationtests/demo_tests/mavros_offboard_posctl_test.py @@ -37,7 +37,6 @@ # PKG = 'px4' -import sys import unittest import rospy import math @@ -49,9 +48,6 @@ from px4.msg import vehicle_control_mode from std_msgs.msg import Header from geometry_msgs.msg import PoseStamped, Quaternion from tf.transformations import quaternion_from_euler -from mavros.srv import CommandBool - -from manual_input import ManualInput # # Tests flying a path in offboard control by sending position setpoints @@ -64,37 +60,41 @@ class MavrosOffboardPosctlTest(unittest.TestCase): def setUp(self): rospy.init_node('test_node', anonymous=True) - rospy.wait_for_service('iris/mavros/cmd/arming', 30) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) - self.pubSpt = rospy.Publisher('iris/mavros/setpoint/local_position', PoseStamped, queue_size=10) - self.cmdArm = rospy.ServiceProxy("iris/mavros/cmd/arming", CommandBool) + self.pub_spt = rospy.Publisher('iris/mavros/setpoint/local_position', PoseStamped, queue_size=10) self.rate = rospy.Rate(10) # 10hz - self.rateSec = rospy.Rate(1) - self.hasPos = False - self.controlMode = vehicle_control_mode() + self.has_pos = False + self.local_position = PoseStamped() + self.control_mode = vehicle_control_mode() # # General callback functions used in tests # def position_callback(self, data): - self.hasPos = True - self.localPosition = data + self.has_pos = True + self.local_position = data def vehicle_control_mode_callback(self, data): - self.controlMode = data + self.control_mode = data # # Helper methods # def is_at_position(self, x, y, z, offset): - if(not self.hasPos): + if not self.has_pos: return False - rospy.logdebug("current position %f, %f, %f" % (self.localPosition.pose.position.x, self.localPosition.pose.position.y, self.localPosition.pose.position.z)) + rospy.logdebug("current position %f, %f, %f" % + (self.local_position.pose.position.x, + self.local_position.pose.position.y, + self.local_position.pose.position.z)) + desired = np.array((x, y, z)) - pos = np.array((self.localPosition.pose.position.x, self.localPosition.pose.position.y, self.localPosition.pose.position.z)) + pos = np.array((self.local_position.pose.position.x, + self.local_position.pose.position.y, + self.local_position.pose.position.z)) return linalg.norm(desired - pos) < offset def reach_position(self, x, y, z, timeout): @@ -114,57 +114,44 @@ class MavrosOffboardPosctlTest(unittest.TestCase): # does it reach the position in X seconds? count = 0 - while(count < timeout): + while count < timeout: # update timestamp for each published SP pos.header.stamp = rospy.Time.now() - self.pubSpt.publish(pos) + self.pub_spt.publish(pos) - if(self.is_at_position(pos.pose.position.x, pos.pose.position.y, pos.pose.position.z, 0.5)): + if self.is_at_position(pos.pose.position.x, pos.pose.position.y, pos.pose.position.z, 0.5): break count = count + 1 self.rate.sleep() self.assertTrue(count < timeout, "took too long to get to position") - def arm(self): - return self.cmdArm(value=True) - # # Test offboard position control # def test_posctl(self): - # FIXME: this must go ASAP when arming is implemented - manIn = ManualInput() - manIn.arm() - manIn.offboard_posctl() - - self.assertTrue(self.arm(), "Could not arm") - self.rateSec.sleep() - self.rateSec.sleep() - self.assertTrue(self.controlMode.flag_armed, "flag_armed is not set after 2 seconds") - # prepare flight path positions = ( - (0,0,0), - (2,2,2), - (2,-2,2), - (-2,-2,2), - (2,2,2)) + (0, 0, 0), + (2, 2, 2), + (2, -2, 2), + (-2, -2, 2), + (2, 2, 2)) for i in range(0, len(positions)): self.reach_position(positions[i][0], positions[i][1], positions[i][2], 120) - # does it hold the position for Y seconds? - positionHeld = True count = 0 timeout = 50 - while(count < timeout): - if(not self.is_at_position(2, 2, 2, 0.5)): - positionHeld = False + while count < timeout: + if not self.is_at_position(2, 2, 2, 0.5): break count = count + 1 self.rate.sleep() + self.assertTrue(self.control_mode.flag_armed, "flag_armed is not set") + self.assertTrue(self.control_mode.flag_control_position_enabled, "flag_control_position_enabled is not set") + self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") self.assertTrue(count == timeout, "position could not be held") -- cgit v1.2.3 From 4b65e6259595f1f6dfb70fe8b44e229b49896f28 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 12:08:36 +0100 Subject: removing sleep between publishments again, issue should be fixed in mavlink node --- integrationtests/demo_tests/mavros_offboard_attctl_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrationtests/demo_tests/mavros_offboard_attctl_test.py b/integrationtests/demo_tests/mavros_offboard_attctl_test.py index ecad9df4d..30e9fe9ba 100755 --- a/integrationtests/demo_tests/mavros_offboard_attctl_test.py +++ b/integrationtests/demo_tests/mavros_offboard_attctl_test.py @@ -61,7 +61,7 @@ class MavrosOffboardAttctlTest(unittest.TestCase): rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) self.pub_att = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10) self.pub_thr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10) - self.rate = rospy.Rate(20) # 10hz + self.rate = rospy.Rate(10) # 10hz self.has_pos = False self.control_mode = vehicle_control_mode() self.local_position = PoseStamped() @@ -99,7 +99,6 @@ class MavrosOffboardAttctlTest(unittest.TestCase): att.header.stamp = rospy.Time.now() self.pub_att.publish(att) - self.rate.sleep() # I'm guessing this is necessary to prevent timing issues self.pub_thr.publish(throttle) self.rate.sleep() -- cgit v1.2.3 From 6f44b4d3543ff702dd9cf465212b20d7c3ae8d2e Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 15:44:01 +0100 Subject: remove gazebo plugin command fake, fix some lint warnings --- integrationtests/demo_tests/manual_input.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/integrationtests/demo_tests/manual_input.py b/integrationtests/demo_tests/manual_input.py index 7072464de..167221285 100755 --- a/integrationtests/demo_tests/manual_input.py +++ b/integrationtests/demo_tests/manual_input.py @@ -35,7 +35,6 @@ # # @author Andreas Antener # -import sys import rospy from px4.msg import manual_control_setpoint @@ -46,17 +45,12 @@ from std_msgs.msg import Header # # Manual input control helper # -# FIXME: this is not the way to do it! ATM it fakes input to iris/command/attitude because else -# the simulator does not instantiate our controller. Probably this whole class will disappear once -# arming works correctly. -# -class ManualInput: +class ManualInput(object): def __init__(self): rospy.init_node('test_node', anonymous=True) - self.pubMcsp = rospy.Publisher('iris/manual_control_setpoint', manual_control_setpoint, queue_size=10) - self.pubOff = rospy.Publisher('iris/offboard_control_mode', offboard_control_mode, queue_size=10) - self.pubAtt = rospy.Publisher('iris/command/attitude', CommandAttitudeThrust, queue_size=10) + self.pub_mcsp = rospy.Publisher('iris/manual_control_setpoint', manual_control_setpoint, queue_size=10) + self.pub_off = rospy.Publisher('iris/offboard_control_mode', offboard_control_mode, queue_size=10) def arm(self): rate = rospy.Rate(10) # 10hz @@ -81,9 +75,7 @@ class ManualInput: rospy.loginfo("zeroing") time = rospy.get_rostime().now() pos.timestamp = time.secs * 1e6 + time.nsecs / 1000 - self.pubMcsp.publish(pos) - # Fake input to iris commander - self.pubAtt.publish(att) + self.pub_mcsp.publish(pos) rate.sleep() count = count + 1 @@ -93,7 +85,7 @@ class ManualInput: rospy.loginfo("arming") time = rospy.get_rostime().now() pos.timestamp = time.secs * 1e6 + time.nsecs / 1000 - self.pubMcsp.publish(pos) + self.pub_mcsp.publish(pos) rate.sleep() count = count + 1 @@ -118,7 +110,7 @@ class ManualInput: rospy.loginfo("triggering posctl") time = rospy.get_rostime().now() pos.timestamp = time.secs * 1e6 + time.nsecs / 1000 - self.pubMcsp.publish(pos) + self.pub_mcsp.publish(pos) rate.sleep() count = count + 1 @@ -147,7 +139,7 @@ class ManualInput: rospy.loginfo("setting offboard mode") time = rospy.get_rostime().now() mode.timestamp = time.secs * 1e6 + time.nsecs / 1000 - self.pubOff.publish(mode) + self.pub_off.publish(mode) rate.sleep() count = count + 1 @@ -169,7 +161,7 @@ class ManualInput: rospy.loginfo("triggering offboard") time = rospy.get_rostime().now() pos.timestamp = time.secs * 1e6 + time.nsecs / 1000 - self.pubMcsp.publish(pos) + self.pub_mcsp.publish(pos) rate.sleep() count = count + 1 -- cgit v1.2.3 From c4a81f66cbe6714f28b3a9853d428bde827d4a03 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 15:50:25 +0100 Subject: set ground truth to true again, needs latest master from euroc_simulator --- integrationtests/demo_tests/direct_tests.launch | 2 +- integrationtests/demo_tests/mavros_tests.launch | 2 +- launch/gazebo_ardrone_empty_world.launch | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integrationtests/demo_tests/direct_tests.launch b/integrationtests/demo_tests/direct_tests.launch index 1a7d843fd..f332332cf 100644 --- a/integrationtests/demo_tests/direct_tests.launch +++ b/integrationtests/demo_tests/direct_tests.launch @@ -2,7 +2,7 @@ - + diff --git a/integrationtests/demo_tests/mavros_tests.launch b/integrationtests/demo_tests/mavros_tests.launch index e42db6043..cc4918307 100644 --- a/integrationtests/demo_tests/mavros_tests.launch +++ b/integrationtests/demo_tests/mavros_tests.launch @@ -2,7 +2,7 @@ - + diff --git a/launch/gazebo_ardrone_empty_world.launch b/launch/gazebo_ardrone_empty_world.launch index 122a27867..7243e7476 100644 --- a/launch/gazebo_ardrone_empty_world.launch +++ b/launch/gazebo_ardrone_empty_world.launch @@ -3,7 +3,7 @@ - + -- cgit v1.2.3 From 15a5c0a9bebf113ddcb6f1b95a8672fa882dcdcc Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 16:12:17 +0100 Subject: cleaned up direct tests, fixed various lint warnings and removed unused code --- .../demo_tests/direct_manual_input_test.py | 23 ++--- .../demo_tests/direct_offboard_posctl_test.py | 55 ++++++------ .../demo_tests/flight_path_assertion.py | 100 ++++++++++++--------- 3 files changed, 99 insertions(+), 79 deletions(-) diff --git a/integrationtests/demo_tests/direct_manual_input_test.py b/integrationtests/demo_tests/direct_manual_input_test.py index b037d842f..394500e94 100755 --- a/integrationtests/demo_tests/direct_manual_input_test.py +++ b/integrationtests/demo_tests/direct_manual_input_test.py @@ -37,7 +37,6 @@ # PKG = 'px4' -import sys import unittest import rospy @@ -50,14 +49,18 @@ from manual_input import ManualInput # class ManualInputTest(unittest.TestCase): + def setUp(self): + self.actuator_status = actuator_armed() + self.control_mode = vehicle_control_mode() + # # General callback functions used in tests # def actuator_armed_callback(self, data): - self.actuatorStatus = data + self.actuator_status = data def vehicle_control_mode_callback(self, data): - self.controlMode = data + self.control_mode = data # # Test arming @@ -67,19 +70,19 @@ class ManualInputTest(unittest.TestCase): rospy.Subscriber('iris/actuator_armed', actuator_armed, self.actuator_armed_callback) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) - man = ManualInput() + man_in = ManualInput() # Test arming - man.arm() - self.assertEquals(self.actuatorStatus.armed, True, "did not arm") + man_in.arm() + self.assertEquals(self.actuator_status.armed, True, "did not arm") # Test posctl - man.posctl() - self.assertTrue(self.controlMode.flag_control_position_enabled, "flag_control_position_enabled is not set") + man_in.posctl() + self.assertTrue(self.control_mode.flag_control_position_enabled, "flag_control_position_enabled is not set") # Test offboard - man.offboard() - self.assertTrue(self.controlMode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") + man_in.offboard() + self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") if __name__ == '__main__': diff --git a/integrationtests/demo_tests/direct_offboard_posctl_test.py b/integrationtests/demo_tests/direct_offboard_posctl_test.py index 18239e926..225760d6c 100755 --- a/integrationtests/demo_tests/direct_offboard_posctl_test.py +++ b/integrationtests/demo_tests/direct_offboard_posctl_test.py @@ -37,7 +37,6 @@ # PKG = 'px4' -import sys import unittest import rospy @@ -46,11 +45,8 @@ import numpy as np from px4.msg import vehicle_local_position from px4.msg import vehicle_control_mode -from px4.msg import actuator_armed from px4.msg import position_setpoint_triplet from px4.msg import position_setpoint -from sensor_msgs.msg import Joy -from std_msgs.msg import Header from manual_input import ManualInput from flight_path_assertion import FlightPathAssertion @@ -68,31 +64,34 @@ class DirectOffboardPosctlTest(unittest.TestCase): rospy.init_node('test_node', anonymous=True) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.position_callback) - self.pubSpt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10) + self.pub_spt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10) self.rate = rospy.Rate(10) # 10hz + self.has_pos = False + self.local_position = vehicle_local_position() + self.control_mode = vehicle_control_mode() def tearDown(self): - if (self.fpa): + if self.fpa: self.fpa.stop() # # General callback functions used in tests # def position_callback(self, data): - self.hasPos = True - self.localPosition = data + self.has_pos = True + self.local_position = data def vehicle_control_mode_callback(self, data): - self.controlMode = data + self.control_mode = data # # Helper methods # def is_at_position(self, x, y, z, offset): - rospy.logdebug("current position %f, %f, %f" % (self.localPosition.x, self.localPosition.y, self.localPosition.z)) + rospy.logdebug("current position %f, %f, %f" % (self.local_position.x, self.local_position.y, self.local_position.z)) desired = np.array((x, y, z)) - pos = np.array((self.localPosition.x, self.localPosition.y, self.localPosition.z)) + pos = np.array((self.local_position.x, self.local_position.y, self.local_position.z)) return linalg.norm(desired - pos) < offset def reach_position(self, x, y, z, timeout): @@ -105,12 +104,12 @@ class DirectOffboardPosctlTest(unittest.TestCase): pos.position_valid = True stp = position_setpoint_triplet() stp.current = pos - self.pubSpt.publish(stp) + self.pub_spt.publish(stp) # does it reach the position in X seconds? count = 0 - while(count < timeout): - if(self.is_at_position(pos.x, pos.y, pos.z, 0.5)): + while count < timeout: + if self.is_at_position(pos.x, pos.y, pos.z, 0.5): break count = count + 1 self.rate.sleep() @@ -121,22 +120,22 @@ class DirectOffboardPosctlTest(unittest.TestCase): # Test offboard position control # def test_posctl(self): - manIn = ManualInput() + man_in = ManualInput() # arm and go into offboard - manIn.arm() - manIn.offboard() - self.assertTrue(self.controlMode.flag_armed, "flag_armed is not set") - self.assertTrue(self.controlMode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") - self.assertTrue(self.controlMode.flag_control_position_enabled, "flag_control_position_enabled is not set") + man_in.arm() + man_in.offboard() + self.assertTrue(self.control_mode.flag_armed, "flag_armed is not set") + self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") + self.assertTrue(self.control_mode.flag_control_position_enabled, "flag_control_position_enabled is not set") # prepare flight path positions = ( - (0,0,0), - (2,2,-2), - (2,-2,-2), - (-2,-2,-2), - (2,2,-2)) + (0, 0, 0), + (2, 2, -2), + (2, -2, -2), + (-2, -2, -2), + (2, 2, -2)) # flight path assertion self.fpa = FlightPathAssertion(positions, 1, 0) @@ -147,12 +146,10 @@ class DirectOffboardPosctlTest(unittest.TestCase): self.assertFalse(self.fpa.failed, "breached flight path tunnel (%d)" % i) # does it hold the position for Y seconds? - positionHeld = True count = 0 timeout = 50 - while(count < timeout): - if(not self.is_at_position(2, 2, -2, 0.5)): - positionHeld = False + while count < timeout: + if not self.is_at_position(2, 2, -2, 0.5): break count = count + 1 self.rate.sleep() diff --git a/integrationtests/demo_tests/flight_path_assertion.py b/integrationtests/demo_tests/flight_path_assertion.py index 485de8c41..ccadd1ab9 100644 --- a/integrationtests/demo_tests/flight_path_assertion.py +++ b/integrationtests/demo_tests/flight_path_assertion.py @@ -35,7 +35,6 @@ # # @author Andreas Antener # -import sys import rospy import threading @@ -48,7 +47,6 @@ from geometry_msgs.msg import Twist from numpy import linalg import numpy as np -import math # # Helper to test if vehicle stays on expected flight path. @@ -62,30 +60,52 @@ class FlightPathAssertion(threading.Thread): # TODO: yaw validation # TODO: fail main test thread # - def __init__(self, positions, tunnelRadius = 1, yawOffset = 0.2): + def __init__(self, positions, tunnelRadius=1, yaw_offset=0.2): threading.Thread.__init__(self) rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback) - self.spawnModel = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel) - self.setModelState = rospy.ServiceProxy('gazebo/set_model_state', SetModelState) - self.deleteModel = rospy.ServiceProxy('gazebo/delete_model', DeleteModel) + self.spawn_model = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel) + self.set_model_state = rospy.ServiceProxy('gazebo/set_model_state', SetModelState) + self.delete_model = rospy.ServiceProxy('gazebo/delete_model', DeleteModel) self.positions = positions - self.tunnelRadius = tunnelRadius - self.yawOffset = yawOffset - self.hasPos = False - self.shouldStop = False + self.tunnel_radius = tunnelRadius + self.yaw_offset = yaw_offset + self.has_pos = False + self.should_stop = False self.center = positions[0] - self.endOfSegment = False + self.end_of_segment = False self.failed = False + self.local_position = vehicle_local_position def position_callback(self, data): - self.hasPos = True - self.localPosition = data + self.has_pos = True + self.local_position = data def spawn_indicator(self): - self.deleteModel("indicator") - xml = "true0.7%f1 0 0 0.51 0 0 0.5" % self.tunnelRadius - self.spawnModel("indicator", xml, "", Pose(), "") + self.delete_model("indicator") + xml = ( + "" + + "" + + "" + + "true" + + "" + + "" + + "0.7" + + "" + + "" + + "%f" + + "" + + "" + + "" + + "1 0 0 0.5" + + "1 0 0 0.5" + + "" + + "" + + "" + + "" + + "" % self.tunnel_radius) + + self.spawn_model("indicator", xml, "", Pose(), "") def position_indicator(self): state = SetModelState() @@ -97,7 +117,7 @@ class FlightPathAssertion(threading.Thread): state.pose = pose state.twist = Twist() state.reference_frame = "" - self.setModelState(state) + self.set_model_state(state) def distance_to_line(self, a, b, pos): v = b - a @@ -111,7 +131,7 @@ class FlightPathAssertion(threading.Thread): c2 = np.dot(v, v) if c2 <= c1: # after b self.center = b - self.endOfSegment = True + self.end_of_segment = True return linalg.norm(pos - b) x = c1 / c2 @@ -120,7 +140,7 @@ class FlightPathAssertion(threading.Thread): return linalg.norm(pos - l) def stop(self): - self.shouldStop = True + self.should_stop = True def run(self): rate = rospy.Rate(10) # 10hz @@ -128,43 +148,43 @@ class FlightPathAssertion(threading.Thread): current = 0 - while not self.shouldStop: - if (self.hasPos): + while not self.should_stop: + if self.has_pos: # calculate distance to line segment between first two points - # if distances > tunnelRadius + # if distances > tunnel_radius # exit with error - # advance current pos if not on the line anymore or distance to next point < tunnelRadius + # advance current pos if not on the line anymore or distance to next point < tunnel_radius # exit if current pos is now the last position self.position_indicator() - pos = np.array((self.localPosition.x, - self.localPosition.y, - self.localPosition.z)) - aPos = np.array((self.positions[current][0], - self.positions[current][1], - self.positions[current][2])) - bPos = np.array((self.positions[current + 1][0], - self.positions[current + 1][1], - self.positions[current + 1][2])) + pos = np.array((self.local_position.x, + self.local_position.y, + self.local_position.z)) + a_pos = np.array((self.positions[current][0], + self.positions[current][1], + self.positions[current][2])) + b_pos = np.array((self.positions[current + 1][0], + self.positions[current + 1][1], + self.positions[current + 1][2])) - dist = self.distance_to_line(aPos, bPos, pos) - bDist = linalg.norm(pos - bPos) + dist = self.distance_to_line(a_pos, b_pos, pos) + b_dist = linalg.norm(pos - b_pos) - rospy.logdebug("distance to line: %f, distance to end: %f" % (dist, bDist)) + rospy.logdebug("distance to line: %f, distance to end: %f" % (dist, b_dist)) - if (dist > self.tunnelRadius): - msg = "left tunnel at position (%f, %f, %f)" % (self.localPosition.x, self.localPosition.y, self.localPosition.z) + if dist > self.tunnel_radius: + msg = "left tunnel at position (%f, %f, %f)" % (self.local_position.x, self.local_position.y, self.local_position.z) rospy.logerr(msg) self.failed = True break - if (self.endOfSegment or bDist < self.tunnelRadius): + if self.end_of_segment or b_dist < self.tunnel_radius: rospy.loginfo("next segment") - self.endOfSegment = False + self.end_of_segment = False current = current + 1 - if (current == len(self.positions) - 1): + if current == len(self.positions) - 1: rospy.loginfo("no more positions") break -- cgit v1.2.3 From 1d5beb1edf63fef52a8c68993bcfe80b51a1aba8 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Sun, 15 Mar 2015 16:39:35 +0100 Subject: - updated flight path assertion position subscription and added check if it does not get a position after 1 sec - modified test so it works iwth new _Z_P parameter --- integrationtests/demo_tests/direct_offboard_posctl_test.py | 1 + integrationtests/demo_tests/flight_path_assertion.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/integrationtests/demo_tests/direct_offboard_posctl_test.py b/integrationtests/demo_tests/direct_offboard_posctl_test.py index 225760d6c..d61eec063 100755 --- a/integrationtests/demo_tests/direct_offboard_posctl_test.py +++ b/integrationtests/demo_tests/direct_offboard_posctl_test.py @@ -132,6 +132,7 @@ class DirectOffboardPosctlTest(unittest.TestCase): # prepare flight path positions = ( (0, 0, 0), + (0, 0, -2), (2, 2, -2), (2, -2, -2), (-2, -2, -2), diff --git a/integrationtests/demo_tests/flight_path_assertion.py b/integrationtests/demo_tests/flight_path_assertion.py index ccadd1ab9..244324cd0 100644 --- a/integrationtests/demo_tests/flight_path_assertion.py +++ b/integrationtests/demo_tests/flight_path_assertion.py @@ -62,7 +62,7 @@ class FlightPathAssertion(threading.Thread): # def __init__(self, positions, tunnelRadius=1, yaw_offset=0.2): threading.Thread.__init__(self) - rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback) + rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.position_callback) self.spawn_model = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel) self.set_model_state = rospy.ServiceProxy('gazebo/set_model_state', SetModelState) self.delete_model = rospy.ServiceProxy('gazebo/delete_model', DeleteModel) @@ -103,7 +103,7 @@ class FlightPathAssertion(threading.Thread): "" + "" + "" + - "" % self.tunnel_radius) + "") % self.tunnel_radius self.spawn_model("indicator", xml, "", Pose(), "") @@ -147,7 +147,7 @@ class FlightPathAssertion(threading.Thread): self.spawn_indicator() current = 0 - + count = 0 while not self.should_stop: if self.has_pos: # calculate distance to line segment between first two points @@ -189,3 +189,9 @@ class FlightPathAssertion(threading.Thread): break rate.sleep() + count = count + 1 + + if count > 10 and not self.has_pos: # no position after 1 sec + rospy.logerr("no position") + self.failed = True + break -- cgit v1.2.3