123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- """
- mbed SDK
- Copyright (c) 2011-2013 ARM Limited
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
-
- # Check if 'serial' module is installed
- try:
- from serial import Serial
- except ImportError, e:
- print "Error: Can't import 'serial' module: %s"% e
- exit(-1)
-
- import os
- import re
- import types
- from sys import stdout
- from time import sleep, time
- from optparse import OptionParser
-
- import host_tests_plugins
-
- # This is a little tricky. We need to add upper directory to path so
- # we can find packages we want from the same level as other files do
- import sys
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
-
-
- class Mbed:
- """ Base class for a host driven test
- """
- def __init__(self):
- parser = OptionParser()
-
- parser.add_option("-m", "--micro",
- dest="micro",
- help="The target microcontroller",
- metavar="MICRO")
-
- parser.add_option("-p", "--port",
- dest="port",
- help="The serial port of the target mbed",
- metavar="PORT")
-
- parser.add_option("-d", "--disk",
- dest="disk",
- help="The target disk path",
- metavar="DISK_PATH")
-
- parser.add_option("-f", "--image-path",
- dest="image_path",
- help="Path with target's image",
- metavar="IMAGE_PATH")
-
- parser.add_option("-c", "--copy",
- dest="copy_method",
- help="Copy method selector",
- metavar="COPY_METHOD")
-
- parser.add_option("-C", "--program_cycle_s",
- dest="program_cycle_s",
- help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target",
- type="float",
- metavar="COPY_METHOD")
-
- parser.add_option("-t", "--timeout",
- dest="timeout",
- help="Timeout",
- metavar="TIMEOUT")
-
- parser.add_option("-r", "--reset",
- dest="forced_reset_type",
- help="Forces different type of reset")
-
- parser.add_option("-R", "--reset-timeout",
- dest="forced_reset_timeout",
- metavar="NUMBER",
- type="int",
- help="When forcing a reset using option -r you can set up after reset timeout in seconds")
-
- (self.options, _) = parser.parse_args()
-
- self.DEFAULT_RESET_TOUT = 0
- self.DEFAULT_TOUT = 10
-
- if self.options.port is None:
- raise Exception("The serial port of the target mbed have to be provided as command line arguments")
-
- # Options related to copy / reset mbed device
- self.port = self.options.port
- self.disk = self.options.disk
- self.image_path = self.options.image_path.strip('"')
- self.copy_method = self.options.copy_method
- self.program_cycle_s = float(self.options.program_cycle_s)
-
- self.serial = None
- self.serial_baud = 9600
- self.serial_timeout = 1
-
- self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
- print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk)
-
- def init_serial_params(self, serial_baud=9600, serial_timeout=1):
- """ Initialize port parameters.
- This parameters will be used by self.init_serial() function to open serial port
- """
- self.serial_baud = serial_baud
- self.serial_timeout = serial_timeout
-
- def init_serial(self, serial_baud=None, serial_timeout=None):
- """ Initialize serial port.
- Function will return error is port can't be opened or initialized
- """
- # Overload serial port configuration from default to parameters' values if they are specified
- serial_baud = serial_baud if serial_baud is not None else self.serial_baud
- serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
-
- # Clear serial port
- if self.serial:
- self.serial.close()
- self.serial = None
-
- # We will pool for serial to be re-mounted if it was unmounted after device reset
- result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking
-
- # Port can be opened
- if result:
- self.flush()
- return result
-
- def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
- """ Functions pools for serial port readiness
- """
- result = True
- last_error = None
- # This loop is used to check for serial port availability due to
- # some delays and remounting when devices are being flashed with new software.
- for i in range(pooling_loops):
- sleep(loop_delay if i else init_delay)
- try:
- self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
- except Exception as e:
- result = False
- last_error = "MBED: %s"% str(e)
- stdout.write('.')
- stdout.flush()
- else:
- print "...port ready!"
- result = True
- break
- if not result and last_error:
- print last_error
- return result
-
- def set_serial_timeout(self, timeout):
- """ Wraps self.mbed.serial object timeout property
- """
- result = None
- if self.serial:
- self.serial.timeout = timeout
- result = True
- return result
-
- def serial_read(self, count=1):
- """ Wraps self.mbed.serial object read method
- """
- result = None
- if self.serial:
- try:
- result = self.serial.read(count)
- except:
- result = None
- return result
-
- def serial_readline(self, timeout=5):
- """ Wraps self.mbed.serial object read method to read one line from serial port
- """
- result = ''
- start = time()
- while (time() - start) < timeout:
- if self.serial:
- try:
- c = self.serial.read(1)
- result += c
- except Exception as e:
- print "MBED: %s"% str(e)
- result = None
- break
- if c == '\n':
- break
- return result
-
- def serial_write(self, write_buffer):
- """ Wraps self.mbed.serial object write method
- """
- result = None
- if self.serial:
- try:
- result = self.serial.write(write_buffer)
- except:
- result = None
- return result
-
- def reset_timeout(self, timeout):
- """ Timeout executed just after reset command is issued
- """
- for n in range(0, timeout):
- sleep(1)
-
- def reset(self):
- """ Calls proper reset plugin to do the job.
- Please refer to host_test_plugins functionality
- """
- # Flush serials to get only input after reset
- self.flush()
- if self.options.forced_reset_type:
- result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
- else:
- result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
- # Give time to wait for the image loading
- reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
- self.reset_timeout(reset_tout_s)
- return result
-
- def copy_image(self, image_path=None, disk=None, copy_method=None):
- """ Closure for copy_image_raw() method.
- Method which is actually copying image to mbed
- """
- # Set closure environment
- image_path = image_path if image_path is not None else self.image_path
- disk = disk if disk is not None else self.disk
- copy_method = copy_method if copy_method is not None else self.copy_method
- # Call proper copy method
- result = self.copy_image_raw(image_path, disk, copy_method)
- sleep(self.program_cycle_s)
- return result
-
- def copy_image_raw(self, image_path=None, disk=None, copy_method=None):
- """ Copy file depending on method you want to use. Handles exception
- and return code from shell copy commands.
- """
- if copy_method is not None:
- # image_path - Where is binary with target's firmware
- result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
- else:
- copy_method = 'default'
- result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
- return result;
-
- def flush(self):
- """ Flush serial ports
- """
- result = False
- if self.serial:
- self.serial.flushInput()
- self.serial.flushOutput()
- result = True
- return result
-
-
- class HostTestResults:
- """ Test results set by host tests
- """
- def __init__(self):
- self.RESULT_SUCCESS = 'success'
- self.RESULT_FAILURE = 'failure'
- self.RESULT_ERROR = 'error'
- self.RESULT_IO_SERIAL = 'ioerr_serial'
- self.RESULT_NO_IMAGE = 'no_image'
- self.RESULT_IOERR_COPY = "ioerr_copy"
- self.RESULT_PASSIVE = "passive"
- self.RESULT_NOT_DETECTED = "not_detected"
- self.RESULT_MBED_ASSERT = "mbed_assert"
-
-
- import workspace_tools.host_tests as host_tests
-
-
- class Test(HostTestResults):
- """ Base class for host test's test runner
- """
- # Select default host_test supervision (replaced after autodetection)
- test_supervisor = host_tests.get_host_test("default")
-
- def __init__(self):
- self.mbed = Mbed()
-
- def detect_test_config(self, verbose=False):
- """ Detects test case configuration
- """
- result = {}
- while True:
- line = self.mbed.serial_readline()
- if "{start}" in line:
- self.notify("HOST: Start test...")
- break
- else:
- # Detect if this is property from TEST_ENV print
- m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1])
- if m and len(m.groups()) == 2:
- # This is most likely auto-detection property
- result[m.group(1)] = m.group(2)
- if verbose:
- self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
- else:
- # We can check if this is TArget Id in mbed specific format
- m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1])
- if m2 and len(m2.groups()) == 2:
- if verbose:
- target_id = m2.group(1) + m2.group(2)
- self.notify("HOST: TargetID '%s'"% target_id)
- self.notify(line[len(target_id):-1])
- else:
- self.notify("HOST: Unknown property: %s"% line.strip())
- return result
-
- def run(self):
- """ Test runner for host test. This function will start executing
- test and forward test result via serial port to test suite
- """
- # Copy image to device
- self.notify("HOST: Copy image onto target...")
- result = self.mbed.copy_image()
- if not result:
- self.print_result(self.RESULT_IOERR_COPY)
-
- # Initialize and open target's serial port (console)
- self.notify("HOST: Initialize serial port...")
- result = self.mbed.init_serial()
- if not result:
- self.print_result(self.RESULT_IO_SERIAL)
-
- # Reset device
- self.notify("HOST: Reset target...")
- result = self.mbed.reset()
- if not result:
- self.print_result(self.RESULT_IO_SERIAL)
-
- # Run test
- try:
- CONFIG = self.detect_test_config(verbose=True) # print CONFIG
-
- if "host_test_name" in CONFIG:
- if host_tests.is_host_test(CONFIG["host_test_name"]):
- self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
- result = self.test_supervisor.test(self) #result = self.test()
-
- if result is not None:
- self.print_result(result)
- else:
- self.notify("HOST: Passive mode...")
- except Exception, e:
- print str(e)
- self.print_result(self.RESULT_ERROR)
-
- def setup(self):
- """ Setup and check if configuration for test is
- correct. E.g. if serial port can be opened.
- """
- result = True
- if not self.mbed.serial:
- result = False
- self.print_result(self.RESULT_IO_SERIAL)
- return result
-
- def notify(self, message):
- """ On screen notification function
- """
- print message
- stdout.flush()
-
- def print_result(self, result):
- """ Test result unified printing function
- """
- self.notify("\r\n{{%s}}\r\n{{end}}" % result)
-
-
- class DefaultTestSelector(Test):
- """ Test class with serial port initialization
- """
- def __init__(self):
- HostTestResults.__init__(self)
- Test.__init__(self)
-
- if __name__ == '__main__':
- DefaultTestSelector().run()
|