Examples

The following examples will demonstrate the usage of the modular robot software interface and can serve as an entry point for own projects.

Preparation

  1. Assenble a 1-Joint robot that cannot colide with itself or the envitonment. Follow the instructions from section Module Assembly for assembling the robot.
  2. Wire the components as indicated in section General Setup.
  3. Connect your PC or laptop to the appropriate port of the control unit and set up your network configuration as indicated in Network Configuration.

Netcat

Note

Section under construction.

Basic demo, no programming required.

  • Use this command to instruct the robot to move a its first joint to 90 degrees (assuming a revolute joint).
  • Use this command ro instruct the robot to move the joint back to the original position.
  • Use this command to recueive a message from the robot.

Can also be done for robots with multiple joints. Be aware of the risk of collision and resulting damage or injuries when commanding a robot to move to random positions.

Python Basic

Here a basic basic python example for interacting with the robot from python in very few lines of of code is given. This program will make a robot joint incrementally move into one direction and cyclically alter the direction of motion after a few increments. You can directly copy the source code form here to a python file and execute it or paste the code directly into a pyhton interreter line by line.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import socket
import time

IP_ADDR = "192.168.1.1"
PORT = 25000

my_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
msg_move_positive = bytearray([0x03, 0x00, 0x00, 0x05, 0x01, 0x00, 0x04, 0xFF,
    0xFF]);
msg_move_negative = bytearray([0x03, 0x00, 0x00, 0x05, 0x01, 0xFF, 0xFB, 0x00,
    0x01]);
msg_sequence_number = 0x00
count = 0
while(True):
    msg_move_positive[1] = msg_sequence_number
    msg_move_negative[1] = msg_sequence_number
    if(count < 500):
    	my_sock.sendto(msg_move_positive, (IP_ADDR, PORT))
    else:
        my_sock.sendto(msg_move_negative, (IP_ADDR, PORT))
    if(count > 1000):
        count = 0
    count += 1
    if(msg_sequence_number < 255):
        msg_sequence_number += 1
    else:
        msg_sequence_number = 0
    time.sleep(.01);

Python Advanced

The following example code is meant to serve as a starting point for custom Python projects that involve communication with the robot.

Receiver

This smaple program receives and decodes messages from the robot control unit. There is a handler function for each GET_ message specified in section Payload named in accodrance to the names from section Payload prefixed with the word process_. In this example, the decoded content of each valid message is printed by a call to print() in each handler function . Replace this print() statement by your desired hadling code for the decoded message contents.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import math
import socket

IP_ADDR = "192.168.1.2"
PORT = 25001
MAX_MSG_LENGTH = 508
MIN_MSG_LENGTH = 4

_last_rx_mesage_sequence_number = 0x00


def main():
    rx_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    rx_socket.bind((IP_ADDR, PORT))
    while True:
        data, address = rx_socket.recvfrom(MAX_MSG_LENGTH)
        if len(data) > 0:
            success = process_message(data)
            if not success:
                print("Error decoding message.")


def process_message(message):
    global _last_rx_mesage_sequence_number
    success = False
    if(len(message) >= MIN_MSG_LENGTH) and (len(message) <= MAX_MSG_LENGTH):
        # Valid message size verified. Continue processing.
        message_type = int.from_bytes(message[0:1], byteorder='big',
                                      signed=False)
        message_sequence_number = int.from_bytes(message[1:2], byteorder='big',
                                                 signed=False)
        if message_sequence_number == _last_rx_mesage_sequence_number:
            print("Duplicate message detected.")
        _last_rx_mesage_sequence_number = message_sequence_number
        payload_length = int.from_bytes(message[2:4], byteorder='big',
                                        signed=False)
        payload = message[4:]
        if payload_length == len(payload):
            if message_type == 0x02:  # GET_JOINT_ABS.
                success = process_get_joint_abs_payload(payload)
            elif message_type == 0x06:  # GET_POSE_ABS.
                success = process_get_pose_abs_payload(payload)
            elif message_type == 0x11:  # GET_MOTION_DONE. # Deprecated.
                success = process_get_motion_done_payload(payload)
            elif message_type == 0x21:  # GET_INPUT.
                success = process_get_input_payload(payload)
            elif message_type == 0x23:  # GET_OUTPUT.
                success = process_get_output_payload(payload) 
            elif message_type == 0x50:  # GET_STATUS.
                success = process_get_status_payload(payload)
            else:
                print("Unhandled message type received.")
        else:
            print("Payload length does not match expected value.")
    return success


def process_get_joint_abs_payload(payload):
    success = False
    joint_values = []
    number_of_joints = int.from_bytes(payload[0:1], byteorder='big',
                                      signed=False)
    # Each joint value is 4 Bytes long.
    if number_of_joints == (len(payload[1:]) / 4):
        for joint_index in range(number_of_joints):
            # Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
            # offset to first payload Byte.
            raw_joint_value = int.from_bytes(payload[((joint_index * 4) + 1):((
                    (joint_index + 1) * 4) + 1)], byteorder='big', signed=True)
            joint_values.append(decode_angular_value(raw_joint_value))
        print("Joint values: ", joint_values)
        success = True
    return success


def process_get_pose_abs_payload(payload):
    success = False
    pose_values = []
    number_of_dimensions = int.from_bytes(payload[0:1], byteorder='big',
                                          signed=False)
    # Each dimensional value is 4 Bytes long.
    if (number_of_dimensions == (len(payload[1:]) / 4)) and \
            (number_of_dimensions == 6):
        for dimension_index in range(number_of_dimensions):
            # Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 Byte
            # offset to first payload Byte.
            raw_dimensional_value = int.from_bytes(
                payload[((dimension_index * 4) + 1):(((dimension_index + 1) * 4)
                                                     + 1)], byteorder='big',
                signed=True)
            if dimension_index < 3:  # Linear position values.
                pose_values.append(decode_linear_value(raw_dimensional_value))
            else:  # Angular orientation values (ZYX Euler intrinsic).
                pose_values.append(decode_angular_value(raw_dimensional_value))
        print("Pose values: ", pose_values)
        success = True
    return success


def process_get_motion_done_payload(payload):
    success = False
    if len(payload) == 1:  # Expected payload length: 1 Byte.
        point_sequence_number = int.from_bytes(payload[0:1], byteorder='big',
                                               signed=False)
        print("Reached waypoint. Id: ", point_sequence_number)
        success = True
    return success


def process_get_input_payload(payload):
    success = False
    input_values = []
    number_of_values = int.from_bytes(payload[0:1], byteorder='big',
                                      signed=False)
    # Each input value is 4 Bytes long.
    if number_of_values == (len(payload[1:]) / 4):
        # Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
        # offset to first payload Byte.
        for value_index in range(number_of_values):
            input_values.append(int.from_bytes(payload[((value_index * 4) + 1):(
                    ((value_index + 1) * 4) + 1)], byteorder='big', signed=False
                    ))
        print("Input values: ", input_values)
        success = True
    return success


def process_get_output_payload(payload):
    success = False
    output_values = []
    number_of_values = int.from_bytes(payload[0:1], byteorder='big',
                                      signed=False)
    # Each output value is 4 Bytes long.
    if number_of_values == (len(payload[1:]) / 4):
        # Iterate over 32 Bit integers: 4 Bytes per encoded value, 1 byte
        # offset to first payload Byte.
        for value_index in range(number_of_values):
            output_values.append(int.from_bytes(payload[((value_index * 4) + 1):
                 (((value_index + 1) * 4) + 1)], byteorder='big', signed=False))
        print("Output values: ", output_values)
        success = True
    return success


def process_get_status_payload(payload):
    success = False
    if len(payload) >= 10:  # Minimum payload length: 10 Byte.
        status = int.from_bytes(payload[0:1], byteorder='big', signed=False)
        error_flag = (status >> 7) & 0x01
        status_group = (status >> 3) & 0x0f
        status_sub_group = status & 0x7
        rx_sequence_number = int.from_bytes(payload[1:2], byteorder='big',
                                            signed=False)
        rx_result = int.from_bytes(payload[2:3], byteorder='big', signed=True)
        last_reached_waypoint = int.from_bytes(payload[3:4], byteorder='big',
                                               signed=False)
        last_error_status_sequence_number = int.from_bytes(payload[4:5],
                                                           byteorder='big',
                                                           signed=False)
        last_error_status = int.from_bytes(payload[5:6], byteorder='big',
                                           signed=False)
        last_error_flag = (last_error_status >> 7) & 0x01
        last_error_status_gorup = (last_error_status >> 3) & 0x0f
        last_error_status_sub_gorup = last_error_status & 0x07
        mapped_data_id = int.from_bytes(payload[6:7], byteorder='big',
                                        signed=False)
        mapped_data_length = int.from_bytes(payload[7:9], byteorder='big',
                                            signed=False)
        mapped_data = payload[9:]
        print("Status: Error:", error_flag, "Group:", status_group,"Sub-Group:",
                status_sub_group, "Rx Sequence Number:", rx_sequence_number,
                "Rx Result:", rx_result, "Last Reached Waypoint:",
                last_reached_waypoint, "Last Error Sequence Number",
                last_error_status_sequence_number, "Last Error Flag:",
                last_error_flag, "Last Error Group:", last_error_status_gorup,
                "Last Error Sub-Group:", last_error_status_sub_gorup,
                "Mapped Data ID:", mapped_data_id, "Mapped Data Length:",
                mapped_data_length, "Mapped Data:", mapped_data)
        success = True
    return success


def decode_angular_value(raw_angular_value):
    # Angular values are encoded as nano radians divided by pi.
    return raw_angular_value * 1e-9 * math.pi


def decode_linear_value(raw_linear_value):
    # Linear values are encoded as micro meters.
    return raw_linear_value * 1e-6


if __name__ == "__main__":
    main()

Transmitter

This program provides examples for sending different commands to the robot control unit. For each command, there is a function that assembles an appropriate message in accordance to section Messaging. This message can then be snet to the robot control unit. Various exampels can be uncomented inside the main() function.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import math
import random
import socket
import time

IP_ADDR = "192.168.1.1"
PORT = 25000
MAX_MSG_LENGTH = 508
MIN_MSG_LENGTH = 4

_tx_sequence_number = random.randint(0, 255)


def main():
    tx_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # Command a two joint robot to move to joint angles -.2 .2 .
    message = generate_set_joint_abs_message(1, 250, 120, [-.2, .2])
    tx_socket.sendto(message, (IP_ADDR, PORT))

#    # Command the robot to move to position .1, .1, .1 and orientation
#    # (intrinsic ZYX Euler angles) .0, .0, pi.
#    message = generate_set_pose_abs_message(1, 50, 120, [.1, .1, .1, .0, .0,
#                                                          math.pi])
#    tx_socket.sendto(message, (IP_ADDR, PORT))

#    # Jog joint 0 and joint 1. Keep the offsets low!
#    # Forward.
#    for idx in range(100):
#        message = generate_set_joint_offs_message([.0005, .0005])
#        tx_socket.sendto(message, (IP_ADDR, PORT))
#        time.sleep(.01)
#    # Backward.
#    for idx in range(100):
#        message = generate_set_joint_offs_message([-.0005, -.0005])
#        tx_socket.sendto(message, (IP_ADDR, PORT))
#        time.sleep(.01)

#    # Set output 0 to enabled state.
#    message = generate_set_output_message(0, True)
#    tx_socket.sendto(message, (IP_ADDR, PORT))

#    # Enable gravity compensation for a few seconds.
#    for idx in range(500):
#        message = generate_set_g_compensation_message()
#        tx_socket.sendto(message, (IP_ADDR, PORT))
#        time.sleep(.01)

#    # Command the robot to stop all current and pending actions.
#    message = generate_set_stop_message()
#    tx_socket.sendto(message, (IP_ADDR, PORT))


def generate_set_joint_abs_message(point_sequence_number, approach_velocity,
                                   approach_acceleration, joint_angles):
    message = bytes()
    message_type = 0x01  # SET_JOINT_ABS.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 5 + (len(joint_angles) * 4)
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    message += point_sequence_number.to_bytes(1, byteorder='big', signed=False)
    approach_mode = 0x01  # Point-to-point.
    message += approach_mode.to_bytes(1, byteorder='big', signed=False)
    message += approach_velocity.to_bytes(1, byteorder='big', signed=False)
    message += approach_acceleration.to_bytes(1, byteorder='big', signed=False)
    number_of_joints = len(joint_angles)
    message += number_of_joints.to_bytes(1, byteorder='big', signed=False)
    for joint_angle in joint_angles:
        encoded_joint_angle = encode_angular_value(joint_angle)
        message += encoded_joint_angle.to_bytes(4, byteorder='big', signed=True)
    return message


def generate_set_joint_offs_message(joint_offsets):
    message = bytes()
    message_type = 0x03  # SET_JOINT_OFFS.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 1 + (len(joint_offsets) * 4)
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    number_of_joints = len(joint_offsets)
    message += number_of_joints.to_bytes(1, byteorder='big', signed=False)
    for joint_offset in joint_offsets:
        encoded_joint_offset = encode_angular_value(joint_offset)
        message += encoded_joint_offset.to_bytes(4, byteorder='big', signed=True
                                                 )
    return message


def generate_set_pose_abs_message(point_sequence_number, approach_velocity,
                                  approach_acceleration, pose):
    message = bytes()
    message_type = 0x05  # SET_POSE_ABS.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 5 + (len(pose) * 4)
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    message += point_sequence_number.to_bytes(1, byteorder='big', signed=False)
    approach_mode = 0x02  # Linear.
    message += approach_mode.to_bytes(1, byteorder='big', signed=False)
    message += approach_velocity.to_bytes(1, byteorder='big', signed=False)
    message += approach_acceleration.to_bytes(1, byteorder='big', signed=False)
    number_of_dimensions = len(pose)
    message += number_of_dimensions.to_bytes(1, byteorder='big', signed=False)
    for dimension_index in range(number_of_dimensions):
        if dimension_index < 3:
            encoded_position = encode_linear_value(pose[dimension_index])
            message += encoded_position.to_bytes(4, byteorder='big', signed=True
                                                 )
        else:
            encoded_position = encode_angular_value(pose[dimension_index])
            message += encoded_position.to_bytes(4, byteorder='big', signed=True
                                                 )
    return message


def generate_set_output_message(index, value):
    message = bytes()
    message_type = 0x20  # SET_OUTPUT.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 4 + 4
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    message += index.to_bytes(4, byteorder='big', signed=False)
    message += int(value).to_bytes(4, byteorder='big', signed=False)
    return message


def generate_set_g_compensation_message():
    message = bytes()
    message_type = 0x41  # SET_G_COMPENSATION.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 1
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    mode = 0  # Fully compensated but respecting joint limits.
    message += mode.to_bytes(1, byteorder='big', signed=False)
    return message


def generate_set_stop_message():
    message = bytes()
    message_type = 0xEE  # SET_STOP.
    message += message_type.to_bytes(1, byteorder='big', signed=False)
    message += get_message_sequence_number().to_bytes(1, byteorder='big',
                                                signed=False)
    payload_length = 1
    message += payload_length.to_bytes(2, byteorder='big', signed=False)
    rfu = 0  # Spacer. Reserved for future usage.
    message += rfu.to_bytes(1, byteorder='big', signed=False)
    return message


def encode_angular_value(angular_value):
    # Angular values are encoded as nano radians divided by pi.
    return int(angular_value / math.pi * 1e9)


def encode_linear_value(angular_value):
    # Linear values are encoded as micro meters.
    return int(angular_value * 1e6)


def get_message_sequence_number():
    global _tx_sequence_number
    if _tx_sequence_number >= 0xff:
        _tx_sequence_number = 0x00
    else:
        _tx_sequence_number += 1
    return _tx_sequence_number


if __name__ == "__main__":
    main()