Examples¶
The following examples will demonstrate the usage of the modular robot software interface and can serve as an entry point for own projects.
Preparation¶
- Assenble a 1-Joint robot that cannot colide with itself or the envitonment. Follow the instructions from section Module Assembly for assembling the robot.
- Wire the components as indicated in section General Setup.
- Connect your PC or laptop to the appropriate port of the control unit and set up your network configuration as indicated in Network Configuration.
Netcat¶
Note
Section under construction.
Basic demo, no programming required.
- Use this command to instruct the robot to move a its first joint to 90 degrees (assuming a revolute joint).
- Use this command ro instruct the robot to move the joint back to the original position.
- Use this command to recueive a message from the robot.
Can also be done for robots with multiple joints. Be aware of the risk of collision and resulting damage or injuries when commanding a robot to move to random positions.
Python Basic¶
Here a basic basic python example for interacting with the robot from python in very few lines of of code is given. This program will make a robot joint incrementally move into one direction and cyclically alter the direction of motion after a few increments. You can directly copy the source code form here to a python file and execute it or paste the code directly into a pyhton interreter line by line.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | import socket
import time
IP_ADDR = "192.168.1.1"
PORT = 25000
my_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
msg_move_positive = bytearray([0x03, 0x00, 0x00, 0x05, 0x01, 0x00, 0x04, 0xFF,
0xFF]);
msg_move_negative = bytearray([0x03, 0x00, 0x00, 0x05, 0x01, 0xFF, 0xFB, 0x00,
0x01]);
msg_sequence_number = 0x00
count = 0
while(True):
msg_move_positive[1] = msg_sequence_number
msg_move_negative[1] = msg_sequence_number
if(count < 500):
my_sock.sendto(msg_move_positive, (IP_ADDR, PORT))
else:
my_sock.sendto(msg_move_negative, (IP_ADDR, PORT))
if(count > 1000):
count = 0
count += 1
if(msg_sequence_number < 255):
msg_sequence_number += 1
else:
msg_sequence_number = 0
time.sleep(.01);
|
Python Advanced¶
The following example code is meant to serve as a starting point for custom Python projects that involve communication with the robot.
Receiver¶
This smaple program receives and decodes messages from the robot control unit.
There is a handler function for each GET_ message specified in section
Payload named in accodrance to the names from section Payload
prefixed with the word process_
. In this example, the decoded content of
each valid message is printed by a call to print()
in each handler function
. Replace this print()
statement by your desired hadling code for the
decoded message contents.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | import math
import socket
IP_ADDR = "192.168.1.2"
PORT = 25001
MAX_MSG_LENGTH = 508
MIN_MSG_LENGTH = 4
_last_rx_mesage_sequence_number = 0x00
def main():
rx_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_socket.bind((IP_ADDR, PORT))
while True:
data, address = rx_socket.recvfrom(MAX_MSG_LENGTH)
if len(data) > 0:
success = process_message(data)
if not success:
print("Error decoding message.")
def process_message(message):
global _last_rx_mesage_sequence_number
success = False
if(len(message) >= MIN_MSG_LENGTH) and (len(message) <= MAX_MSG_LENGTH):
# Valid message size verified. Continue processing.
message_type = int.from_bytes(message[0:1], byteorder='big',
signed=False)
message_sequence_number = int.from_bytes(message[1:2], byteorder='big',
signed=False)
if message_sequence_number == _last_rx_mesage_sequence_number:
print("Duplicate message detected.")
_last_rx_mesage_sequence_number = message_sequence_number
payload_length = int.from_bytes(message[2:4], byteorder='big',
signed=False)
payload = message[4:]
if payload_length == len(payload):
if message_type == 0x02: # GET_JOINT_ABS.
success = process_get_joint_abs_payload(payload)
elif message_type == 0x06: # GET_POSE_ABS.
success = process_get_pose_abs_payload(payload)
elif message_type == 0x11: # GET_MOTION_DONE. # Deprecated.
success = process_get_motion_done_payload(payload)
elif message_type == 0x21: # GET_INPUT.
success = process_get_input_payload(payload)
elif message_type == 0x23: # GET_OUTPUT.
success = process_get_output_payload(payload)
elif message_type == 0x50: # GET_STATUS.
success = process_get_status_payload(payload)
else:
print("Unhandled message type received.")
else:
print("Payload length does not match expected value.")
return success
def process_get_joint_abs_payload(payload):
success = False
joint_values = []
number_of_joints = int.from_bytes(payload[0:1], byteorder='big',
signed=False)
# Each joint value is 4 Bytes long.
if number_of_joints == (len(payload[1:]) / 4):
for joint_index in range(number_of_joints):
# Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
# offset to first payload Byte.
raw_joint_value = int.from_bytes(payload[((joint_index * 4) + 1):((
(joint_index + 1) * 4) + 1)], byteorder='big', signed=True)
joint_values.append(decode_angular_value(raw_joint_value))
print("Joint values: ", joint_values)
success = True
return success
def process_get_pose_abs_payload(payload):
success = False
pose_values = []
number_of_dimensions = int.from_bytes(payload[0:1], byteorder='big',
signed=False)
# Each dimensional value is 4 Bytes long.
if (number_of_dimensions == (len(payload[1:]) / 4)) and \
(number_of_dimensions == 6):
for dimension_index in range(number_of_dimensions):
# Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 Byte
# offset to first payload Byte.
raw_dimensional_value = int.from_bytes(
payload[((dimension_index * 4) + 1):(((dimension_index + 1) * 4)
+ 1)], byteorder='big',
signed=True)
if dimension_index < 3: # Linear position values.
pose_values.append(decode_linear_value(raw_dimensional_value))
else: # Angular orientation values (ZYX Euler intrinsic).
pose_values.append(decode_angular_value(raw_dimensional_value))
print("Pose values: ", pose_values)
success = True
return success
def process_get_motion_done_payload(payload):
success = False
if len(payload) == 1: # Expected payload length: 1 Byte.
point_sequence_number = int.from_bytes(payload[0:1], byteorder='big',
signed=False)
print("Reached waypoint. Id: ", point_sequence_number)
success = True
return success
def process_get_input_payload(payload):
success = False
input_values = []
number_of_values = int.from_bytes(payload[0:1], byteorder='big',
signed=False)
# Each input value is 4 Bytes long.
if number_of_values == (len(payload[1:]) / 4):
# Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
# offset to first payload Byte.
for value_index in range(number_of_values):
input_values.append(int.from_bytes(payload[((value_index * 4) + 1):(
((value_index + 1) * 4) + 1)], byteorder='big', signed=False
))
print("Input values: ", input_values)
success = True
return success
def process_get_output_payload(payload):
success = False
output_values = []
number_of_values = int.from_bytes(payload[0:1], byteorder='big',
signed=False)
# Each output value is 4 Bytes long.
if number_of_values == (len(payload[1:]) / 4):
# Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
# offset to first payload Byte.
for value_index in range(number_of_values):
output_values.append(int.from_bytes(payload[((value_index * 4) + 1):
(((value_index + 1) * 4) + 1)], byteorder='big', signed=False))
print("Output values: ", output_values)
success = True
return success
def process_get_status_payload(payload):
success = False
if len(payload) >= 10: # Minimum payload length: 10 Byte.
status = int.from_bytes(payload[0:1], byteorder='big', signed=False)
error_flag = (status >> 7) & 0x01
status_group = (status >> 3) & 0x0f
status_sub_group = status & 0x7
rx_sequence_number = int.from_bytes(payload[1:2], byteorder='big',
signed=False)
rx_result = int.from_bytes(payload[2:3], byteorder='big', signed=True)
last_reached_waypoint = int.from_bytes(payload[3:4], byteorder='big',
signed=False)
last_error_status_sequence_number = int.from_bytes(payload[4:5],
byteorder='big',
signed=False)
last_error_status = int.from_bytes(payload[5:6], byteorder='big',
signed=False)
last_error_flag = (last_error_status >> 7) & 0x01
last_error_status_gorup = (last_error_status >> 3) & 0x0f
last_error_status_sub_gorup = last_error_status & 0x07
mapped_data_id = int.from_bytes(payload[6:7], byteorder='big',
signed=False)
mapped_data_length = int.from_bytes(payload[7:9], byteorder='big',
signed=False)
mapped_data = payload[9:]
print("Status: Error:", error_flag, "Group:", status_group,"Sub-Group:",
status_sub_group, "Rx Sequence Number:", rx_sequence_number,
"Rx Result:", rx_result, "Last Reached Waypoint:",
last_reached_waypoint, "Last Error Sequence Number",
last_error_status_sequence_number, "Last Error Flag:",
last_error_flag, "Last Error Group:", last_error_status_gorup,
"Last Error Sub-Group:", last_error_status_sub_gorup,
"Mapped Data ID:", mapped_data_id, "Mapped Data Length:",
mapped_data_length, "Mapped Data:", mapped_data)
success = True
return success
def decode_angular_value(raw_angular_value):
# Angular values are encoded as nano radians divided by pi.
return raw_angular_value * 1e-9 * math.pi
def decode_linear_value(raw_linear_value):
# Linear values are encoded as micro meters.
return raw_linear_value * 1e-6
if __name__ == "__main__":
main()
|
Transmitter¶
This program provides examples for sending different commands to the robot
control unit. For each command, there is a function that assembles an
appropriate message in accordance to section Messaging. This message can
then be snet to the robot control unit. Various exampels can be uncomented
inside the main()
function.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | import math
import random
import socket
import time
IP_ADDR = "192.168.1.1"
PORT = 25000
MAX_MSG_LENGTH = 508
MIN_MSG_LENGTH = 4
_tx_sequence_number = random.randint(0, 255)
def main():
tx_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Command a two joint robot to move to joint angles -.2 .2 .
message = generate_set_joint_abs_message(1, 250, 120, [-.2, .2])
tx_socket.sendto(message, (IP_ADDR, PORT))
# # Command the robot to move to position .1, .1, .1 and orientation
# # (intrinsic ZYX Euler angles) .0, .0, pi.
# message = generate_set_pose_abs_message(1, 50, 120, [.1, .1, .1, .0, .0,
# math.pi])
# tx_socket.sendto(message, (IP_ADDR, PORT))
# # Jog joint 0 and joint 1. Keep the offsets low!
# # Forward.
# for idx in range(100):
# message = generate_set_joint_offs_message([.0005, .0005])
# tx_socket.sendto(message, (IP_ADDR, PORT))
# time.sleep(.01)
# # Backward.
# for idx in range(100):
# message = generate_set_joint_offs_message([-.0005, -.0005])
# tx_socket.sendto(message, (IP_ADDR, PORT))
# time.sleep(.01)
# # Set output 0 to enabled state.
# message = generate_set_output_message(0, True)
# tx_socket.sendto(message, (IP_ADDR, PORT))
# # Enable gravity compensation for a few seconds.
# for idx in range(500):
# message = generate_set_g_compensation_message()
# tx_socket.sendto(message, (IP_ADDR, PORT))
# time.sleep(.01)
# # Command the robot to stop all current and pending actions.
# message = generate_set_stop_message()
# tx_socket.sendto(message, (IP_ADDR, PORT))
def generate_set_joint_abs_message(point_sequence_number, approach_velocity,
approach_acceleration, joint_angles):
message = bytes()
message_type = 0x01 # SET_JOINT_ABS.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 5 + (len(joint_angles) * 4)
message += payload_length.to_bytes(2, byteorder='big', signed=False)
message += point_sequence_number.to_bytes(1, byteorder='big', signed=False)
approach_mode = 0x01 # Point-to-point.
message += approach_mode.to_bytes(1, byteorder='big', signed=False)
message += approach_velocity.to_bytes(1, byteorder='big', signed=False)
message += approach_acceleration.to_bytes(1, byteorder='big', signed=False)
number_of_joints = len(joint_angles)
message += number_of_joints.to_bytes(1, byteorder='big', signed=False)
for joint_angle in joint_angles:
encoded_joint_angle = encode_angular_value(joint_angle)
message += encoded_joint_angle.to_bytes(4, byteorder='big', signed=True)
return message
def generate_set_joint_offs_message(joint_offsets):
message = bytes()
message_type = 0x03 # SET_JOINT_OFFS.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 1 + (len(joint_offsets) * 4)
message += payload_length.to_bytes(2, byteorder='big', signed=False)
number_of_joints = len(joint_offsets)
message += number_of_joints.to_bytes(1, byteorder='big', signed=False)
for joint_offset in joint_offsets:
encoded_joint_offset = encode_angular_value(joint_offset)
message += encoded_joint_offset.to_bytes(4, byteorder='big', signed=True
)
return message
def generate_set_pose_abs_message(point_sequence_number, approach_velocity,
approach_acceleration, pose):
message = bytes()
message_type = 0x05 # SET_POSE_ABS.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 5 + (len(pose) * 4)
message += payload_length.to_bytes(2, byteorder='big', signed=False)
message += point_sequence_number.to_bytes(1, byteorder='big', signed=False)
approach_mode = 0x02 # Linear.
message += approach_mode.to_bytes(1, byteorder='big', signed=False)
message += approach_velocity.to_bytes(1, byteorder='big', signed=False)
message += approach_acceleration.to_bytes(1, byteorder='big', signed=False)
number_of_dimensions = len(pose)
message += number_of_dimensions.to_bytes(1, byteorder='big', signed=False)
for dimension_index in range(number_of_dimensions):
if dimension_index < 3:
encoded_position = encode_linear_value(pose[dimension_index])
message += encoded_position.to_bytes(4, byteorder='big', signed=True
)
else:
encoded_position = encode_angular_value(pose[dimension_index])
message += encoded_position.to_bytes(4, byteorder='big', signed=True
)
return message
def generate_set_output_message(index, value):
message = bytes()
message_type = 0x20 # SET_OUTPUT.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 4 + 4
message += payload_length.to_bytes(2, byteorder='big', signed=False)
message += index.to_bytes(4, byteorder='big', signed=False)
message += int(value).to_bytes(4, byteorder='big', signed=False)
return message
def generate_set_g_compensation_message():
message = bytes()
message_type = 0x41 # SET_G_COMPENSATION.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 1
message += payload_length.to_bytes(2, byteorder='big', signed=False)
mode = 0 # Fully compensated but respecting joint limits.
message += mode.to_bytes(1, byteorder='big', signed=False)
return message
def generate_set_stop_message():
message = bytes()
message_type = 0xEE # SET_STOP.
message += message_type.to_bytes(1, byteorder='big', signed=False)
message += get_message_sequence_number().to_bytes(1, byteorder='big',
signed=False)
payload_length = 1
message += payload_length.to_bytes(2, byteorder='big', signed=False)
rfu = 0 # Spacer. Reserved for future usage.
message += rfu.to_bytes(1, byteorder='big', signed=False)
return message
def encode_angular_value(angular_value):
# Angular values are encoded as nano radians divided by pi.
return int(angular_value / math.pi * 1e9)
def encode_linear_value(angular_value):
# Linear values are encoded as micro meters.
return int(angular_value * 1e6)
def get_message_sequence_number():
global _tx_sequence_number
if _tx_sequence_number >= 0xff:
_tx_sequence_number = 0x00
else:
_tx_sequence_number += 1
return _tx_sequence_number
if __name__ == "__main__":
main()
|