Keyboard firmwares for Atmel AVR and Cortex-M
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

udp_link_layer_auto.py 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. """
  2. mbed SDK
  3. Copyright (c) 2011-2013 ARM Limited
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. """
  14. """
  15. How to use:
  16. make.py -m LPC1768 -t ARM -d E:\ -n NET_14
  17. udp_link_layer_auto.py -p COM20 -d E:\ -t 10
  18. """
  19. import re
  20. import uuid
  21. import socket
  22. import thread
  23. from sys import stdout
  24. from time import time, sleep
  25. from host_test import DefaultTest
  26. from SocketServer import BaseRequestHandler, UDPServer
  # Module-level bookkeeping shared between the receiver thread and the test.
  # Datagrams echoed back by the device, keyed by payload text; the value is
  # the host time() at which the datagram was received.
  dict_udp_recv_datagrams = {}

  # Datagrams sent to the device, keyed by payload text; the value is the
  # host time() at which the datagram was sent.
  dict_udp_sent_datagrams = {}
  class UDPEchoClient_Handler(BaseRequestHandler):
      """Request handler that records every datagram delivered to the server."""

      def handle(self):
          """Record one received datagram together with its arrival time.

          self.request is unpacked as a (data, socket) pair; only the data
          part is used.
          """
          datagram, _unused_socket = self.request
          # repr() escapes non-printable bytes; [1:-1] strips the surrounding
          # quote characters so the key is the bare payload text.
          key = repr(datagram)[1:-1]
          dict_udp_recv_datagrams[key] = time()
  def udp_packet_recv(threadName, server_ip, server_port):
      """Run a UDP server that collects the datagram stream from the mbed device.

      Binds a UDPServer to (server_ip, server_port) with UDPEchoClient_Handler
      as its request handler and serves until the process ends; this call does
      not return under normal operation.

      threadName is accepted for the caller's convenience and is not used here.
      """
      listen_address = (server_ip, server_port)
      receiver = UDPServer(listen_address, UDPEchoClient_Handler)
      print("[UDP_COUNTER] Listening for connections... %s:%d"% listen_address)
      receiver.serve_forever()
  class UDPEchoServerTest(DefaultTest):
      """Host-side UDP link-layer test.

      Reads the server address announced by the device over serial, bursts
      TEST_PACKET_COUNT datagrams at it, counts the datagrams received back by
      a background UDP server and compares the received percentage against
      PACKET_SATURATION_RATIO.

      NOTE(review): self.mbed, self.notify and the RESULT_* constants are not
      defined in this file; presumably they come from DefaultTest.
      """
      ECHO_SERVER_ADDRESS = ""        # UDP IP of datagram bursts
      ECHO_PORT = 0                   # UDP port for datagram bursts
      CONTROL_PORT = 23               # TCP port used to get stats from mbed device, e.g. counters
      s = None                        # Socket
      TEST_PACKET_COUNT = 1000        # how many packets should be sent
      TEST_STRESS_FACTOR = 0.001      # delay between datagrams in seconds (1 ms)
      PACKET_SATURATION_RATIO = 29.9  # Acceptable packet transmission in %

      # Raw string with escaped dots: an unescaped '.' would match any character.
      PATTERN_SERVER_IP = r"Server IP Address is (\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)"
      re_detect_server_ip = re.compile(PATTERN_SERVER_IP)

      def get_control_data(self, command="stat\n"):
          """Send a command to the device's TCP control port and return the reply.

          Args:
              command: text sent to the control port.

          Returns:
              Up to 256 bytes of the reply, or None when the socket could not
              be created or the exchange failed.
          """
          BUFFER_SIZE = 256
          try:
              s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          except socket.error as e:
              self.notify("HOST: Error: %s"% e)
              return None
          try:
              s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
              s.send(command)
              return s.recv(BUFFER_SIZE)
          except socket.error as e:
              # Control data is best-effort: report the failure and carry on
              # instead of using a socket that never connected.
              self.notify("HOST: Error: %s"% e)
              return None
          finally:
              s.close()

      def test(self):
          """Run the datagram burst and evaluate how many datagrams came back.

          Returns:
              self.RESULT_IO_SERIAL when nothing could be read from serial,
              self.RESULT_ERROR when the UDP client socket cannot be created,
              otherwise self.RESULT_SUCCESS or self.RESULT_FAILURE depending
              on whether every summary sample reached PACKET_SATURATION_RATIO.
          """
          serial_ip_msg = self.mbed.serial_readline()
          if serial_ip_msg is None:
              return self.RESULT_IO_SERIAL
          stdout.write(serial_ip_msg)
          stdout.flush()

          # Searching for IP address and port prompted by server
          m = self.re_detect_server_ip.search(serial_ip_msg)
          if m and len(m.groups()):
              octets_and_port = m.groups()
              self.ECHO_SERVER_ADDRESS = ".".join(octets_and_port[:4])
              self.ECHO_PORT = int(octets_and_port[4])  # must be integer for socket.connect method
              self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))

          # Open client socket to burst datagrams to UDP server in mbed
          try:
              self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
          except Exception as e:
              self.s = None
              self.notify("HOST: Error: %s"% e)
              return self.RESULT_ERROR

          # UDP replied receiver works in background to get echoed datagrams
          SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
          SERVER_PORT = self.ECHO_PORT + 1
          thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
          sleep(0.5)  # give the receiver thread time to start listening

          # Burst part; the socket is closed even when a send fails
          try:
              for no in range(self.TEST_PACKET_COUNT):
                  TEST_STRING = str(uuid.uuid4())
                  payload = str(no) + "__" + TEST_STRING
                  self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
                  dict_udp_sent_datagrams[payload] = time()
                  sleep(self.TEST_STRESS_FACTOR)
          finally:
              self.s.close()

          # Wait 5 seconds for packets to come
          result = True
          self.notify("HOST: Test Summary:")
          for d in range(5):
              sleep(1.0)
              # Read the count once so the percentage and the printed count
              # agree while the receiver thread keeps updating the dictionary.
              received = len(dict_udp_recv_datagrams)
              summary_datagram_success = (float(received) / float(self.TEST_PACKET_COUNT)) * 100.0
              # TEST_STRESS_FACTOR is in seconds; convert to match the "ms" label.
              self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (
                  d,
                  summary_datagram_success,
                  received,
                  self.TEST_PACKET_COUNT,
                  self.TEST_STRESS_FACTOR * 1000.0))
              # Every sample, including the first, must reach the threshold.
              result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
              stdout.flush()

          # Getting control data from test
          self.notify("...")
          self.notify("HOST: Mbed Summary:")
          mbed_stats = self.get_control_data()
          if mbed_stats is not None:
              self.notify(mbed_stats)
          return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
  if __name__ == '__main__':
      # Script entry point: build the host test and hand control to run().
      echo_server_test = UDPEchoServerTest()
      echo_server_test.run()