|
|
|
@ -121,6 +121,36 @@ class SocketCANTest(unittest.TestCase): |
|
|
|
interface = 'vcan0' |
|
|
|
bufsize = 128 |
|
|
|
|
|
|
|
"""The CAN frame structure is defined in <linux/can.h>: |
|
|
|
|
|
|
|
struct can_frame { |
|
|
|
canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ |
|
|
|
__u8 can_dlc; /* data length code: 0 .. 8 */ |
|
|
|
__u8 data[8] __attribute__((aligned(8))); |
|
|
|
}; |
|
|
|
""" |
|
|
|
can_frame_fmt = "=IB3x8s" |
|
|
|
can_frame_size = struct.calcsize(can_frame_fmt) |
|
|
|
|
|
|
|
"""The Broadcast Management Command frame structure is defined |
|
|
|
in <linux/can/bcm.h>: |
|
|
|
|
|
|
|
struct bcm_msg_head { |
|
|
|
__u32 opcode; |
|
|
|
__u32 flags; |
|
|
|
__u32 count; |
|
|
|
struct timeval ival1, ival2; |
|
|
|
canid_t can_id; |
|
|
|
__u32 nframes; |
|
|
|
struct can_frame frames[0]; |
|
|
|
} |
|
|
|
|
|
|
|
`bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see |
|
|
|
`struct can_frame` definition). Must use native not standard types for packing. |
|
|
|
""" |
|
|
|
bcm_cmd_msg_fmt = "@3I4l2I" |
|
|
|
bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) |
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) |
|
|
|
self.addCleanup(self.s.close) |
|
|
|
@ -1291,10 +1321,35 @@ class BasicCANTest(unittest.TestCase): |
|
|
|
socket.PF_CAN |
|
|
|
socket.CAN_RAW |
|
|
|
|
|
|
|
@unittest.skipUnless(hasattr(socket, "CAN_BCM"), |
|
|
|
'socket.CAN_BCM required for this test.') |
|
|
|
def testBCMConstants(self): |
|
|
|
socket.CAN_BCM |
|
|
|
|
|
|
|
# opcodes |
|
|
|
socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task |
|
|
|
socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task |
|
|
|
socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task |
|
|
|
socket.CAN_BCM_TX_SEND # send one CAN frame |
|
|
|
socket.CAN_BCM_RX_SETUP # create RX content filter subscription |
|
|
|
socket.CAN_BCM_RX_DELETE # remove RX content filter subscription |
|
|
|
socket.CAN_BCM_RX_READ # read properties of RX content filter subscription |
|
|
|
socket.CAN_BCM_TX_STATUS # reply to TX_READ request |
|
|
|
socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) |
|
|
|
socket.CAN_BCM_RX_STATUS # reply to RX_READ request |
|
|
|
socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent |
|
|
|
socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) |
|
|
|
|
|
|
|
def testCreateSocket(self): |
|
|
|
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: |
|
|
|
pass |
|
|
|
|
|
|
|
@unittest.skipUnless(hasattr(socket, "CAN_BCM"), |
|
|
|
'socket.CAN_BCM required for this test.') |
|
|
|
def testCreateBCMSocket(self): |
|
|
|
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: |
|
|
|
pass |
|
|
|
|
|
|
|
def testBindAny(self): |
|
|
|
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: |
|
|
|
s.bind(('', )) |
|
|
|
@ -1327,19 +1382,8 @@ class BasicCANTest(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
|
|
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') |
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.') |
|
|
|
class CANTest(ThreadedCANSocketTest): |
|
|
|
|
|
|
|
"""The CAN frame structure is defined in <linux/can.h>: |
|
|
|
|
|
|
|
struct can_frame { |
|
|
|
canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ |
|
|
|
__u8 can_dlc; /* data length code: 0 .. 8 */ |
|
|
|
__u8 data[8] __attribute__((aligned(8))); |
|
|
|
}; |
|
|
|
""" |
|
|
|
can_frame_fmt = "=IB3x8s" |
|
|
|
|
|
|
|
def __init__(self, methodName='runTest'): |
|
|
|
ThreadedCANSocketTest.__init__(self, methodName=methodName) |
|
|
|
|
|
|
|
@ -1388,6 +1432,46 @@ class CANTest(ThreadedCANSocketTest): |
|
|
|
self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') |
|
|
|
self.cli.send(self.cf2) |
|
|
|
|
|
|
|
@unittest.skipUnless(hasattr(socket, "CAN_BCM"), |
|
|
|
'socket.CAN_BCM required for this test.') |
|
|
|
def _testBCM(self): |
|
|
|
cf, addr = self.cli.recvfrom(self.bufsize) |
|
|
|
self.assertEqual(self.cf, cf) |
|
|
|
can_id, can_dlc, data = self.dissect_can_frame(cf) |
|
|
|
self.assertEqual(self.can_id, can_id) |
|
|
|
self.assertEqual(self.data, data) |
|
|
|
|
|
|
|
@unittest.skipUnless(hasattr(socket, "CAN_BCM"), |
|
|
|
'socket.CAN_BCM required for this test.') |
|
|
|
def testBCM(self): |
|
|
|
bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) |
|
|
|
self.addCleanup(bcm.close) |
|
|
|
bcm.connect((self.interface,)) |
|
|
|
self.can_id = 0x123 |
|
|
|
self.data = bytes([0xc0, 0xff, 0xee]) |
|
|
|
self.cf = self.build_can_frame(self.can_id, self.data) |
|
|
|
opcode = socket.CAN_BCM_TX_SEND |
|
|
|
flags = 0 |
|
|
|
count = 0 |
|
|
|
ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 |
|
|
|
bcm_can_id = 0x0222 |
|
|
|
nframes = 1 |
|
|
|
assert len(self.cf) == 16 |
|
|
|
header = struct.pack(self.bcm_cmd_msg_fmt, |
|
|
|
opcode, |
|
|
|
flags, |
|
|
|
count, |
|
|
|
ival1_seconds, |
|
|
|
ival1_usec, |
|
|
|
ival2_seconds, |
|
|
|
ival2_usec, |
|
|
|
bcm_can_id, |
|
|
|
nframes, |
|
|
|
) |
|
|
|
header_plus_frame = header + self.cf |
|
|
|
bytes_sent = bcm.send(header_plus_frame) |
|
|
|
self.assertEqual(bytes_sent, len(header_plus_frame)) |
|
|
|
|
|
|
|
|
|
|
|
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') |
|
|
|
class BasicRDSTest(unittest.TestCase): |
|
|
|
|