Changeset 3235 for TI05-delivery


Ignore:
Timestamp:
16/01/08 15:33:47 (12 years ago)
Author:
spascoe
Message:

Simplifying OWS Protocol in the decorators module

Location:
TI05-delivery/ows_framework/branches/ows_framework-refactor/ows_common/ows_common
Files:
1 added
1 edited
1 copied

Legend:

Unmodified
Added
Removed
  • TI05-delivery/ows_framework/branches/ows_framework-refactor/ows_common/ows_common/pylons/decorators.py

    r3218 r3235  
    1414import inspect 
    1515 
    16 def ows_operation(method): 
     16def _wrap_sig(func, requiredArgs, optionalArgs, withVarkw=False, withVarargs=False): 
     17    """ 
     18    Wrap a function in a function with a definied signature 
     19    (i.e. args, *args and **kwargs).  This function works around a 
     20    problem PEP-0262 is designed to address.  When wrapping functions using 
     21    decorators you loose the function signature (visible via 
     22    inspect.getargspec()).  This is a problem for Pylons because it uses inspection to 
     23    dispatch controller actions. 
     24 
     25    Not all signature information is retained in the wrapper.  Optional arguments are 
     26    supported but their default values are not visible (the wrapped method will handle 
     27    them as usual). 
     28 
     29    @param func: A function to be wrapped 
     30    @param requiredArgs: Required argument names 
     31    @param optionalArgs: Optional argument names 
     32    @param withVarargs: If True allow variable arguments 
     33    @param withVarkw: If True allow variable keyword arguments 
     34    @return: A function with the given argument signature which wraps func. 
     35 
     36    """ 
     37 
     38    # Default acts as a singleton to mark optional arguments 
     39    class Default: 
     40        pass 
     41     
     42    if withVarkw: 
     43        varkw = 'varkw_in' 
     44    else: 
     45        varkw = None 
     46    if withVarargs: 
     47        varargs = 'varargs_in' 
     48    else: 
     49        varargs = None 
     50 
     51    args = requiredArgs + optionalArgs 
     52    first_default = len(requiredArgs) 
     53 
     54    def formatarg(arg): 
     55        i = args.index(arg) 
     56        if i < first_default: 
     57            return arg 
     58        else: 
     59            return '%s=_wrap_default' % arg 
     60 
     61    def process(localVars): 
     62        args = [localVars[x] for x in requiredArgs] 
     63        args += localVars.get('varargs_in', []) 
     64        kwargs = localVars.get('varkw_in', {}) 
     65        for arg in optionalArgs: 
     66            if localVars[arg] != Default: 
     67                kwargs[arg] = localVars[arg] 
     68        return args, kwargs 
     69 
     70    wrap_vars = dict(_wrap_func=func, _wrap_process=process, _wrap_default=Default) 
     71    wrap_sig = inspect.formatargspec(args, varargs, varkw, 
     72                                     formatarg=formatarg) 
     73 
     74    wrap_expr = """ 
     75def %s%s: 
     76    args, kwargs = _wrap_process(locals())     
     77    return _wrap_func(*args, **kwargs)""" % (func.__name__, wrap_sig) 
     78 
     79    print wrap_expr 
     80    exec wrap_expr in wrap_vars 
     81 
     82    return wrap_vars[func.__name__] 
     83 
     84 
     85def ows_operation(requiredParameters, optionalParameters): 
    1786    """ 
    1887    A decorator which defines a method as a OWS operation. 
    1988 
    20     The decorator assumes it is used in the context of an OwsController object with the 
    21     attribute self.ows_params available. 
    22  
    23  
    24     The method is wrapped in function that performs various OWS compatible checks: 
    25      1. Parameters from the request are checked against the methods _ows_parameters attribute 
    26         to ensure they conform to possibleValues.  Valid parameters are passed to the method 
    27         as arguments, invalid parameters trigger an exception. 
    28  
    29     @note: It would seem natural to implement this as a decorator class 
    30         but I can't make it work. 
    31  
    32     """ 
    33  
    34     argspec = inspect.getargspec(method) 
    35  
    36     def wrapper(self, **kw): 
    37         """ 
    38         This wrapper assumes invocation via GET with KVP encoding.  Future implementations 
    39         could detect POST and support XML encoding. 
    40  
    41         @param self: The object of the method being called. 
    42         @param kw: All other keyword arguments are passed to the method. 
    43          
    44         """ 
    45  
    46         # Remove unwanted arguments from kw 
    47         if not argspec[2]: 
    48             kw2 = {} 
    49             for k, v in kw.iteritems(): 
    50                 if k in argspec[0]: 
    51                     kw2[k] = v 
    52             kw = kw2    
    53  
    54         # Add arguments to kw according to self._ows_parameters 
    55         for param, domain in wrapper._ows_parameters.iteritems(): 
    56             try: 
    57                 param_l = param.lower() 
    58                 value = self.ows_params[param_l] 
    59             except KeyError: 
    60                 if param in wrapper._ows_required_parameters: 
    61                     raise MissingParameterValue('%s parameter is not specified' % param, 
    62                                                 param) 
    63             else: 
    64                 # If the argument is present check it's a valid value 
    65                 if not domain.isValidValue(value): 
    66                     raise InvalidParameterValue('%s is not a valid value for %s' % (value, param), param) 
    67                 # Use validator function if present 
    68                 if param in wrapper._ows_validators: 
    69                     value = wrapper._ows_validators[param](value) 
    70  
    71                 # Set value in kw dict 
    72                 kw[param_l] = value 
    73                  
    74         return method(self, **kw) 
    75  
    76     # Propergate the OWS protocol attributes up to the wrapper 
    77     wrapper._ows_name = method.__name__ 
    78     wrapper._ows_parameters = getattr(method, '_ows_parameters', {}) 
    79     wrapper._ows_constraints = getattr(method, '_ows_constraints', {}) 
    80     wrapper._ows_required_parameters = getattr(method, '_ows_required_parameters', [])         
    81     wrapper._ows_validators = getattr(method, '_ows_validators', {}) 
    82  
    83     return wrapper 
    84  
    85  
    86 def ows_parameter(name, value=None, possibleValues=None, 
    87               meaning=None, dataType=None, valuesUnit=None, required=False, 
    88               validator=None): 
    89     """ 
    90     A decorator to add a parameter to an operation. 
    91  
    92     @param name: the parameter name.  This should be appropriately CamelCased. 
    93     @param value: the ows:Domain.defaultValue attribute or None 
    94     @param possibleValues: None or a list of allowed values or a 
    95         PossibleValues instance. 
    96     @param required: A boolean that indicates whether to raise an exception if the parameter 
    97         is missing. 
    98     @param validator: If not None this is a function which is called to validate operation 
    99         parameters.  It takes the parameter value string as it's only argument and returns 
    100         a parsed version of the value or raises an OWS exception. 
     89    The argument signature of the wrapped method is deduced from the decorator 
     90    arguments to allow other decorators to be used below this one 
     91    (e.g. typecheck decorators) and still work as Pylons actions. 
     92 
     93    Parameter names should be lowerCamelCase to that match the method 
     94    signature such that <parameter-x>.lower() == <argument-x>.  This 
     95    is required to support XML request encoding. 
     96 
     97    @param requiredParameters: A sequence of required parameter names in lowerCamelCase 
     98    @param optionalParameters: A sequence of optional parameter names in lowerCamelCase 
    10199     
    102100    """ 
    103     def d(method): 
    104         method._ows_parameters = getattr(method, '_ows_parameters', {}) 
    105         method._ows_validators = getattr(method, '_ows_validators', {}) 
    106         method._ows_parameters[name] = make_domain(value=value, 
    107                                                  possibleValues=possibleValues, 
    108                                                  meaning=meaning, 
    109                                                  dataType=dataType, 
    110                                                  valuesUnit=valuesUnit) 
    111         method._ows_required_parameters = getattr(method, '_ows_required_parameters', []) 
    112         if required: 
    113             method._ows_required_parameters.append(name) 
    114         if validator: 
    115             method._ows_validators[name] = validator 
    116              
    117         return method 
    118  
    119     return d 
    120  
    121  
    122 def ows_constraint(name, value=None, possibleValues=None, 
    123                meaning=None, dataType=None, valuesUnit=None): 
    124     """ 
    125     A decorator to add a constraint to an operation. 
    126  
    127     @param name: the parameter name.  This should be appropriately CamelCased. 
    128     @param value: the ows:Domain.defaultValue attribute or None 
    129     @param possibleValues: None or a list of allowed values or a 
    130         PossibleValues instance. 
    131      
    132     """ 
    133     def d(method): 
    134         method._ows_constraints = getattr(method, '_ows_constraints', {}) 
    135         method._ows_constraints[name] = make_domain(value=value, 
    136                                                  possibleValues=possibleValues, 
    137                                                  meaning=meaning, 
    138                                                  dataType=dataType, 
    139                                                  valuesUnit=valuesUnit) 
    140         return method 
    141  
    142     return d 
     101 
     102 
     103    def f(method): 
     104        wrapper = _wrap_sig(method, 
     105                                 ['self'] + [x.lower() for x in requiredParameters], 
     106                                 [x.lower() for x in optionalParameters]) 
     107        wrapper._ows_name = method.__name__ 
     108        wrapper._ows_required_parameters = requiredParameters 
     109        wrapper._ows_optional_parameters = optionalParameters 
     110 
     111        return wrapper 
     112 
     113    return f 
     114 
    143115 
    144116#----------------------------------------------------------------------------- 
    145117 
    146118from unittest import TestCase 
    147  
    148 class TestDecorators(TestCase): 
    149     def testParameter(self): 
    150         @parameter('foo', 'bar') 
    151         def f(x): 
    152             return x+1 
    153         domain = f._ows_parameters['foo'] 
    154         assert domain.defaultValue == 'bar' 
    155         assert domain.possibleValues.type == domain.possibleValues.ANY_VALUE 
    156         assert f(2) == 3 
    157  
    158     def testConstraint(self): 
    159         @constraint('foo', 'bar') 
    160         def f(x): 
    161             return x+1 
    162         domain = f._ows_constraints['foo'] 
    163         assert domain.defaultValue == 'bar' 
    164         assert domain.possibleValues.type == domain.possibleValues.ANY_VALUE 
    165         assert f(2) == 3 
    166      
    167     def testMultiParameter(self): 
    168         @parameter('foo', 'bar') 
    169         @parameter('baz', 12) 
    170         def f(x): 
    171             return x+1 
    172         assert 'foo' in f._ows_parameters.keys() 
    173         assert 'baz' in f._ows_parameters.keys() 
    174  
    175     def testMultiConstraint(self): 
    176         @constraint('foo', 'bar') 
    177         @constraint('baz', 12) 
    178         def f(x): 
    179             return x+1 
    180         assert 'foo' in f._ows_constraints.keys() 
    181         assert 'baz' in f._ows_constraints.keys() 
    182         assert f(2) == 3 
    183  
    184     def testComplexParameter(self): 
    185         @parameter('foo', 'bar', possibleValues=[1,2,3]) 
    186         def f(x): 
    187             return x+1 
    188         domain = f._ows_parameters['foo'] 
    189         assert domain.defaultValue == 'bar' 
    190         print domain.possibleValues.type 
    191         assert domain.possibleValues.type == \ 
    192                domain.possibleValues.ALLOWED_VALUES 
    193         assert domain.possibleValues.allowedValues == [1,2,3] 
    194  
    195119 
    196120class TestOperationDecorator(TestCase): 
    197121    def setUp(self): 
    198122        class Foo(object): 
    199             @operation 
    200             @parameter('Bar', required=True) 
    201             @parameter('Baz', possibleValues=['a', 'b']) 
    202             def MyOp(self, bar, baz=None): 
    203                 return bar+1 
     123            @ows_operation(['Bar'], ['Baz']) 
     124            def MyOp(self, bar, baz=1): 
     125                return bar+baz 
    204126 
    205127        self.foo = Foo() 
     
    208130        # Check OWS protocol is adhered to 
    209131        assert self.foo.MyOp._ows_name == 'MyOp' 
    210         assert self.foo.MyOp._ows_parameters.keys() == ['Bar', 'Baz'] 
    211         assert self.foo.MyOp._ows_constraints == {} 
    212132        assert self.foo.MyOp._ows_required_parameters == ['Bar'] 
     133        assert self.foo.MyOp._ows_optional_parameters == ['Baz'] 
    213134 
    214135    def testCall(self): 
    215         self.foo.ows_params = {'bar': 2} 
    216         assert self.foo.MyOp() == 3 
    217  
    218     def testFailMissingParameter(self): 
    219         self.foo.ows_params = {'bip': 2} 
    220         self.assertRaises(MissingParameterValue, self.foo.MyOp) 
    221  
    222     def testFailIncorrectParameterValue(self): 
    223         self.foo.ows_params = {'bar': 1, 'baz': 2} 
    224         self.assertRaises(InvalidParameterValue, self.foo.MyOp) 
    225  
    226     def testCallWithPossibleValue(self): 
    227         self.foo.ows_params = {'bar': 2, 'baz': 'b'} 
    228         assert self.foo.MyOp() == 3 
     136        assert self.foo.MyOp(2) == 3 
     137        assert self.foo.MyOp(bar=2) == 3 
     138        assert self.foo.MyOp(bar=2, baz=2) == 4 
     139 
     140    def testExtraArgs(self): 
     141        self.assertRaises(TypeError, lambda: self.foo.MyOp(1, 2, 3)) 
     142        self.assertRaises(TypeError, lambda: self.foo.MyOp(1, 2, x=3)) 
     143         
     144 
     145 
     146class TestWrapSignature(TestCase): 
     147    def setUp(self): 
     148        def f(x, y, z='default', z2='default2', *args, **kwargs): 
     149            return dict(x=x, y=y, z=z, z2=z2, args=args, kwargs=kwargs) 
     150        self.wrap = _wrap_sig(f, ['x', 'y'], ['z', 'z2'], True, True) 
     151 
     152    def test1(self): 
     153        self.assertRaises(TypeError, lambda: self.wrap(1)) 
     154 
     155    def test2(self): 
     156        d = self.wrap(1, 2) 
     157        self.assertEquals(d['x'], 1) 
     158        self.assertEquals(d['y'], 2) 
     159 
     160    def test3(self): 
     161        d = self.wrap(1,2) 
     162        self.assertEquals(d['z'], 'default') 
     163        self.assertEquals(d['z2'], 'default2') 
     164 
     165    def test4(self): 
     166        d = self.wrap(1, 2, 3) 
     167        self.assertEquals(d['z'], 3) 
     168        self.assertEquals(d['z2'], 'default2') 
     169 
     170    def test5(self): 
     171        d = self.wrap(1, 2, z2=3) 
     172        self.assertEquals(d['z'], 'default') 
     173        self.assertEquals(d['z2'], 3) 
     174 
     175    def test6(self): 
     176        d = self.wrap(*(1,2)) 
     177        self.assertEquals(d['x'], 1) 
     178        self.assertEquals(d['y'], 2) 
     179 
     180    def test7(self): 
     181        d = self.wrap(1, 2, 3, 4) 
     182        self.assertEquals(d['x'], 1) 
     183        self.assertEquals(d['y'], 2) 
     184        self.assertEquals(d['z'], 3) 
     185        self.assertEquals(d['z2'], 4) 
     186 
     187    def test8(self): 
     188        d = self.wrap(1,2, **dict(z=3, w=4)) 
     189        self.assertEquals(d['z'], 3) 
     190        self.assertEquals(d['kwargs']['w'],4) 
     191 
     192class TestWrapSignature2(TestWrapSignature): 
     193    def setUp(self): 
     194        """ 
     195        Make a function that accepts anything but has the same return value as in 
     196        TestWrapSignature. 
     197        """ 
     198        def f(*args, **kwargs): 
     199            d = {} 
     200 
     201            try: 
     202                d['x'] = args[0] 
     203            except IndexError: 
     204                d['x'] = kwargs['x'] 
     205 
     206            try: 
     207                d['y'] = args[1] 
     208            except IndexError: 
     209                d['y'] = kwargs['y'] 
     210 
     211            try: 
     212                d['z'] = args[2] 
     213            except IndexError: 
     214                try: 
     215                    d['z'] = kwargs['z'] 
     216                except KeyError: 
     217                    d['z'] = 'default' 
     218 
     219            try: 
     220                d['z2'] = args[3] 
     221            except IndexError: 
     222                try: 
     223                    d['z2'] = kwargs['z2'] 
     224                except KeyError: 
     225                    d['z2'] = 'default2' 
     226 
     227            for k in 'x', 'y', 'z', 'z2': 
     228                if k in kwargs: 
     229                    del kwargs[k] 
     230 
     231            d['args'] = args[4:] 
     232            d['kwargs'] = kwargs 
     233 
     234            return d 
     235 
     236        self.wrap = _wrap_sig(f, ['x', 'y'], ['z', 'z2'], True, True) 
Note: See TracChangeset for help on using the changeset viewer.