-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathi2c_responder_test.py
141 lines (115 loc) · 5.37 KB
/
i2c_responder_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""I2CResponder Test Application.
NOTE: This module uses I2C Controller/Responder nomenclature per
https://www.eetimes.com/its-time-for-ieee-to-retire-master-slave/
This test application runs on a single Raspberry Pico to exercise the API of the
I2CResponder() class.
To execute this test application you will need to wire I2C0 and I2C1 together as follows:
+---- 3.3V -------------------------------------------------------------------+
| |
| +======================= Pico ====================+ |
| 1K Ohm I I |
+--/\/\/\/----O--------I Pin 1 (GP0, I2C0 SDA) PIN 40 I |
| | I I |
| | +---I Pin 2 (GP1, I2C0 SCL) PIN 39 I |
| | | I I |
| | | I Pin 2 PIN 38 I |
| | | I I |
| |--------I Pin 4 (GP2, I2C1 SDA) PIN 37 I |
| 1K Ohm | I I |
+--/\/\/\/---------O---I Pin 5 (GP3, I2C1 SCL) (3V3(OUT)) PIN 36 I----+
I I
I I
"""
# Standard Library
from machine import Pin, I2C
import time
import _thread
# Local
from i2c_responder import I2CResponder
I2C_FREQUENCY = 100000
CONTROLLER_I2C_DEVICE_ID = 1
GPIO_CONTROLLER_SDA = 2
GPIO_CONTROLLER_SCL = 3
RESPONDER_I2C_DEVICE_ID = 0
RESPONDER_ADDRESS = 0x41
GPIO_RESPONDER_SDA = 0
GPIO_RESPONDER_SCL = 1
READBUFFER = [0, 0]
def main():
# -----------------
# Initialize Responder and Controller
# -----------------
i2c_responder = I2CResponder(
RESPONDER_I2C_DEVICE_ID, sda_gpio=GPIO_RESPONDER_SDA, scl_gpio=GPIO_RESPONDER_SCL, responder_address=RESPONDER_ADDRESS
)
i2c_controller = I2C(
CONTROLLER_I2C_DEVICE_ID,
scl=Pin(GPIO_CONTROLLER_SCL),
sda=Pin(GPIO_CONTROLLER_SDA),
freq=I2C_FREQUENCY,
)
print('Testing I2CResponder v' + i2c_responder.VERSION)
# -----------------
# Demonstrate that the Responder is responding at its assigned I2C address.
# -----------------
print('Scanning I2C Bus for Responders...')
responder_addresses = i2c_controller.scan()
print('I2C Addresses of Responders found: ' + format_hex(responder_addresses))
print()
# -----------------
# Demonstrate I2C WRITE
# -----------------
buffer_out = bytearray([0x01, 0x02])
print('Controller: Issuing I2C WRITE with data: ' + format_hex(buffer_out))
i2c_controller.writeto(RESPONDER_ADDRESS, buffer_out)
time.sleep(0.25)
print(' Responder: Getting I2C WRITE data...')
buffer_in = i2c_responder.get_write_data(max_size=len(buffer_out))
print(' Responder: Received I2C WRITE data: ' + format_hex(buffer_in))
print()
# -----------------
# Demonstrate I2C READ
# -----------------
# NOTE: We want the Controller to initiate an I2C READ, but the Responder implementation
# is polled. As soon as we execute i2c_controller.readfrom() we will block
# until the I2C bus supplies the requested data. But we need to have executional
# control so that we can poll i2c_responder.read_is_pending() and then supply the
# requested data. To circumvent the deadlock, we will briefly launch a thread on the
# second Pico core, and THAT thread will execute the .readfrom(). That thread will block
# while this thread polls, then supplies the requested data.
# -----------------
thread_lock = _thread.allocate_lock()
_thread.start_new_thread(thread_i2c_controller_read, (i2c_controller, thread_lock,))
buffer_out = bytearray([0x09, 0x08])
for value in buffer_out:
# We will loop here (polling) until the Controller (running on its own thread) issues an
# I2C READ.
while not i2c_responder.read_is_pending():
pass
i2c_responder.put_read_data(value)
with thread_lock:
print(' Responder: Transmitted I2C READ data: ' + format_hex(value))
time.sleep(1)
print('Controller: Received I2C READ data: ' + format_hex(READBUFFER))
def thread_i2c_controller_read(i2c_controller, thread_lock):
"""Issue an I2C READ on the Controller."""
with thread_lock:
print('Controller: Initiating I2C READ...')
# NOTE: This operation will BLOCK until the Responder supplies the requested
# data, which is why we are running it on a second thread (on the second
# Pico core).
data = i2c_controller.readfrom(RESPONDER_ADDRESS, 2)
for i, value in enumerate(data):
READBUFFER[i] = value
def format_hex(_object):
"""Format a value or list of values as 2 digit hex."""
try:
values_hex = [to_hex(value) for value in _object]
return '[{}]'.format(', '.join(values_hex))
except TypeError:
# The object is a single value
return to_hex(_object)
def to_hex(value):
return '0x{:02X}'.format(value)
if __name__ == "__main__":
main()