From e0d404e122202c25c85dcebedcbd567837068b65 Mon Sep 17 00:00:00 2001 From: Python3pkg Date: Mon, 22 May 2017 10:29:25 -0700 Subject: [PATCH] Convert to Python3 --- pymodel/FSM.py | 206 +++---- pymodel/ModelProgram.py | 389 ++++++------- pymodel/ProductModelProgram.py | 559 ++++++++++--------- pymodel/TestSuite.py | 235 ++++---- pymodel/pma.py | 10 +- pymodel/pmg.py | 4 +- pymodel/pmt.py | 18 +- pymodel/pmv.py | 6 +- pymodel/trun.py | 4 +- pymodel/wsgirunner.py | 4 +- samples/Socket/stepper.py | 2 +- samples/Socket/stepper_a.py | 2 +- samples/Socket/stepper_d.py | 2 +- samples/Socket/stepper_o.py | 2 +- samples/Socket/stepper_util.py | 8 +- samples/Timeout/Stepper.py | 2 +- samples/WebApplication/Session.py | 160 +++--- samples/WebApplication/Stepper.py | 284 +++++----- samples/WebApplication/WSGIVerboseConfig.py | 2 +- samples/WebApplication/webapp.py | 358 ++++++------ samples/tracemultiplexer/tracemultiplexer.py | 410 +++++++------- 21 files changed, 1335 insertions(+), 1332 deletions(-) diff --git a/pymodel/FSM.py b/pymodel/FSM.py index 2fcef4a..2c04987 100644 --- a/pymodel/FSM.py +++ b/pymodel/FSM.py @@ -1,103 +1,103 @@ -""" -Interface to an FSM module (graph) used by ProductModelProgram -""" - -from model import Model - -class FSM(Model): - - def __init__(self, module, exclude, include): - Model.__init__(self, module, exclude, include) - - def post_init(self): - """ - Now that all modules have been imported and executed their __init__ - do a postprocessing pass - to process metadata that might be affected by configuration modules - """ - # Do all of this work here rather than in __init__ - # so it can include the effects of any pymodel config modules - - # Make copies of collections that may be altered later - # self.actions is not used in this module outside this __init__ - # BUT it is used in several places in Model and ProductModelProgram - # EnabledTransitions below works directly on self.module.graph, - # not self.actions - if not hasattr(self.module, 'actions'): - self.actions = list(self.actions_in_graph()) # default, make copy - else: - self.actions = list(self.module.actions) # copy - Model.post_init(self) # uses self.actions - # Construct self.graph like self.module.graph - # BUT also accounts for include, exclude via self.actions - self.graph = [ (current, (action,args,result), next) - for (current, (action,args,result), next) in - self.module.graph if action in self.actions ] - # prepare for first run - self.current = self.module.initial # raise exception if module is not FSM - - - def actions_in_graph(self): - return tuple(set([ action for (current, (action,args,result), next) in - self.module.graph])) # not self.graph, here ONLY - - def make_properties(self, state): - return { 'accepting': state in self.module.accepting, 'statefilter': True, - 'stateinvariant': True } - - def Properties(self): - return self.make_properties(self.current) - - def Reset(self): # needed by stepper - self.current = self.module.initial - - def CleanupGraph(self, cleanup=False): - """ - if cleanup, return the graph with just the cleanup transitions - """ - if cleanup: - graph = [(current,(action,args,result),next) - for (current,(action,args,result),next) in self.graph - if action in self.module.cleanup] - else: - graph = self.graph - return graph - - def ActionEnabled(self, a, args): - """ - action a with args is enabled in the current state - """ - # no cleanup check here - # any args matches empty arguments in FSM - return any([(a == action and (not arguments or args == arguments)) - for (current,(action, arguments, result),next) - in self.graph - if current == self.current ]) - - def EnabledTransitions(self, cleanup=False): - """ - Return list tuples for all enabled actions: - (action, args, next state, properties) - """ - graph = self.CleanupGraph(cleanup) - return [(action, args, result, next, self.make_properties(next)) - for (current,(action,args,result),next) in graph - if current == self.current ] - - def DoAction(self, a, arguments): - ts = [(current,(action,args,result),next) - for (current,(action,args,result),next) in self.graph - if current == self.current and action == a - and args == arguments[0:len(args)]] #ignore extra trailing args - # print 'List ts %s' % ts # DEBUG - # Might be nondet. FSM: for now, just pick first transition that matches - current, (action,args,result), self.current = ts[0] - return result - - def Current(self): - return self.current - - def Restore(self, state): - self.current = state - - # GetNext not needed +""" +Interface to an FSM module (graph) used by ProductModelProgram +""" + +from .model import Model + +class FSM(Model): + + def __init__(self, module, exclude, include): + Model.__init__(self, module, exclude, include) + + def post_init(self): + """ + Now that all modules have been imported and executed their __init__ + do a postprocessing pass + to process metadata that might be affected by configuration modules + """ + # Do all of this work here rather than in __init__ + # so it can include the effects of any pymodel config modules + + # Make copies of collections that may be altered later + # self.actions is not used in this module outside this __init__ + # BUT it is used in several places in Model and ProductModelProgram + # EnabledTransitions below works directly on self.module.graph, + # not self.actions + if not hasattr(self.module, 'actions'): + self.actions = list(self.actions_in_graph()) # default, make copy + else: + self.actions = list(self.module.actions) # copy + Model.post_init(self) # uses self.actions + # Construct self.graph like self.module.graph + # BUT also accounts for include, exclude via self.actions + self.graph = [ (current, (action,args,result), next) + for (current, (action,args,result), next) in + self.module.graph if action in self.actions ] + # prepare for first run + self.current = self.module.initial # raise exception if module is not FSM + + + def actions_in_graph(self): + return tuple(set([ action for (current, (action,args,result), next) in + self.module.graph])) # not self.graph, here ONLY + + def make_properties(self, state): + return { 'accepting': state in self.module.accepting, 'statefilter': True, + 'stateinvariant': True } + + def Properties(self): + return self.make_properties(self.current) + + def Reset(self): # needed by stepper + self.current = self.module.initial + + def CleanupGraph(self, cleanup=False): + """ + if cleanup, return the graph with just the cleanup transitions + """ + if cleanup: + graph = [(current,(action,args,result),next) + for (current,(action,args,result),next) in self.graph + if action in self.module.cleanup] + else: + graph = self.graph + return graph + + def ActionEnabled(self, a, args): + """ + action a with args is enabled in the current state + """ + # no cleanup check here + # any args matches empty arguments in FSM + return any([(a == action and (not arguments or args == arguments)) + for (current,(action, arguments, result),next) + in self.graph + if current == self.current ]) + + def EnabledTransitions(self, cleanup=False): + """ + Return list tuples for all enabled actions: + (action, args, next state, properties) + """ + graph = self.CleanupGraph(cleanup) + return [(action, args, result, next, self.make_properties(next)) + for (current,(action,args,result),next) in graph + if current == self.current ] + + def DoAction(self, a, arguments): + ts = [(current,(action,args,result),next) + for (current,(action,args,result),next) in self.graph + if current == self.current and action == a + and args == arguments[0:len(args)]] #ignore extra trailing args + # print 'List ts %s' % ts # DEBUG + # Might be nondet. FSM: for now, just pick first transition that matches + current, (action,args,result), self.current = ts[0] + return result + + def Current(self): + return self.current + + def Restore(self, state): + self.current = state + + # GetNext not needed diff --git a/pymodel/ModelProgram.py b/pymodel/ModelProgram.py index 64c0e16..9fce372 100644 --- a/pymodel/ModelProgram.py +++ b/pymodel/ModelProgram.py @@ -1,194 +1,195 @@ -""" -Interface to a model program (python module) used by ProductModelProgram -""" - -# Actions a are still function objects (not aname strings) here -# because composition happens in ProductModelProgram, a level above. - -import sys -import copy -import inspect -import itertools -from model import Model - -class ModelProgram(Model): - - def __init__(self, module, exclude, include): - Model.__init__(self, module, exclude, include) - - def post_init(self): - """ - Now that all modules have been imported and executed their __init__ - do a postprocessing pass - to process metadata that might be affected by configuration modules - """ - # Do all of this work here rather than in __init__ - # so it can include the effects of any pymodel config modules - - # recognize PEP-8 style names (all lowercase) if present - if hasattr(self.module, 'accepting'): - self.module.Accepting = self.module.accepting - if hasattr(self.module, 'statefilter'): - self.module.StateFilter = self.module.statefilter - if hasattr(self.module, 'state_filter'): - self.module.StateFilter = self.module.state_filter - if hasattr(self.module, 'stateinvariant'): - self.module.StateInvariant = self.module.stateinvariant - if hasattr(self.module, 'state_invariant'): - self.module.StateInvariant = self.module.state_invariant - if hasattr(self.module, 'reset'): - self.module.Reset = self.module.reset - - # assign defaults to optional attributes - # cleanup and observables are handled in Models base class - if not hasattr(self.module, 'enablers'): - self.module.enablers = dict() # all actions always enabled - if not hasattr(self.module, 'domains'): - self.module.domains = dict() # empty domains - if not hasattr(self.module, 'combinations'): - self.module.combinations = dict() # 'all', Cartesian product - if not hasattr(self.module, 'Accepting'): - self.module.Accepting = self.TrueDefault - if not hasattr(self.module, 'StateFilter'): - self.module.StateFilter = self.TrueDefault - if not hasattr(self.module, 'StateInvariant'): - self.module.StateInvariant = self.TrueDefault - - # Make copies of collections that may be altered by post_init - self.actions = list(self.module.actions) - Model.post_init(self) # uses self.actions - - - def make_argslist(self, a): - """ - Parameter generator: return list of all args combos for action symbol a - """ - arginfo = inspect.getargspec(a)# ArgInfo(args,varargs,keywords,locals) - if arginfo[0]: - args = arginfo[0] # usual case: fixed sequence of positional arguments - elif arginfo[1]: - args = [arginfo[1]] # special case: either no arg, or one exception arg - else: - args = () # no arguments anywhere, args must have this value - domains = [ self.module.domains[a][arg]() # evaluate state-dependent domain - if callable(self.module.domains[a][arg]) - else self.module.domains[a][arg] # look up static domain - for arg in args if a in self.module.domains ] - combination = self.module.combinations.get(a, 'all') # default is 'all' - if combination == 'cases': # as many args as items in smallest domain - argslists = zip(*domains) - elif combination == 'all': # Cartesian product - argslists = itertools.product(*domains) - # might be nice to support 'pairwise' also - # return tuple not list, hashable so it can be key in dictionary - # also handle special case (None,) indicates empty argslist in domains - return tuple([() if x == (None,) else x for x in argslists ]) - - def ParamGen(self): - #print 'ModelProgram ParamGen for', self.module.__name__ #DEBUG - #print ' actions ', self.actions - #print ' domains ', self.module.domains - self.argslists = dict([(a, self.make_argslist(a)) - for a in self.actions - if not a in self.module.observables ]) - - def TrueDefault(self): - return True - - def Properties(self): - return { 'accepting': self.module.Accepting(), - 'statefilter': self.module.StateFilter(), - 'stateinvariant': self.module.StateInvariant() } - - def Reset(self): - try: - self.module.Reset() - except AttributeError: # Reset is optional, but there is no default - print 'No Reset function for model program %s' % self.module.__name__ - sys.exit() - - def ActionEnabled(self, a, args): - """ - action a with args is enabled in the current state - """ - if a not in self.module.enablers: - return True - else: - # Assumes enablers[a] has only one item, always true in this version - a_enabled = self.module.enablers[a][0] - nparams = len(inspect.getargspec(a_enabled)[0]) - nargs = len(args) - # nparams == 0 means match any args - if nparams > 0 and nparams != nargs: - print 'Error: %s needs %s arguments, got %s. Check parameter generator.' %\ - (a_enabled.__name__, nparams, nargs) - sys.exit(1) # Don't return, just exit with error status - else: - if nparams > 0: - return a_enabled(*args) - else: - return a_enabled() # nparams == 0 means match any args - - def Transitions(self, actions, argslists): - """ - Return tuple for all enabled transitions: - (action, args, result, next state, properties) - Pass appropriate params for observable or controllable actions + argslists - """ - enabled = list() - for a in actions: - enabled += [(a, args) + self.GetNext(a,args) # (a,args,result,next,prop's) - for args in argslists[a] if self.ActionEnabled(a, args) ] - return [(a,args,result,next,properties) - for (a,args,result,next,properties) in enabled - if properties['statefilter']] - - def EnabledTransitions(self, argslists, cleanup=False): - """ - Return tuple for all enabled observable and controllable transitions: - (action, args, result, next state, properties) - """ - actions = self.cleanup if cleanup else self.actions - controllableActions = set(actions) - set(self.module.observables) - observableActions = set(argslists.keys()) & set(self.module.observables) - if cleanup: - observableActions = set(observableActions) & set(self.cleanup) - enabled = list() - # Controllable actions use self.argslists assigned by ParamGen - enabled += self.Transitions(controllableActions, self.argslists) - # Observable actions use argslists parameter here - enabled += self.Transitions(observableActions, argslists) - return enabled - - def DoAction(self, a, args): - """ - Execute action in model, update state, - """ - return a(*args) - - def Current(self): - """ - Return current state, a dictionary of variable names to values - This is used to save/restore states, so make deep copies of values - """ - return dict([(vname, copy.deepcopy(getattr(self.module, vname))) - for vname in self.module.state ]) - - def Restore(self, state): - """ - Restore state - """ - self.module.__dict__.update(state) - - def GetNext(self, a, args): - """ - Return result and next state, given action and args. - Nondestructive, restores state. - also return dict. of properties of next state as third element in tuple - """ - saved = self.Current() - result = self.DoAction(a, args) - next = self.Current() - properties = self.Properties() # accepting state, etc. - self.Restore(saved) - return (result, next, properties) +""" +Interface to a model program (python module) used by ProductModelProgram +""" + +# Actions a are still function objects (not aname strings) here +# because composition happens in ProductModelProgram, a level above. + +import sys +import copy +import inspect +import itertools +from .model import Model +import collections + +class ModelProgram(Model): + + def __init__(self, module, exclude, include): + Model.__init__(self, module, exclude, include) + + def post_init(self): + """ + Now that all modules have been imported and executed their __init__ + do a postprocessing pass + to process metadata that might be affected by configuration modules + """ + # Do all of this work here rather than in __init__ + # so it can include the effects of any pymodel config modules + + # recognize PEP-8 style names (all lowercase) if present + if hasattr(self.module, 'accepting'): + self.module.Accepting = self.module.accepting + if hasattr(self.module, 'statefilter'): + self.module.StateFilter = self.module.statefilter + if hasattr(self.module, 'state_filter'): + self.module.StateFilter = self.module.state_filter + if hasattr(self.module, 'stateinvariant'): + self.module.StateInvariant = self.module.stateinvariant + if hasattr(self.module, 'state_invariant'): + self.module.StateInvariant = self.module.state_invariant + if hasattr(self.module, 'reset'): + self.module.Reset = self.module.reset + + # assign defaults to optional attributes + # cleanup and observables are handled in Models base class + if not hasattr(self.module, 'enablers'): + self.module.enablers = dict() # all actions always enabled + if not hasattr(self.module, 'domains'): + self.module.domains = dict() # empty domains + if not hasattr(self.module, 'combinations'): + self.module.combinations = dict() # 'all', Cartesian product + if not hasattr(self.module, 'Accepting'): + self.module.Accepting = self.TrueDefault + if not hasattr(self.module, 'StateFilter'): + self.module.StateFilter = self.TrueDefault + if not hasattr(self.module, 'StateInvariant'): + self.module.StateInvariant = self.TrueDefault + + # Make copies of collections that may be altered by post_init + self.actions = list(self.module.actions) + Model.post_init(self) # uses self.actions + + + def make_argslist(self, a): + """ + Parameter generator: return list of all args combos for action symbol a + """ + arginfo = inspect.getargspec(a)# ArgInfo(args,varargs,keywords,locals) + if arginfo[0]: + args = arginfo[0] # usual case: fixed sequence of positional arguments + elif arginfo[1]: + args = [arginfo[1]] # special case: either no arg, or one exception arg + else: + args = () # no arguments anywhere, args must have this value + domains = [ self.module.domains[a][arg]() # evaluate state-dependent domain + if isinstance(self.module.domains[a][arg], collections.Callable) + else self.module.domains[a][arg] # look up static domain + for arg in args if a in self.module.domains ] + combination = self.module.combinations.get(a, 'all') # default is 'all' + if combination == 'cases': # as many args as items in smallest domain + argslists = list(zip(*domains)) + elif combination == 'all': # Cartesian product + argslists = itertools.product(*domains) + # might be nice to support 'pairwise' also + # return tuple not list, hashable so it can be key in dictionary + # also handle special case (None,) indicates empty argslist in domains + return tuple([() if x == (None,) else x for x in argslists ]) + + def ParamGen(self): + #print 'ModelProgram ParamGen for', self.module.__name__ #DEBUG + #print ' actions ', self.actions + #print ' domains ', self.module.domains + self.argslists = dict([(a, self.make_argslist(a)) + for a in self.actions + if not a in self.module.observables ]) + + def TrueDefault(self): + return True + + def Properties(self): + return { 'accepting': self.module.Accepting(), + 'statefilter': self.module.StateFilter(), + 'stateinvariant': self.module.StateInvariant() } + + def Reset(self): + try: + self.module.Reset() + except AttributeError: # Reset is optional, but there is no default + print('No Reset function for model program %s' % self.module.__name__) + sys.exit() + + def ActionEnabled(self, a, args): + """ + action a with args is enabled in the current state + """ + if a not in self.module.enablers: + return True + else: + # Assumes enablers[a] has only one item, always true in this version + a_enabled = self.module.enablers[a][0] + nparams = len(inspect.getargspec(a_enabled)[0]) + nargs = len(args) + # nparams == 0 means match any args + if nparams > 0 and nparams != nargs: + print('Error: %s needs %s arguments, got %s. Check parameter generator.' %\ + (a_enabled.__name__, nparams, nargs)) + sys.exit(1) # Don't return, just exit with error status + else: + if nparams > 0: + return a_enabled(*args) + else: + return a_enabled() # nparams == 0 means match any args + + def Transitions(self, actions, argslists): + """ + Return tuple for all enabled transitions: + (action, args, result, next state, properties) + Pass appropriate params for observable or controllable actions + argslists + """ + enabled = list() + for a in actions: + enabled += [(a, args) + self.GetNext(a,args) # (a,args,result,next,prop's) + for args in argslists[a] if self.ActionEnabled(a, args) ] + return [(a,args,result,next,properties) + for (a,args,result,next,properties) in enabled + if properties['statefilter']] + + def EnabledTransitions(self, argslists, cleanup=False): + """ + Return tuple for all enabled observable and controllable transitions: + (action, args, result, next state, properties) + """ + actions = self.cleanup if cleanup else self.actions + controllableActions = set(actions) - set(self.module.observables) + observableActions = set(argslists.keys()) & set(self.module.observables) + if cleanup: + observableActions = set(observableActions) & set(self.cleanup) + enabled = list() + # Controllable actions use self.argslists assigned by ParamGen + enabled += self.Transitions(controllableActions, self.argslists) + # Observable actions use argslists parameter here + enabled += self.Transitions(observableActions, argslists) + return enabled + + def DoAction(self, a, args): + """ + Execute action in model, update state, + """ + return a(*args) + + def Current(self): + """ + Return current state, a dictionary of variable names to values + This is used to save/restore states, so make deep copies of values + """ + return dict([(vname, copy.deepcopy(getattr(self.module, vname))) + for vname in self.module.state ]) + + def Restore(self, state): + """ + Restore state + """ + self.module.__dict__.update(state) + + def GetNext(self, a, args): + """ + Return result and next state, given action and args. + Nondestructive, restores state. + also return dict. of properties of next state as third element in tuple + """ + saved = self.Current() + result = self.DoAction(a, args) + next = self.Current() + properties = self.Properties() # accepting state, etc. + self.Restore(saved) + return (result, next, properties) diff --git a/pymodel/ProductModelProgram.py b/pymodel/ProductModelProgram.py index a374a20..96f6bf1 100644 --- a/pymodel/ProductModelProgram.py +++ b/pymodel/ProductModelProgram.py @@ -1,279 +1,280 @@ -""" -ProductModelProgram - -Uniform interface to every kind of model: ModelProgram, FSM, TestSuite. - -This module is used by both the analyzer and the tester. - -This module uses *composition* to construct and use the *product* of -all the models in the session. - -This module performs composition, so it must identify actions by name -strings (which are the same in all the composed models) not function -objects (which are specific to each model's module). - -Users of this module identify actions by aname strings. -Modules used by this module invoke action function objects. -This module translates action function a to string aname: aname = a.__name__ -and translates aname string to action function a: a = getattr(module, aname) -""" - -from operator import concat -from collections import defaultdict - -from FSM import FSM -from TestSuite import TestSuite -from ModelProgram import ModelProgram - -class ProductModelProgram(object): - - def __init__(self, options, args): - self.TestSuite = False # used by pmt nruns logic - self.module = dict() # dict of modules keyed by name - self.mp = dict() # dict of model programs keyed by same module name - - # Populate self.module and self.mp from modules named in command line args - # Models that users write are just modules, not classes (types) - # so we find out what type to wrap each one in - # by checking for one of each type's required attributes using hasattr - for mname in args: # args is list of module name - self.module[mname] = __import__(mname) - if hasattr(self.module[mname], 'graph'): - self.mp[mname] = FSM(self.module[mname],options.exclude,options.action) - # for backwards compatibility we accept all of these test_suite variants - elif (hasattr(self.module[mname], 'testSuite') or - hasattr(self.module[mname], 'testsuite') or - hasattr(self.module[mname], 'test_suite')): - self.mp[mname] = TestSuite(self.module[mname], - options.exclude, options.action) - self.TestSuite = True # used by pmt nruns logic - elif self.module[mname].__doc__.strip().upper().startswith('PYMODEL CONFIG'): - pass # configuration module, merely importing it did all the work - else: - # got this far, should be a ModelProgram -- if not, crash - self.mp[mname] = ModelProgram(self.module[mname], - options.exclude, options.action) - - # Now that all modules have been imported and executed their __init__ - # do a postprocessing pass over all model objects - # to process metadata that might be affected by configuration modules - for mp in self.mp.values(): - mp.post_init() - - # set of all anames in all model programs - the vocabulary of the product - self.anames = set().union(*[set([a.__name__ for a in mp.actions ]) - for mp in self.mp.values()]) - # print 'anames %s' % self.anames # DEBUG - - # set of anames of all observable actions in all model programs - # observables obtain arg values from the environment, not parameter gen. - self.observables = set().union(*[set([a.__name__ - for a in mp.module.observables]) - for mp in self.mp.values()]) - # FSM and TestSuite must have .observables - # print 'observables %s' % self.observables # DEBUG - - # dict from aname to set of all m where aname is in vocabulary - self.vocabularies = \ - dict([(aname, set([m for m in self.mp if aname in - [a.__name__ for a in self.mp[m].actions]])) - for aname in self.anames]) - # print 'vocabularies %s' % self.vocabularies # DEBUG - - # ProductModelProgram only provides methods etc. called by test runner etc.: - # EnabledActions(cleanup), Properties(), DoAction(a,args), Reset(), TestSuite - # BUT not methods used internally in mp classes: Churrent, Restore, GetNext - - def ActionEnabled(self, aname, args): - """ - True if action aname with args is enabled in the current state - """ - return all([m.ActionEnabled(getattr(m.module, aname), args) - # NOT! empty argument list in model matches any arguments - # NOT! or m.ActionEnabled(getattr(m.module, aname), ()) - # handle zero-args/match-all inside m.ActionEnabled - for m in self.mp.values() - # aname might be an unshared action, not present in all mp - if aname in [ a.__name__ for a in m.actions ]]) - - def EnabledTransitions(self, cleanup): - """ - This is where composition happens! - (aname,args,result) is enabled in the product if it is enabled, - OR (aname, (), None) with empty args and None result is enabled - in every machine where aname is in the vocabulary. - Returns list: [(aname, args, result, next, properties), ... ] - third item result is the same value from all mp, or None - fourth item next is dict of mp name to that mp's next state: - (m1:next1,m2:current2,m3:next3,...),...] - or to its current state if aname is not in that mp vocabulary - fifth item properties is dict of property name to value in next state: - { 'accepting': True, 'statefilter': True, ... } - where there is just one value for each property for the whole product - """ - # Call ParamGen here to allow for state-dependent parameter generation - for mp in self.mp.values(): - if isinstance(mp, ModelProgram): - mp.ParamGen() - - # Built up dicts from mp name to list of all its enabled - # (action, args, result, next state, properties) - - # dict for FSM and TestSuite only, they might provide args for observables - enabledScenarioActions = \ - dict([(m, self.mp[m].EnabledTransitions(cleanup)) - for m in self.mp if (not isinstance(self.mp[m], ModelProgram))]) - # print 'enabledScenarioActions %s' % enabledScenarioActions # DEBUG - - # dict from action to sequence of argslists - argslists = defaultdict(list) - for transitions in enabledScenarioActions.values(): - for (action, args, result, next_state, properties) in transitions: - argslists[action].append(args) # append not extend - - # If more than one scenario in product, there may be duplicates - use sets - scenarioArgslists = dict([(action, set(args)) - for (action,args) in argslists.items()]) - # print 'scenarioArgslists %s' % scenarioArgslists - - # Pass scenarioArgslists to ModelProgram EnabledTransitions - # so any observable actions can use these argslists - enabledModelActions = \ - dict([(m, self.mp[m].EnabledTransitions(scenarioArgslists, cleanup)) - for m in self.mp if isinstance(self.mp[m], ModelProgram)]) - # print 'enabledModelActions %s' % enabledModelActions # DEBUG - - # Combine enabled actions dictionaries (they have distinct keys) - enabledActions = dict() - enabledActions.update(enabledScenarioActions) # FSM and TestSuite - enabledActions.update(enabledModelActions) # ModelProgam - # print 'enabledActions %s' % enabledActions # DEBUG - - # set (with no duplicates) of all (aname, args, result) in enabledActions - transitions = set([(a.__name__, args, result) - for (a,args,result,next,properties) in - reduce(concat,enabledActions.values())]) - # print 'transitions %s' % transitions - - # dict from (aname, args, result) - # to set of all m where (aname, args, result) is enabled - # this dict can be compared to self.vocabularies - invocations = \ - dict([((aname, args, result), - set([ m for m in self.mp - if (aname,args,result) in - [(a.__name__, argsx, resultx) # argsx,resultx is inner loop - for (a,argsx,resultx,next,properties) in enabledActions[m]]])) - for (aname, args, result) in transitions ]) - # print 'invocations %s' % invocations # DEBUG - - # list of all (aname, args, result) that are enabled in the product - # (aname,args,result) enabled in product if (aname,args,result) is enabled - # or (aname,()) and None result is enabled in all m where aname is in vocab - # (would be nice to generalize to all prefixes of args, not just ()) - enabledAnameArgs = \ - [(aname, args, result) - for (aname, args, result) in transitions - # set union, maybe use ... .union(* ... ) for all prefixes - if invocations[aname,args,result] | invocations.get((aname,(),None), - set()) - == self.vocabularies[aname]] - - # Now we have all enabled (action,args,result), now rearrange the data - - # for each enabled (aname,args), associate next states and properties by mp - # all enabled [(aname,args,result,{m1:(next1,properties1),m2:...}), ...] - enabledTs = \ - [(aname, args, result, - dict([(m, [(next,properties) - for (a,argsx,resultx,next,properties) in enabledActions[m] - # ...[0] to extract item from singleton list - if a.__name__ == aname - and (argsx == args or argsx == ())][0] - if m in # aname,args or aname,() is enabled in m - invocations[aname,args,result] | invocations.get((aname,(), - None),set()) - # a,args not enabled in m, m remains in same state - # (the following pair is the value for key m) - else (self.mp[m].Current(), self.mp[m].Properties())) - for m in self.mp ])) - for (aname, args, result) in enabledAnameArgs ] - - # print 'enabledTs %s' % enabledTs # DEBUG - - # combine result and properties from all the mp - # list, all enabled [(aname,args,result,{m1:next1,m2:next2},properties),...] - mpEnabledTransitions = [(aname, args, result, - # dict of next states: {m:next1,m2:next2, ... } - dict([ (m,mdict[m][0]) for m in mdict ]), - # combined properties - self.NextProperties(dict([ (m,mdict[m][1]) - for m in mdict ]))) - for (aname,args,result,mdict) in enabledTs ] - return mpEnabledTransitions - - def Accepting(self): - return self.Properties()['accepting'] - - def StateInvariant(self): - return self.Properties()['stateinvariant'] - - # lots of nearly-repeated code in next two methods, can we streamline ... ? - - def Properties(self): - """ - Combine properties of mps in the current state - """ - return { 'accepting': - # all mp in the current state are in their accepting states - all([ m.Properties()['accepting'] for m in self.mp.values() ]), - 'statefilter': - all([ m.Properties()['statefilter'] for m in self.mp.values() ]), - 'stateinvariant': - all([ m.Properties()['stateinvariant'] for m in self.mp.values() ]) - } - - def NextProperties(self, next_properties): - """ - Combine properties of mps in the next state - """ - return { 'accepting': - # all mp in the next state are in their accepting states - all([ next_properties[m]['accepting'] for m in next_properties]), - 'statefilter': - all([ next_properties[m]['statefilter'] for m in next_properties]), - 'stateinvariant': - all([ next_properties[m]['stateinvariant'] for m in next_properties]) - } - - def DoAction(self, aname, args): - """ - Execute action with aname in all the mp where it is enabled, - return result from last mp arg - """ - result = None - for m in self.mp.values(): - # aname might be an unshared action, not present in all mp - if aname in [ a.__name__ for a in m.actions ]: - result = m.DoAction(getattr(m.module, aname), args) - return result # results from all mp should be the same, return any one - - def Reset(self): - """ - Reset all the mp - """ - for m in self.mp.values(): - m.Reset() - - def Current(self): - """ - Return dictionary of current states - """ - return dict([(m, self.mp[m].Current()) for m in self.mp ]) - - def Restore(self, state): - """ - Restore states from dictionary - """ - for m in self.mp: - self.mp[m].Restore(state[m]) +""" +ProductModelProgram + +Uniform interface to every kind of model: ModelProgram, FSM, TestSuite. + +This module is used by both the analyzer and the tester. + +This module uses *composition* to construct and use the *product* of +all the models in the session. + +This module performs composition, so it must identify actions by name +strings (which are the same in all the composed models) not function +objects (which are specific to each model's module). + +Users of this module identify actions by aname strings. +Modules used by this module invoke action function objects. +This module translates action function a to string aname: aname = a.__name__ +and translates aname string to action function a: a = getattr(module, aname) +""" + +from operator import concat +from collections import defaultdict + +from .FSM import FSM +from .TestSuite import TestSuite +from .ModelProgram import ModelProgram +from functools import reduce + +class ProductModelProgram(object): + + def __init__(self, options, args): + self.TestSuite = False # used by pmt nruns logic + self.module = dict() # dict of modules keyed by name + self.mp = dict() # dict of model programs keyed by same module name + + # Populate self.module and self.mp from modules named in command line args + # Models that users write are just modules, not classes (types) + # so we find out what type to wrap each one in + # by checking for one of each type's required attributes using hasattr + for mname in args: # args is list of module name + self.module[mname] = __import__(mname) + if hasattr(self.module[mname], 'graph'): + self.mp[mname] = FSM(self.module[mname],options.exclude,options.action) + # for backwards compatibility we accept all of these test_suite variants + elif (hasattr(self.module[mname], 'testSuite') or + hasattr(self.module[mname], 'testsuite') or + hasattr(self.module[mname], 'test_suite')): + self.mp[mname] = TestSuite(self.module[mname], + options.exclude, options.action) + self.TestSuite = True # used by pmt nruns logic + elif self.module[mname].__doc__.strip().upper().startswith('PYMODEL CONFIG'): + pass # configuration module, merely importing it did all the work + else: + # got this far, should be a ModelProgram -- if not, crash + self.mp[mname] = ModelProgram(self.module[mname], + options.exclude, options.action) + + # Now that all modules have been imported and executed their __init__ + # do a postprocessing pass over all model objects + # to process metadata that might be affected by configuration modules + for mp in list(self.mp.values()): + mp.post_init() + + # set of all anames in all model programs - the vocabulary of the product + self.anames = set().union(*[set([a.__name__ for a in mp.actions ]) + for mp in list(self.mp.values())]) + # print 'anames %s' % self.anames # DEBUG + + # set of anames of all observable actions in all model programs + # observables obtain arg values from the environment, not parameter gen. + self.observables = set().union(*[set([a.__name__ + for a in mp.module.observables]) + for mp in list(self.mp.values())]) + # FSM and TestSuite must have .observables + # print 'observables %s' % self.observables # DEBUG + + # dict from aname to set of all m where aname is in vocabulary + self.vocabularies = \ + dict([(aname, set([m for m in self.mp if aname in + [a.__name__ for a in self.mp[m].actions]])) + for aname in self.anames]) + # print 'vocabularies %s' % self.vocabularies # DEBUG + + # ProductModelProgram only provides methods etc. called by test runner etc.: + # EnabledActions(cleanup), Properties(), DoAction(a,args), Reset(), TestSuite + # BUT not methods used internally in mp classes: Churrent, Restore, GetNext + + def ActionEnabled(self, aname, args): + """ + True if action aname with args is enabled in the current state + """ + return all([m.ActionEnabled(getattr(m.module, aname), args) + # NOT! empty argument list in model matches any arguments + # NOT! or m.ActionEnabled(getattr(m.module, aname), ()) + # handle zero-args/match-all inside m.ActionEnabled + for m in list(self.mp.values()) + # aname might be an unshared action, not present in all mp + if aname in [ a.__name__ for a in m.actions ]]) + + def EnabledTransitions(self, cleanup): + """ + This is where composition happens! + (aname,args,result) is enabled in the product if it is enabled, + OR (aname, (), None) with empty args and None result is enabled + in every machine where aname is in the vocabulary. + Returns list: [(aname, args, result, next, properties), ... ] + third item result is the same value from all mp, or None + fourth item next is dict of mp name to that mp's next state: + (m1:next1,m2:current2,m3:next3,...),...] + or to its current state if aname is not in that mp vocabulary + fifth item properties is dict of property name to value in next state: + { 'accepting': True, 'statefilter': True, ... } + where there is just one value for each property for the whole product + """ + # Call ParamGen here to allow for state-dependent parameter generation + for mp in list(self.mp.values()): + if isinstance(mp, ModelProgram): + mp.ParamGen() + + # Built up dicts from mp name to list of all its enabled + # (action, args, result, next state, properties) + + # dict for FSM and TestSuite only, they might provide args for observables + enabledScenarioActions = \ + dict([(m, self.mp[m].EnabledTransitions(cleanup)) + for m in self.mp if (not isinstance(self.mp[m], ModelProgram))]) + # print 'enabledScenarioActions %s' % enabledScenarioActions # DEBUG + + # dict from action to sequence of argslists + argslists = defaultdict(list) + for transitions in list(enabledScenarioActions.values()): + for (action, args, result, next_state, properties) in transitions: + argslists[action].append(args) # append not extend + + # If more than one scenario in product, there may be duplicates - use sets + scenarioArgslists = dict([(action, set(args)) + for (action,args) in list(argslists.items())]) + # print 'scenarioArgslists %s' % scenarioArgslists + + # Pass scenarioArgslists to ModelProgram EnabledTransitions + # so any observable actions can use these argslists + enabledModelActions = \ + dict([(m, self.mp[m].EnabledTransitions(scenarioArgslists, cleanup)) + for m in self.mp if isinstance(self.mp[m], ModelProgram)]) + # print 'enabledModelActions %s' % enabledModelActions # DEBUG + + # Combine enabled actions dictionaries (they have distinct keys) + enabledActions = dict() + enabledActions.update(enabledScenarioActions) # FSM and TestSuite + enabledActions.update(enabledModelActions) # ModelProgam + # print 'enabledActions %s' % enabledActions # DEBUG + + # set (with no duplicates) of all (aname, args, result) in enabledActions + transitions = set([(a.__name__, args, result) + for (a,args,result,next,properties) in + reduce(concat,list(enabledActions.values()))]) + # print 'transitions %s' % transitions + + # dict from (aname, args, result) + # to set of all m where (aname, args, result) is enabled + # this dict can be compared to self.vocabularies + invocations = \ + dict([((aname, args, result), + set([ m for m in self.mp + if (aname,args,result) in + [(a.__name__, argsx, resultx) # argsx,resultx is inner loop + for (a,argsx,resultx,next,properties) in enabledActions[m]]])) + for (aname, args, result) in transitions ]) + # print 'invocations %s' % invocations # DEBUG + + # list of all (aname, args, result) that are enabled in the product + # (aname,args,result) enabled in product if (aname,args,result) is enabled + # or (aname,()) and None result is enabled in all m where aname is in vocab + # (would be nice to generalize to all prefixes of args, not just ()) + enabledAnameArgs = \ + [(aname, args, result) + for (aname, args, result) in transitions + # set union, maybe use ... .union(* ... ) for all prefixes + if invocations[aname,args,result] | invocations.get((aname,(),None), + set()) + == self.vocabularies[aname]] + + # Now we have all enabled (action,args,result), now rearrange the data + + # for each enabled (aname,args), associate next states and properties by mp + # all enabled [(aname,args,result,{m1:(next1,properties1),m2:...}), ...] + enabledTs = \ + [(aname, args, result, + dict([(m, [(next,properties) + for (a,argsx,resultx,next,properties) in enabledActions[m] + # ...[0] to extract item from singleton list + if a.__name__ == aname + and (argsx == args or argsx == ())][0] + if m in # aname,args or aname,() is enabled in m + invocations[aname,args,result] | invocations.get((aname,(), + None),set()) + # a,args not enabled in m, m remains in same state + # (the following pair is the value for key m) + else (self.mp[m].Current(), self.mp[m].Properties())) + for m in self.mp ])) + for (aname, args, result) in enabledAnameArgs ] + + # print 'enabledTs %s' % enabledTs # DEBUG + + # combine result and properties from all the mp + # list, all enabled [(aname,args,result,{m1:next1,m2:next2},properties),...] + mpEnabledTransitions = [(aname, args, result, + # dict of next states: {m:next1,m2:next2, ... } + dict([ (m,mdict[m][0]) for m in mdict ]), + # combined properties + self.NextProperties(dict([ (m,mdict[m][1]) + for m in mdict ]))) + for (aname,args,result,mdict) in enabledTs ] + return mpEnabledTransitions + + def Accepting(self): + return self.Properties()['accepting'] + + def StateInvariant(self): + return self.Properties()['stateinvariant'] + + # lots of nearly-repeated code in next two methods, can we streamline ... ? + + def Properties(self): + """ + Combine properties of mps in the current state + """ + return { 'accepting': + # all mp in the current state are in their accepting states + all([ m.Properties()['accepting'] for m in list(self.mp.values()) ]), + 'statefilter': + all([ m.Properties()['statefilter'] for m in list(self.mp.values()) ]), + 'stateinvariant': + all([ m.Properties()['stateinvariant'] for m in list(self.mp.values()) ]) + } + + def NextProperties(self, next_properties): + """ + Combine properties of mps in the next state + """ + return { 'accepting': + # all mp in the next state are in their accepting states + all([ next_properties[m]['accepting'] for m in next_properties]), + 'statefilter': + all([ next_properties[m]['statefilter'] for m in next_properties]), + 'stateinvariant': + all([ next_properties[m]['stateinvariant'] for m in next_properties]) + } + + def DoAction(self, aname, args): + """ + Execute action with aname in all the mp where it is enabled, + return result from last mp arg + """ + result = None + for m in list(self.mp.values()): + # aname might be an unshared action, not present in all mp + if aname in [ a.__name__ for a in m.actions ]: + result = m.DoAction(getattr(m.module, aname), args) + return result # results from all mp should be the same, return any one + + def Reset(self): + """ + Reset all the mp + """ + for m in list(self.mp.values()): + m.Reset() + + def Current(self): + """ + Return dictionary of current states + """ + return dict([(m, self.mp[m].Current()) for m in self.mp ]) + + def Restore(self, state): + """ + Restore states from dictionary + """ + for m in self.mp: + self.mp[m].Restore(state[m]) diff --git a/pymodel/TestSuite.py b/pymodel/TestSuite.py index 68e9e49..f709b3b 100644 --- a/pymodel/TestSuite.py +++ b/pymodel/TestSuite.py @@ -1,117 +1,118 @@ -""" -Interface to a test suite module (one or more runs) used by ProductModelProgram -""" - -from operator import concat -from model import Model - -class TestSuite(Model): - - def __init__(self, module, exclude, include): - Model.__init__(self, module, exclude, include) - - def post_init(self): - """ - Now that all modules have been imported and executed their __init__ - do a postprocessing pass - to process metadata that might be affected by configuration modules - """ - # Do all of this work here rather than in __init__ - # so it can include the effects of any pymodel config modules - - # recognize PEP-8 style names (all lowercase) if present - if hasattr(self.module, 'testsuite'): - self.module.testSuite = self.module.testsuite - if hasattr(self.module, 'test_suite'): - self.module.testSuite = self.module.test_suite - - if hasattr(self.module, 'actions'): - self.actions = list(self.module.actions) # copy, actions from cmd line - else: - self.actions = list(self.actions_in_suite()) # default, copy - Model.post_init(self) # uses self.actions - # Revise the test suite to account for excluded, included actions - self.test_suite = list() - for run in self.module.testSuite: - new_run = list() # list not tuple, must mutable - for action in run: - if action[0] in self.actions: - new_run.append(action) - else: - break # truncate the run before the excluded action - self.test_suite.append(new_run) - # prepare for first run - self.irun = 0 # index of current test run in test suite - self.pc = 0 # program counter - - - def actions_in_suite(self): - # there might be two or three items in action_tuple - return tuple(set(reduce(concat,[[action_tuple[0] for action_tuple in run] - for run in self.module.testSuite]))) - - def Accepting(self): - # In a test suite, the only accepting states are at ends of runs - # NB Here Accepting() is called *after* DoAction() that advances self.pc - length = len(self.test_suite[self.irun]) # number of tuples in run - return (self.pc == length) - - def make_properties(self, accepting): - return { 'accepting': accepting, 'statefilter': True, - 'stateinvariant': True } - - def Properties(self): - return self.make_properties(self.Accepting()) - - def Reset(self): # needed by stepper - self.pc = 0 - if self.irun < len(self.test_suite) - 1: - self.irun += 1 - else: - raise StopIteration # no more runs in test suite - - def ActionEnabled(self, a, args): - """ - action a with args is enabled in the current state - """ - step = self.test_suite[self.irun][self.pc] - action, arguments = step[0:2] # works whether or not step has result - return (a == action and args == arguments) - - def EnabledTransitions(self, cleanup=False): - """ - Return list of all tuples for enabled actions. Here, there is just one. - (action, args, next state, next state is accepting state) - Next state is a list of two elements:the run number and step within the run - In a test suite, there is always just *one* next action, or *none* - Ignore cleanup, test suite should always end in accepting state. - """ - run = self.test_suite[self.irun] - length = len(run) - if self.pc < length: - step = run[self.pc] - action, args = step[0:2] - result = step[2] if len(step) > 2 else None # result is optional - next = self.pc + 1 - accepting = (next == length) - return([(action, args, result, (self.irun,next), - self.make_properties(accepting))]) - else: - return list() # test run finished, nothing enabled, - - def DoAction(self, a, args): - step = self.test_suite[self.irun][self.pc] - result = step[2] if len(step) > 2 else None # result is optional - self.pc += 1 - return result - - def Current(self): - return (self.irun, self.pc) - - def Restore(self, state): - """ - Restore state - """ - self.irun, self.pc = state - - # GetNext not needed +""" +Interface to a test suite module (one or more runs) used by ProductModelProgram +""" + +from operator import concat +from .model import Model +from functools import reduce + +class TestSuite(Model): + + def __init__(self, module, exclude, include): + Model.__init__(self, module, exclude, include) + + def post_init(self): + """ + Now that all modules have been imported and executed their __init__ + do a postprocessing pass + to process metadata that might be affected by configuration modules + """ + # Do all of this work here rather than in __init__ + # so it can include the effects of any pymodel config modules + + # recognize PEP-8 style names (all lowercase) if present + if hasattr(self.module, 'testsuite'): + self.module.testSuite = self.module.testsuite + if hasattr(self.module, 'test_suite'): + self.module.testSuite = self.module.test_suite + + if hasattr(self.module, 'actions'): + self.actions = list(self.module.actions) # copy, actions from cmd line + else: + self.actions = list(self.actions_in_suite()) # default, copy + Model.post_init(self) # uses self.actions + # Revise the test suite to account for excluded, included actions + self.test_suite = list() + for run in self.module.testSuite: + new_run = list() # list not tuple, must mutable + for action in run: + if action[0] in self.actions: + new_run.append(action) + else: + break # truncate the run before the excluded action + self.test_suite.append(new_run) + # prepare for first run + self.irun = 0 # index of current test run in test suite + self.pc = 0 # program counter + + + def actions_in_suite(self): + # there might be two or three items in action_tuple + return tuple(set(reduce(concat,[[action_tuple[0] for action_tuple in run] + for run in self.module.testSuite]))) + + def Accepting(self): + # In a test suite, the only accepting states are at ends of runs + # NB Here Accepting() is called *after* DoAction() that advances self.pc + length = len(self.test_suite[self.irun]) # number of tuples in run + return (self.pc == length) + + def make_properties(self, accepting): + return { 'accepting': accepting, 'statefilter': True, + 'stateinvariant': True } + + def Properties(self): + return self.make_properties(self.Accepting()) + + def Reset(self): # needed by stepper + self.pc = 0 + if self.irun < len(self.test_suite) - 1: + self.irun += 1 + else: + raise StopIteration # no more runs in test suite + + def ActionEnabled(self, a, args): + """ + action a with args is enabled in the current state + """ + step = self.test_suite[self.irun][self.pc] + action, arguments = step[0:2] # works whether or not step has result + return (a == action and args == arguments) + + def EnabledTransitions(self, cleanup=False): + """ + Return list of all tuples for enabled actions. Here, there is just one. + (action, args, next state, next state is accepting state) + Next state is a list of two elements:the run number and step within the run + In a test suite, there is always just *one* next action, or *none* + Ignore cleanup, test suite should always end in accepting state. + """ + run = self.test_suite[self.irun] + length = len(run) + if self.pc < length: + step = run[self.pc] + action, args = step[0:2] + result = step[2] if len(step) > 2 else None # result is optional + next = self.pc + 1 + accepting = (next == length) + return([(action, args, result, (self.irun,next), + self.make_properties(accepting))]) + else: + return list() # test run finished, nothing enabled, + + def DoAction(self, a, args): + step = self.test_suite[self.irun][self.pc] + result = step[2] if len(step) > 2 else None # result is optional + self.pc += 1 + return result + + def Current(self): + return (self.irun, self.pc) + + def Restore(self, state): + """ + Restore state + """ + self.irun, self.pc = state + + # GetNext not needed diff --git a/pymodel/pma.py b/pymodel/pma.py index 38daf55..86015db 100755 --- a/pymodel/pma.py +++ b/pymodel/pma.py @@ -3,9 +3,9 @@ PyModel Analyzer - generate FSM from product model program """ -import Analyzer -import AnalyzerOptions -from ProductModelProgram import ProductModelProgram +from . import Analyzer +from . import AnalyzerOptions +from .ProductModelProgram import ProductModelProgram def main(): (options, args) = AnalyzerOptions.parse_args() @@ -15,8 +15,8 @@ def main(): else: mp = ProductModelProgram(options, args) Analyzer.explore(mp, options.maxTransitions) - print '%s states, %s transitions, %s accepting states, %s unsafe states' % \ - (len(Analyzer.states),len(Analyzer.graph),len(Analyzer.accepting),len(Analyzer.unsafe)) + print('%s states, %s transitions, %s accepting states, %s unsafe states' % \ + (len(Analyzer.states),len(Analyzer.graph),len(Analyzer.accepting),len(Analyzer.unsafe))) mname = options.output if options.output else '%sFSM' % args[0] Analyzer.save(mname) diff --git a/pymodel/pmg.py b/pymodel/pmg.py index 129b5ec..40dcbd1 100755 --- a/pymodel/pmg.py +++ b/pymodel/pmg.py @@ -3,8 +3,8 @@ PyModel Graphics - generate graphics from pymodel FSM """ -import GraphicsOptions -from Dot import dotfile +from . import GraphicsOptions +from .Dot import dotfile def main(): (options, args) = GraphicsOptions.parse_args() diff --git a/pymodel/pmt.py b/pymodel/pmt.py index d73aa9f..7bbda27 100755 --- a/pymodel/pmt.py +++ b/pymodel/pmt.py @@ -10,10 +10,10 @@ import random import signal import traceback -import TesterOptions -import observation_queue as observation +from . import TesterOptions +from . import observation_queue as observation -from ProductModelProgram import ProductModelProgram +from .ProductModelProgram import ProductModelProgram class TimeoutException(Exception): pass @@ -103,9 +103,9 @@ def RunTest(options, mp, stepper, strategy, f, krun): modelResult = mp.DoAction(aname, args) # Execute in model, get result qResult = quote(modelResult) if modelResult != None: - print aname if options.quiet else '%s%s / %s' % (aname, args, qResult) + print(aname if options.quiet else '%s%s / %s' % (aname, args, qResult)) else: - print aname if options.quiet else '%s%s' % (aname, args) + print(aname if options.quiet else '%s%s' % (aname, args)) if options.output: if qResult != None: f.write(' (%s, %s, %s),\n' % (aname, args, qResult)) @@ -160,11 +160,11 @@ def RunTest(options, mp, stepper, strategy, f, krun): if stepper and not mp.Accepting() and not failMessage: failMessage = infoMessage # test run ends in non-accepting state: fail if failMessage: - print '%3d. Failure at step %s, %s' % (krun, isteps, failMessage) + print('%3d. Failure at step %s, %s' % (krun, isteps, failMessage)) else: - print '%3d. %s at step %s%s' % (krun, 'Success' if stepper else 'Finished', + print('%3d. %s at step %s%s' % (krun, 'Success' if stepper else 'Finished', isteps, - (', %s' % infoMessage) if infoMessage else '') + (', %s' % infoMessage) if infoMessage else '')) if options.output: f.write(' ],\n') @@ -231,7 +231,7 @@ def main(): RunTest(options, mp, stepper, strategy, f, k) k += 1 if k > 1: - print 'Test finished, completed %s runs' % k + print('Test finished, completed %s runs' % k) if options.output: f.write(']') diff --git a/pymodel/pmv.py b/pymodel/pmv.py index 8794f8e..c9c92dd 100755 --- a/pymodel/pmv.py +++ b/pymodel/pmv.py @@ -5,7 +5,7 @@ """ import os -import ViewerOptions +from . import ViewerOptions # an option in a singleton tuple means there might be a list of such options # use tuples not lists here so they can be keys in dict @@ -27,10 +27,10 @@ def make_opts(keys, options): else k[0]]]) def command(cmd): - print cmd # DEBUG + print(cmd) # DEBUG status = os.system(cmd) if status: - print 'Failed: %s' % cmd # status 0 means success + print('Failed: %s' % cmd) # status 0 means success def main(): (options, args) = ViewerOptions.parse_args() diff --git a/pymodel/trun.py b/pymodel/trun.py index 3ec5b70..5bba96d 100755 --- a/pymodel/trun.py +++ b/pymodel/trun.py @@ -15,6 +15,6 @@ # ... ] for (description, cmd) in test.cases: - print description + print(description) os.system(cmd) - print + print() diff --git a/pymodel/wsgirunner.py b/pymodel/wsgirunner.py index dbea1e6..af431e4 100755 --- a/pymodel/wsgirunner.py +++ b/pymodel/wsgirunner.py @@ -41,8 +41,8 @@ def main(): app_module = args[0] app = __import__(app_module) application = app.application - print "Running %s at http://localhost:%s/" \ - % (app_module, options.port) + print("Running %s at http://localhost:%s/" \ + % (app_module, options.port)) httpd = simple_server.WSGIServer(('', options.port), simple_server.WSGIRequestHandler) httpd.set_app(application) diff --git a/samples/Socket/stepper.py b/samples/Socket/stepper.py index ad7ec85..40dcfa5 100644 --- a/samples/Socket/stepper.py +++ b/samples/Socket/stepper.py @@ -70,7 +70,7 @@ def test_action(aname, args, modelResult): return None # pmt will call wait(), below else: - raise NotImplementedError, 'action not supported by stepper: %s' % aname + raise NotImplementedError('action not supported by stepper: %s' % aname) def wait(timeout): diff --git a/samples/Socket/stepper_a.py b/samples/Socket/stepper_a.py index ba9705c..8070807 100644 --- a/samples/Socket/stepper_a.py +++ b/samples/Socket/stepper_a.py @@ -70,7 +70,7 @@ def wait_for_return(): return None # pmt will check observation_queue else: - raise NotImplementedError, 'action not supported by stepper: %s' % aname + raise NotImplementedError('action not supported by stepper: %s' % aname) def wait(timeout): diff --git a/samples/Socket/stepper_d.py b/samples/Socket/stepper_d.py index c04750b..4269b6a 100644 --- a/samples/Socket/stepper_d.py +++ b/samples/Socket/stepper_d.py @@ -76,4 +76,4 @@ def test_action(aname, args, model_result): return 'recv returned %s (%s), expected %s (%s)' % (sdata, nd, smodel, nm) else: - raise NotImplementedError, 'action not supported by stepper: %s' % aname + raise NotImplementedError('action not supported by stepper: %s' % aname) diff --git a/samples/Socket/stepper_o.py b/samples/Socket/stepper_o.py index e2ef324..acae755 100644 --- a/samples/Socket/stepper_o.py +++ b/samples/Socket/stepper_o.py @@ -57,4 +57,4 @@ def test_action(aname, args, modelResult): return None # pmt will check observation_queue else: - raise NotImplementedError, 'action not supported by stepper: %s' % aname + raise NotImplementedError('action not supported by stepper: %s' % aname) diff --git a/samples/Socket/stepper_util.py b/samples/Socket/stepper_util.py index 73f7e82..b046f4e 100644 --- a/samples/Socket/stepper_util.py +++ b/samples/Socket/stepper_util.py @@ -26,7 +26,7 @@ def listen(): # Listen, prepare for senders to connect listener.bind(('localhost', port)) listener.listen(1) - print '\nServer listens on localhost port %s with RCVBUF size %s' % (port, rcvbuf) + print('\nServer listens on localhost port %s with RCVBUF size %s' % (port, rcvbuf)) # Define function for sender connect - also used in stepper reset() @@ -35,11 +35,11 @@ def connect(): sender = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # get and print send buffer size, just FYI sndbuf = sender.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) - print 'Sender creates socket with SNDBUF size %s' % (sndbuf) + print('Sender creates socket with SNDBUF size %s' % (sndbuf)) sender.connect(('localhost', port)) - print 'Sender connects to localhost port %s' % port + print('Sender connects to localhost port %s' % port) receiver, addr = listener.accept() - print 'Server accepts connection from ', addr + print('Server accepts connection from ', addr) # State needed to remember _call args for __return msg = '' n = 0 diff --git a/samples/Timeout/Stepper.py b/samples/Timeout/Stepper.py index db2eaf4..8a8e77e 100644 --- a/samples/Timeout/Stepper.py +++ b/samples/Timeout/Stepper.py @@ -9,7 +9,7 @@ def TestAction(aname, args, modelResult): (seconds,) = args time.sleep(seconds) else: - raise NotImplementedError, 'action not supported by stepper: %s' % aname + raise NotImplementedError('action not supported by stepper: %s' % aname) def Reset(): pass diff --git a/samples/WebApplication/Session.py b/samples/WebApplication/Session.py index dea5ccf..4e487d2 100644 --- a/samples/WebApplication/Session.py +++ b/samples/WebApplication/Session.py @@ -1,80 +1,80 @@ -""" -Experiment with code for WebApplication stepper -""" - -import re -import urllib -import urllib2 -import cookielib - -# Scrape page contents - -def loginFailed(page): - return (page.find('Incorrect login') > -1) - -intPattern = re.compile(r'Number: (\d+)') - -def intContents(page): - m = intPattern.search(page) - if m: - return int(m.group(1)) - else: - return None - -def main(): - - # Configure. Web application in this sample requires cookies, redirect - cookies = cookielib.CookieJar() - cookie_handler = urllib2.HTTPCookieProcessor(cookies) - redirect_handler= urllib2.HTTPRedirectHandler() - debug_handler = urllib2.HTTPHandler(debuglevel=1) # print headers on console - opener = urllib2.build_opener(cookie_handler,redirect_handler,debug_handler) - - # Constants - site = 'http://localhost/' - path = 'nmodel/webapplication/php/' - webAppPage = 'doStuff.php' # Shouldn't this be called webAppPage, ...Url -? - logoutPage = 'logout.php' - - webAppUrl = site + path + webAppPage - logoutUrl = site + path + logoutPage - - print 'GET to show login page' - print opener.open(webAppUrl).read() - - print 'POST to login with sample username and password, pass separate args for POST' - args = urllib.urlencode({'username':'user1', 'password':'123'}) - page = opener.open(webAppUrl, args).read() # should show successful login - print page - if loginFailed(page): - print 'Login FAILED' - - print 'GET with arg in URL to UpdateInt on server' - num = 99 - wrongNum = 'xx' - numArg = urllib.urlencode({'num':num}) - print opener.open("%s?%s" % (webAppUrl,numArg)).read() - - print 'GET to retrieve page with integer' - page = opener.open(webAppUrl).read() - print page - print '%s found in page, expected %s' % (intContents(page), num) - print - - print 'GET to logout' - print opener.open(logoutUrl).read() - - print 'GET to show login page -- again' - print opener.open(webAppUrl).read() - - print 'POST to login with username and WRONG password' - args = urllib.urlencode({'username':'user1', 'password':'321'}) # wrong pass - page = opener.open(webAppUrl, args).read() # should show login fail - print page - if loginFailed(page): - print 'Login FAILED' - - # No logout this time - we're not logged in - -if __name__ == '__main__': - main() +""" +Experiment with code for WebApplication stepper +""" + +import re +import urllib.request, urllib.parse, urllib.error +import urllib.request, urllib.error, urllib.parse +import http.cookiejar + +# Scrape page contents + +def loginFailed(page): + return (page.find('Incorrect login') > -1) + +intPattern = re.compile(r'Number: (\d+)') + +def intContents(page): + m = intPattern.search(page) + if m: + return int(m.group(1)) + else: + return None + +def main(): + + # Configure. Web application in this sample requires cookies, redirect + cookies = http.cookiejar.CookieJar() + cookie_handler = urllib.request.HTTPCookieProcessor(cookies) + redirect_handler= urllib.request.HTTPRedirectHandler() + debug_handler = urllib.request.HTTPHandler(debuglevel=1) # print headers on console + opener = urllib.request.build_opener(cookie_handler,redirect_handler,debug_handler) + + # Constants + site = 'http://localhost/' + path = 'nmodel/webapplication/php/' + webAppPage = 'doStuff.php' # Shouldn't this be called webAppPage, ...Url -? + logoutPage = 'logout.php' + + webAppUrl = site + path + webAppPage + logoutUrl = site + path + logoutPage + + print('GET to show login page') + print(opener.open(webAppUrl).read()) + + print('POST to login with sample username and password, pass separate args for POST') + args = urllib.parse.urlencode({'username':'user1', 'password':'123'}) + page = opener.open(webAppUrl, args).read() # should show successful login + print(page) + if loginFailed(page): + print('Login FAILED') + + print('GET with arg in URL to UpdateInt on server') + num = 99 + wrongNum = 'xx' + numArg = urllib.parse.urlencode({'num':num}) + print(opener.open("%s?%s" % (webAppUrl,numArg)).read()) + + print('GET to retrieve page with integer') + page = opener.open(webAppUrl).read() + print(page) + print('%s found in page, expected %s' % (intContents(page), num)) + print() + + print('GET to logout') + print(opener.open(logoutUrl).read()) + + print('GET to show login page -- again') + print(opener.open(webAppUrl).read()) + + print('POST to login with username and WRONG password') + args = urllib.parse.urlencode({'username':'user1', 'password':'321'}) # wrong pass + page = opener.open(webAppUrl, args).read() # should show login fail + print(page) + if loginFailed(page): + print('Login FAILED') + + # No logout this time - we're not logged in + +if __name__ == '__main__': + main() diff --git a/samples/WebApplication/Stepper.py b/samples/WebApplication/Stepper.py index ef4f8ee..2c53cba 100644 --- a/samples/WebApplication/Stepper.py +++ b/samples/WebApplication/Stepper.py @@ -1,142 +1,142 @@ -""" -WebApplication stepper (test harness) -""" - -import re -import urllib -import urllib2 -import cookielib - -# Default stepper configuration for -# webapp.py running on localhost at port 8080 - -site = 'http://localhost:8000/' -path = '' -webAppPage = 'webapp.py' -logoutPage = 'logout.py' - -webAppUrl = site + path + webAppPage -logoutUrl = site + path + logoutPage - -# user in model : user in implementation - -users = { "OleBrumm":"user1", "VinniPuhh":"user2" } -passwords = { "user1":"123", "user2":"234" } -wrongPassword = "000" - -debuglevel = 0 # for debug_handler 1: print HTTP headers, 0: don't print -show_page = False - -# Optionally rebind configuration. If no Config module found, retain defaults. - -try: - from Config import * -except ImportError: - pass - -print 'webAppUrl: %s' % webAppUrl # show which config file (if any) - -# handlers that are the same for all users - -redirect_handler= urllib2.HTTPRedirectHandler() -debug_handler = urllib2.HTTPHandler(debuglevel=debuglevel) - -# handlers that are different for each user are part of the session state - -class Session(object): - """ - One user's session state: cookies and handlers - """ - def __init__(self): - self.cookies = cookielib.CookieJar() - self.cookie_handler = urllib2.HTTPCookieProcessor(self.cookies) - self.opener = urllib2.build_opener(self.cookie_handler, - redirect_handler,debug_handler) - -session = dict() # each user's Session - -# helpers, determine test results by scraping the page - -# like in NModel WebApplication WebTestHelper -def loginFailed(page): - return (page.find('Incorrect login') > -1) - -# not in NModel WebApplication, it has no positive check for login success -def loginSuccess(page): - return (page.find('DoStuff') > -1) - -# similar to NModel WebApplication WebTestHelper -intPattern = re.compile(r'Number: (\d+)') - -def intContents(page): - m = intPattern.search(page) - if m: - return int(m.group(1)) - -# stepper core - -def TestAction(aname, args, modelResult): - """ - To indicate success, no return statement (or return None) - To indicate failure, return string that explains failure - Test runner also treats unhandled exceptions as failures - """ - - global session - - if aname == 'Initialize': - session = dict() # clear out cookies/session IDs from previous session - - elif aname == 'Login': - user = users[args[0]] - if user not in session: - session[user] = Session() - password = passwords[user] if args[1] == 'Correct' else wrongPassword - postArgs = urllib.urlencode({'username':user, 'password':password}) - # GET login page - page = session[user].opener.open(webAppUrl).read() - if show_page: - print page - # POST username, password - page = session[user].opener.open(webAppUrl, postArgs).read() - if show_page: - print page - # Check conformance, reproduce NModel WebApplication Stepper logic: - # check for login failed message, no check for positive indication of login - result = 'Success' - if loginFailed(page): - result = 'Failure' - if result != modelResult: - return 'received Login %s, expected %s' % (result, modelResult) - - elif aname == 'Logout': - user = users[args[0]] - page = session[user].opener.open(logoutUrl).read() - del session[user] # clear out cookie/session ID - if show_page: - print page - - elif aname == 'UpdateInt': - user = users[args[0]] - numArg = urllib.urlencode({'num':args[1]}) - page = session[user].opener.open("%s?%s" % (webAppUrl,numArg)).read() - if show_page: - print page - - elif aname == 'ReadInt': - user = users[args[0]] - page = session[user].opener.open(webAppUrl).read() - if show_page: - print page - numInPage = intContents(page) - if numInPage != modelResult: # check conformance - return 'found %s in page, expected %s' % (numInPage, modelResult) - else: - return None - else: - raise NotImplementedError, 'action %s not handled by stepper' % aname - - -def Reset(): - global session - session.clear() +""" +WebApplication stepper (test harness) +""" + +import re +import urllib.request, urllib.parse, urllib.error +import urllib.request, urllib.error, urllib.parse +import http.cookiejar + +# Default stepper configuration for +# webapp.py running on localhost at port 8080 + +site = 'http://localhost:8000/' +path = '' +webAppPage = 'webapp.py' +logoutPage = 'logout.py' + +webAppUrl = site + path + webAppPage +logoutUrl = site + path + logoutPage + +# user in model : user in implementation + +users = { "OleBrumm":"user1", "VinniPuhh":"user2" } +passwords = { "user1":"123", "user2":"234" } +wrongPassword = "000" + +debuglevel = 0 # for debug_handler 1: print HTTP headers, 0: don't print +show_page = False + +# Optionally rebind configuration. If no Config module found, retain defaults. + +try: + from Config import * +except ImportError: + pass + +print('webAppUrl: %s' % webAppUrl) # show which config file (if any) + +# handlers that are the same for all users + +redirect_handler= urllib.request.HTTPRedirectHandler() +debug_handler = urllib.request.HTTPHandler(debuglevel=debuglevel) + +# handlers that are different for each user are part of the session state + +class Session(object): + """ + One user's session state: cookies and handlers + """ + def __init__(self): + self.cookies = http.cookiejar.CookieJar() + self.cookie_handler = urllib.request.HTTPCookieProcessor(self.cookies) + self.opener = urllib.request.build_opener(self.cookie_handler, + redirect_handler,debug_handler) + +session = dict() # each user's Session + +# helpers, determine test results by scraping the page + +# like in NModel WebApplication WebTestHelper +def loginFailed(page): + return (page.find('Incorrect login') > -1) + +# not in NModel WebApplication, it has no positive check for login success +def loginSuccess(page): + return (page.find('DoStuff') > -1) + +# similar to NModel WebApplication WebTestHelper +intPattern = re.compile(r'Number: (\d+)') + +def intContents(page): + m = intPattern.search(page) + if m: + return int(m.group(1)) + +# stepper core + +def TestAction(aname, args, modelResult): + """ + To indicate success, no return statement (or return None) + To indicate failure, return string that explains failure + Test runner also treats unhandled exceptions as failures + """ + + global session + + if aname == 'Initialize': + session = dict() # clear out cookies/session IDs from previous session + + elif aname == 'Login': + user = users[args[0]] + if user not in session: + session[user] = Session() + password = passwords[user] if args[1] == 'Correct' else wrongPassword + postArgs = urllib.parse.urlencode({'username':user, 'password':password}) + # GET login page + page = session[user].opener.open(webAppUrl).read() + if show_page: + print(page) + # POST username, password + page = session[user].opener.open(webAppUrl, postArgs).read() + if show_page: + print(page) + # Check conformance, reproduce NModel WebApplication Stepper logic: + # check for login failed message, no check for positive indication of login + result = 'Success' + if loginFailed(page): + result = 'Failure' + if result != modelResult: + return 'received Login %s, expected %s' % (result, modelResult) + + elif aname == 'Logout': + user = users[args[0]] + page = session[user].opener.open(logoutUrl).read() + del session[user] # clear out cookie/session ID + if show_page: + print(page) + + elif aname == 'UpdateInt': + user = users[args[0]] + numArg = urllib.parse.urlencode({'num':args[1]}) + page = session[user].opener.open("%s?%s" % (webAppUrl,numArg)).read() + if show_page: + print(page) + + elif aname == 'ReadInt': + user = users[args[0]] + page = session[user].opener.open(webAppUrl).read() + if show_page: + print(page) + numInPage = intContents(page) + if numInPage != modelResult: # check conformance + return 'found %s in page, expected %s' % (numInPage, modelResult) + else: + return None + else: + raise NotImplementedError('action %s not handled by stepper' % aname) + + +def Reset(): + global session + session.clear() diff --git a/samples/WebApplication/WSGIVerboseConfig.py b/samples/WebApplication/WSGIVerboseConfig.py index 1f5df9b..e2b65f0 100644 --- a/samples/WebApplication/WSGIVerboseConfig.py +++ b/samples/WebApplication/WSGIVerboseConfig.py @@ -12,7 +12,7 @@ webAppUrl = site + path + webAppPage logoutUrl = site + path + logoutPage -print 'webAppUrl %s' % webAppUrl # debug +print('webAppUrl %s' % webAppUrl) # debug debuglevel = 1 # 1: print HTTP headers, 0: don't print # show_page = True diff --git a/samples/WebApplication/webapp.py b/samples/WebApplication/webapp.py index 1edd6ba..1b05683 100644 --- a/samples/WebApplication/webapp.py +++ b/samples/WebApplication/webapp.py @@ -1,179 +1,179 @@ -""" -Web application to test with PyModel WebApplication sample - -This is a WSGI-compliant application. Its behavior is similar to -Juhan Ernits' sample application in PHP at http://nmodel.codeplex.com/ - -To run this web application: - - wsgirunner webapp # runs on port 8000 on localhost - -or - - wsgirunner -p 8080 webapp # -p option sets port number on localhost - -""" - -import pprint -import urlparse - -# page templates appear at the end of this file - -# configuration -password = { 'user1':'123', 'user2':'234' } - -# data state -integers = dict() # user to int -strings = dict() # user to str -sessions = dict() # cookie to user, assume each user has at most one session -next_cookie = 0 - -def application(environ, start_response): - global next_cookie - - # print environ_template % pprint.pformat(environ) # DEBUG, voluminous! - - response_headers = [] # add headers below - cookie = environ.get('HTTP_COOKIE') # might be None - - # show login page - if (environ['PATH_INFO'] == '/webapp.py' - and environ['REQUEST_METHOD'] == 'GET' - and cookie not in sessions): # cookie might be None - response_body = login_page - response_headers += [ - ('Set-Cookie','PYSESSID=%s; path=/' % next_cookie)] - next_cookie += 1 - status = '200 OK' - - # log in, if successful show data form page - elif (environ['PATH_INFO'] == '/webapp.py' - and environ['REQUEST_METHOD'] == 'POST'): - wd = environ['wsgi.input'] - method = environ['REQUEST_METHOD'] - length = int(environ['CONTENT_LENGTH']) - request_body = wd.read(length) - vars = urlparse.parse_qs(request_body) - user = vars['username'][0] # vars[x] are lists, get first item - passwd = vars['password'][0] - if user in password and password[user] == passwd: - sessions[cookie] = user - if not user in strings: - strings[user] = '' - # CORRECT CODE comented out - # if not user in integers: - # integers[user] = 0 - # BUG follows, should be guarded by if ... like strings - integers[user] = 0 # BUG, always overwrites data from last session - # PHP version sends redirect back to doStuff instead of this response_body - #response_body = dostuff_template % (integers[user], - # strings[user]) - response_headers += [('Location','webapp.py')] - status = "302 Found" - response_body = '' - else: - response_body = login_failure_page - status = '200 OK' - - # submit data in form page - elif (environ['PATH_INFO'] == '/webapp.py' - and environ['REQUEST_METHOD'] == 'GET' - and cookie in sessions): - user = sessions[cookie] - vars = urlparse.parse_qs(environ['QUERY_STRING']) - if 'num' in vars: - integers[user] = str(vars['num'][0]) # vars[x] are lists, 1st item - if 'str' in vars: - strings[user] = vars['str'][0] - response_body = dostuff_template % (integers[user], - strings[user]) - status = '200 OK' - - # log out - elif environ['PATH_INFO'] == '/logout.py': - if cookie in sessions: - del sessions[cookie] - response_body = '' # blank page, like original NModel version - status = '200 OK' - pass - - # unknown page - elif environ['PATH_INFO'] not in ('/webapp.py', '/logout.py'): - response_body = p404_page - status = '404 Not Found' - - # nonsense: doStuff REQUEST_METHOD not GET or POST, or ... ? - else: - raise ValueError # send 500 Server Error - - # response - response_headers += [('Content-Type', 'text/html'), - ('Content-Length', str(len(response_body)))] - start_response(status, response_headers) - return [response_body] - - -environ_template = """environ is - -%s - -""" - - -p404_page = """ - -404 Not Found - - -404 Not found - - -""" - -login_page = """ - -LoginPage - - -
-Username: -Password: - -
- - -""" - -login_failure_page = """ - -Login Failure - - -Incorrect login name or password. Please try again. - - -""" - -# usage: dostuff_template % (integers[user], strings[user]) -dostuff_template = """ - - -DoStuff - - -Number: %s
-String: %s -
-Number: - -
-
-String: - -
- -Log out - - -""" - +""" +Web application to test with PyModel WebApplication sample + +This is a WSGI-compliant application. Its behavior is similar to +Juhan Ernits' sample application in PHP at http://nmodel.codeplex.com/ + +To run this web application: + + wsgirunner webapp # runs on port 8000 on localhost + +or + + wsgirunner -p 8080 webapp # -p option sets port number on localhost + +""" + +import pprint +import urllib.parse + +# page templates appear at the end of this file + +# configuration +password = { 'user1':'123', 'user2':'234' } + +# data state +integers = dict() # user to int +strings = dict() # user to str +sessions = dict() # cookie to user, assume each user has at most one session +next_cookie = 0 + +def application(environ, start_response): + global next_cookie + + # print environ_template % pprint.pformat(environ) # DEBUG, voluminous! + + response_headers = [] # add headers below + cookie = environ.get('HTTP_COOKIE') # might be None + + # show login page + if (environ['PATH_INFO'] == '/webapp.py' + and environ['REQUEST_METHOD'] == 'GET' + and cookie not in sessions): # cookie might be None + response_body = login_page + response_headers += [ + ('Set-Cookie','PYSESSID=%s; path=/' % next_cookie)] + next_cookie += 1 + status = '200 OK' + + # log in, if successful show data form page + elif (environ['PATH_INFO'] == '/webapp.py' + and environ['REQUEST_METHOD'] == 'POST'): + wd = environ['wsgi.input'] + method = environ['REQUEST_METHOD'] + length = int(environ['CONTENT_LENGTH']) + request_body = wd.read(length) + vars = urllib.parse.parse_qs(request_body) + user = vars['username'][0] # vars[x] are lists, get first item + passwd = vars['password'][0] + if user in password and password[user] == passwd: + sessions[cookie] = user + if not user in strings: + strings[user] = '' + # CORRECT CODE comented out + # if not user in integers: + # integers[user] = 0 + # BUG follows, should be guarded by if ... like strings + integers[user] = 0 # BUG, always overwrites data from last session + # PHP version sends redirect back to doStuff instead of this response_body + #response_body = dostuff_template % (integers[user], + # strings[user]) + response_headers += [('Location','webapp.py')] + status = "302 Found" + response_body = '' + else: + response_body = login_failure_page + status = '200 OK' + + # submit data in form page + elif (environ['PATH_INFO'] == '/webapp.py' + and environ['REQUEST_METHOD'] == 'GET' + and cookie in sessions): + user = sessions[cookie] + vars = urllib.parse.parse_qs(environ['QUERY_STRING']) + if 'num' in vars: + integers[user] = str(vars['num'][0]) # vars[x] are lists, 1st item + if 'str' in vars: + strings[user] = vars['str'][0] + response_body = dostuff_template % (integers[user], + strings[user]) + status = '200 OK' + + # log out + elif environ['PATH_INFO'] == '/logout.py': + if cookie in sessions: + del sessions[cookie] + response_body = '' # blank page, like original NModel version + status = '200 OK' + pass + + # unknown page + elif environ['PATH_INFO'] not in ('/webapp.py', '/logout.py'): + response_body = p404_page + status = '404 Not Found' + + # nonsense: doStuff REQUEST_METHOD not GET or POST, or ... ? + else: + raise ValueError # send 500 Server Error + + # response + response_headers += [('Content-Type', 'text/html'), + ('Content-Length', str(len(response_body)))] + start_response(status, response_headers) + return [response_body] + + +environ_template = """environ is + +%s + +""" + + +p404_page = """ + +404 Not Found + + +404 Not found + + +""" + +login_page = """ + +LoginPage + + +
+Username: +Password: + +
+ + +""" + +login_failure_page = """ + +Login Failure + + +Incorrect login name or password. Please try again. + + +""" + +# usage: dostuff_template % (integers[user], strings[user]) +dostuff_template = """ + + +DoStuff + + +Number: %s
+String: %s +
+Number: + +
+
+String: + +
+ +Log out + + +""" + diff --git a/samples/tracemultiplexer/tracemultiplexer.py b/samples/tracemultiplexer/tracemultiplexer.py index f85b74d..09332b7 100644 --- a/samples/tracemultiplexer/tracemultiplexer.py +++ b/samples/tracemultiplexer/tracemultiplexer.py @@ -1,205 +1,205 @@ -""" -Simulate a program where two threads write to the same log -file. Exhibit nondeterminism in scheduling threads. Try to synchronize -so that only one thread at a time can write to the log. Detect unsafe -states where both threads may write to the log. Identify log messages -that may have been corrupted by unsynchronized writes from both -threads. - -The simulated multi-threaded program has been instrumented to save -traces of its API calls in a log. Instead of simply calling the API -functions, each thread instead calls the *tracecapture* function, -which calls the API function but also saves the call (with arguments) -and return (with return value) in the trace log. The tracecapture -function uses a lock: - - tracelock = lock() - - def tracecapture(thread_id, call, args): # call is action name - tracelock.acquire() - log(thread_id, call, "start", args) # log the call with arguments - tracelock.release() # release here allows threads to interleave - ret = call(*args) - tracelock.acquire() - log(thread_id, call, "finish", ret) # log the return with return value - tracelock.release() - return ret - -The purpose of *tracelock* is to ensure that only one thread at a time -can write to the trace log. To allow threads to interleave, the first -*tracelock.release()* *precedes* the call, otherwise the call might -block and hold *tracelock*, prevent other threads from running while -that call blocks. - -This model program exhibits nondeterminism in thread scheduling, so -traces (paths through the graphs) differ in the order that API calls -and returns are made, which results in different orders for messages in -the log. - -Parameters (constants that do not change when the model executes): - -program: models the multi-threaded program. It is a sequence of API -call ids (action ids) for each thread, first index is thread id, -second index is thread pc. Here we model a simple program with just -two threads, where each thread makes just one API call, executes -tracecapture just once, and writes just two log messages. With a few -revisions, we could also handle a more complicated simulated program -(with more threads and more actions). - -unsynchronized: set False to use tracelock, True to ignore tracelock - - -State: - -pc: program counter for each thread, indexed by thread_id -Indicates which action in program each thread is executing - -phase: phase of each thread, indexed by thread id -models progress through the tracecapture function (see above) -For each API call in program, phases in order are: ready, start, call, finish -phase[thread] == start means thread holds tracelock to write call start -phase[thread] == finish means thread holds tracelock to write call finish -After processing last API call in its thread, phase[thread] == done - -log: contents of the tracelog written by all the threads - -Actions: - -Each action represents progress through the phases of tracecapture. -Each action might be enabled for more than one thread in some states; -this models the nondeterminism of threading. - -""" - -from copy import copy - -## Parameters - -# This simple program has only two threads, with only one API call in each - -program = (( 'listfiles', ), # thread 0 - ( 'openfile', )) # thread 1 - -threads = range(len(program)) # one element of program for each thread - -unsynchronized = False # False: use tracelock, True: ignore tracelock - -### State - -# tracecapture state - -pc = list() # program counter for each thread -phase = list() # phase of each thread in tracecapture -log = list() # contents of tracelog written by all threads - -# file system state - -files = list() # filenames in filesystem -listing = list() # listfiles return value, FIXME ret should be in tracecapture - -### Safety condition - -# phases where a thread can write to the log -writing = ('start','finish') - -def writing_threads(): - """ - list of threads that can write to the log - """ - return [ t for t in threads if phase[t] in writing ] - -def state_invariant(): - """ - At most one thread can write to the log - """ - return len(writing_threads()) <= 1 - - -### Other necessary functions - -# run is allowed to stop -def accepting(): - return all([ phase[t] == 'done' for t in threads ]) - -# reset before another run -def reset(): - global pc, phase, log - pc = [ 0 for thread in program ] - phase = [ 'ready' for thread in program ] - log = [] - files = [] - -### Initialize - -reset() - -### Actions - -def start_enabled(thread): - return (phase[thread] == 'ready' - and (not writing_threads() # lock is free - or unsynchronized)) # ignore lock - might corrupt file - -def start(thread): - phase[thread] = 'start' # acquire lock - # write log, if it might be corrupted write 'XXX' at the end - if state_invariant(): - log.append((thread, program[thread][pc[thread]], 'start')) - else: - log.append((thread, program[thread][pc[thread]], 'start', 'XXX')) - -def call_enabled(thread): - return phase[thread] == 'start' # holding lock - -def call(thread): - global listing # we reassign whole list, we don't just update it - phase[thread] = 'call' # release lock, execute call - action = program[thread][pc[thread]] - # for now. handle each action in *program* as a special case inline here - if action == 'openfile': - files.append('file0') # only works if openfiles just called once - if action == 'listfiles': - listing = copy(files) # must copy now because open may change files - -def finish_enabled(thread): - return (phase[thread] == 'call' - and (not writing_threads() # lock is free - or unsynchronized)) # ignore lock - might corrupt file - -def finish(thread): - phase[thread] = 'finish' # acquire lock - action = program[thread][pc[thread]] - # for now, handle each action in *program* as a special case inline here - if action == 'openfile': - ret = files[-1] # most recently appended - if action == 'listfiles': - ret = listing # most recently appended - # write log, if it might be corrupted write 'XXX' at the end - if state_invariant(): - log.append((thread, action, 'finish', ret)) - else: - log.append((thread, action, 'finish', ret, 'XXX')) - -def exit_enabled(thread): - return phase[thread] == 'finish' # holding lock - -# For now, handle exit as a special case, assign phase 'done'. -# If the simulated threads had more than one action, here we -# would advance to the next action and reset phase to 'start'. -def exit(thread): - phase[thread] = 'done' # release lock, indicate done - - -### Metadata - -state = ('pc', 'phase', 'log', 'files', 'listing') - -actions = (start, call, finish, exit) - -enablers = {start:(start_enabled,), call:(call_enabled,), - finish:(finish_enabled,), exit:(exit_enabled,)} - -domains = { start: { 'thread': threads }, - call: { 'thread': threads }, - finish: { 'thread': threads }, - exit: { 'thread': threads }} +""" +Simulate a program where two threads write to the same log +file. Exhibit nondeterminism in scheduling threads. Try to synchronize +so that only one thread at a time can write to the log. Detect unsafe +states where both threads may write to the log. Identify log messages +that may have been corrupted by unsynchronized writes from both +threads. + +The simulated multi-threaded program has been instrumented to save +traces of its API calls in a log. Instead of simply calling the API +functions, each thread instead calls the *tracecapture* function, +which calls the API function but also saves the call (with arguments) +and return (with return value) in the trace log. The tracecapture +function uses a lock: + + tracelock = lock() + + def tracecapture(thread_id, call, args): # call is action name + tracelock.acquire() + log(thread_id, call, "start", args) # log the call with arguments + tracelock.release() # release here allows threads to interleave + ret = call(*args) + tracelock.acquire() + log(thread_id, call, "finish", ret) # log the return with return value + tracelock.release() + return ret + +The purpose of *tracelock* is to ensure that only one thread at a time +can write to the trace log. To allow threads to interleave, the first +*tracelock.release()* *precedes* the call, otherwise the call might +block and hold *tracelock*, prevent other threads from running while +that call blocks. + +This model program exhibits nondeterminism in thread scheduling, so +traces (paths through the graphs) differ in the order that API calls +and returns are made, which results in different orders for messages in +the log. + +Parameters (constants that do not change when the model executes): + +program: models the multi-threaded program. It is a sequence of API +call ids (action ids) for each thread, first index is thread id, +second index is thread pc. Here we model a simple program with just +two threads, where each thread makes just one API call, executes +tracecapture just once, and writes just two log messages. With a few +revisions, we could also handle a more complicated simulated program +(with more threads and more actions). + +unsynchronized: set False to use tracelock, True to ignore tracelock + + +State: + +pc: program counter for each thread, indexed by thread_id +Indicates which action in program each thread is executing + +phase: phase of each thread, indexed by thread id +models progress through the tracecapture function (see above) +For each API call in program, phases in order are: ready, start, call, finish +phase[thread] == start means thread holds tracelock to write call start +phase[thread] == finish means thread holds tracelock to write call finish +After processing last API call in its thread, phase[thread] == done + +log: contents of the tracelog written by all the threads + +Actions: + +Each action represents progress through the phases of tracecapture. +Each action might be enabled for more than one thread in some states; +this models the nondeterminism of threading. + +""" + +from copy import copy + +## Parameters + +# This simple program has only two threads, with only one API call in each + +program = (( 'listfiles', ), # thread 0 + ( 'openfile', )) # thread 1 + +threads = list(range(len(program))) # one element of program for each thread + +unsynchronized = False # False: use tracelock, True: ignore tracelock + +### State + +# tracecapture state + +pc = list() # program counter for each thread +phase = list() # phase of each thread in tracecapture +log = list() # contents of tracelog written by all threads + +# file system state + +files = list() # filenames in filesystem +listing = list() # listfiles return value, FIXME ret should be in tracecapture + +### Safety condition + +# phases where a thread can write to the log +writing = ('start','finish') + +def writing_threads(): + """ + list of threads that can write to the log + """ + return [ t for t in threads if phase[t] in writing ] + +def state_invariant(): + """ + At most one thread can write to the log + """ + return len(writing_threads()) <= 1 + + +### Other necessary functions + +# run is allowed to stop +def accepting(): + return all([ phase[t] == 'done' for t in threads ]) + +# reset before another run +def reset(): + global pc, phase, log + pc = [ 0 for thread in program ] + phase = [ 'ready' for thread in program ] + log = [] + files = [] + +### Initialize + +reset() + +### Actions + +def start_enabled(thread): + return (phase[thread] == 'ready' + and (not writing_threads() # lock is free + or unsynchronized)) # ignore lock - might corrupt file + +def start(thread): + phase[thread] = 'start' # acquire lock + # write log, if it might be corrupted write 'XXX' at the end + if state_invariant(): + log.append((thread, program[thread][pc[thread]], 'start')) + else: + log.append((thread, program[thread][pc[thread]], 'start', 'XXX')) + +def call_enabled(thread): + return phase[thread] == 'start' # holding lock + +def call(thread): + global listing # we reassign whole list, we don't just update it + phase[thread] = 'call' # release lock, execute call + action = program[thread][pc[thread]] + # for now. handle each action in *program* as a special case inline here + if action == 'openfile': + files.append('file0') # only works if openfiles just called once + if action == 'listfiles': + listing = copy(files) # must copy now because open may change files + +def finish_enabled(thread): + return (phase[thread] == 'call' + and (not writing_threads() # lock is free + or unsynchronized)) # ignore lock - might corrupt file + +def finish(thread): + phase[thread] = 'finish' # acquire lock + action = program[thread][pc[thread]] + # for now, handle each action in *program* as a special case inline here + if action == 'openfile': + ret = files[-1] # most recently appended + if action == 'listfiles': + ret = listing # most recently appended + # write log, if it might be corrupted write 'XXX' at the end + if state_invariant(): + log.append((thread, action, 'finish', ret)) + else: + log.append((thread, action, 'finish', ret, 'XXX')) + +def exit_enabled(thread): + return phase[thread] == 'finish' # holding lock + +# For now, handle exit as a special case, assign phase 'done'. +# If the simulated threads had more than one action, here we +# would advance to the next action and reset phase to 'start'. +def exit(thread): + phase[thread] = 'done' # release lock, indicate done + + +### Metadata + +state = ('pc', 'phase', 'log', 'files', 'listing') + +actions = (start, call, finish, exit) + +enablers = {start:(start_enabled,), call:(call_enabled,), + finish:(finish_enabled,), exit:(exit_enabled,)} + +domains = { start: { 'thread': threads }, + call: { 'thread': threads }, + finish: { 'thread': threads }, + exit: { 'thread': threads }}