source: cows/trunk/cows/pylons/decorators.py @ 4008

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows/trunk/cows/pylons/decorators.py@4008
Revision 4008, 6.4 KB checked in by spascoe, 11 years ago (diff)

New COWS distribution. See [4005] for description.

Line 
1# Copyright (C) 2007 STFC & NERC (Science and Technology Facilities Council).
2# This software may be distributed under the terms of the
3# Q Public License, version 1.0 or later.
4# http://ndg.nerc.ac.uk/public_docs/QPublic_license.txt
5"""
6Decorators for annotating OWS server controllers with information used
7to populate the cows model.
8
9@author: Stephen Pascoe
10"""
11
12from cows.util import make_domain
13from cows.exceptions import * 
14import inspect
15
16def _wrap_sig(func, requiredArgs, optionalArgs, withVarkw=False, withVarargs=False):
17    """
18    Wrap a function in a function with a definied signature
19    (i.e. args, *args and **kwargs).  This function works around a
20    problem PEP-0262 is designed to address.  When wrapping functions using
21    decorators you loose the function signature (visible via
22    inspect.getargspec()).  This is a problem for Pylons because it uses inspection to
23    dispatch controller actions.
24
25    Not all signature information is retained in the wrapper.  Optional arguments are
26    supported but their default values are not visible (the wrapped method will handle
27    them as usual).
28
29    @param func: A function to be wrapped
30    @param requiredArgs: Required argument names
31    @param optionalArgs: Optional argument names
32    @param withVarargs: If True allow variable arguments
33    @param withVarkw: If True allow variable keyword arguments
34    @return: A function with the given argument signature which wraps func.
35
36    """
37
38    # Default acts as a singleton to mark optional arguments
39    class Default:
40        pass
41   
42    if withVarkw:
43        varkw = 'varkw_in'
44    else:
45        varkw = None
46    if withVarargs:
47        varargs = 'varargs_in'
48    else:
49        varargs = None
50
51    args = requiredArgs + optionalArgs
52    first_default = len(requiredArgs)
53
54    def formatarg(arg):
55        i = args.index(arg)
56        if i < first_default:
57            return arg
58        else:
59            return '%s=_wrap_default' % arg
60
61    def process(localVars):
62        args = [localVars[x] for x in requiredArgs]
63        args += localVars.get('varargs_in', [])
64        kwargs = localVars.get('varkw_in', {})
65        for arg in optionalArgs:
66            if localVars[arg] != Default:
67                kwargs[arg] = localVars[arg]
68        return args, kwargs
69
70    wrap_vars = dict(_wrap_func=func, _wrap_process=process, _wrap_default=Default)
71    wrap_sig = inspect.formatargspec(args, varargs, varkw,
72                                     formatarg=formatarg)
73
74    wrap_expr = """
75def %s%s:
76    args, kwargs = _wrap_process(locals())   
77    return _wrap_func(*args, **kwargs)""" % (func.__name__, wrap_sig)
78
79    print wrap_expr
80    exec wrap_expr in wrap_vars
81
82    return wrap_vars[func.__name__]
83
84
85def ows_operation(method):
86    """
87    A decorator which defines a method as a OWS operation.
88
89    The method is anotated with the attributes of the OWS protocol which are then
90    interogated during dispatch by OwsController to enforce OWS operation calling
91    behaviour.
92   
93    """
94
95    method._ows_name = method.__name__
96
97    return method
98
99#-----------------------------------------------------------------------------
100
101from unittest import TestCase
102
103class TestOperationDecorator(TestCase):
104    def setUp(self):
105        class Foo(object):
106            @ows_operation(['Bar'], ['Baz'])
107            def MyOp(self, bar, baz=1):
108                return bar+baz
109
110        self.foo = Foo()
111
112    def testOwsProtocol(self):
113        # Check OWS protocol is adhered to
114        assert self.foo.MyOp._ows_name == 'MyOp'
115        assert self.foo.MyOp._ows_required_parameters == ['Bar']
116        assert self.foo.MyOp._ows_optional_parameters == ['Baz']
117
118    def testCall(self):
119        assert self.foo.MyOp(2) == 3
120        assert self.foo.MyOp(bar=2) == 3
121        assert self.foo.MyOp(bar=2, baz=2) == 4
122
123    def testExtraArgs(self):
124        self.assertRaises(TypeError, lambda: self.foo.MyOp(1, 2, 3))
125        self.assertRaises(TypeError, lambda: self.foo.MyOp(1, 2, x=3))
126       
127
128
129class TestWrapSignature(TestCase):
130    def setUp(self):
131        def f(x, y, z='default', z2='default2', *args, **kwargs):
132            return dict(x=x, y=y, z=z, z2=z2, args=args, kwargs=kwargs)
133        self.wrap = _wrap_sig(f, ['x', 'y'], ['z', 'z2'], True, True)
134
135    def test1(self):
136        self.assertRaises(TypeError, lambda: self.wrap(1))
137
138    def test2(self):
139        d = self.wrap(1, 2)
140        self.assertEquals(d['x'], 1)
141        self.assertEquals(d['y'], 2)
142
143    def test3(self):
144        d = self.wrap(1,2)
145        self.assertEquals(d['z'], 'default')
146        self.assertEquals(d['z2'], 'default2')
147
148    def test4(self):
149        d = self.wrap(1, 2, 3)
150        self.assertEquals(d['z'], 3)
151        self.assertEquals(d['z2'], 'default2')
152
153    def test5(self):
154        d = self.wrap(1, 2, z2=3)
155        self.assertEquals(d['z'], 'default')
156        self.assertEquals(d['z2'], 3)
157
158    def test6(self):
159        d = self.wrap(*(1,2))
160        self.assertEquals(d['x'], 1)
161        self.assertEquals(d['y'], 2)
162
163    def test7(self):
164        d = self.wrap(1, 2, 3, 4)
165        self.assertEquals(d['x'], 1)
166        self.assertEquals(d['y'], 2)
167        self.assertEquals(d['z'], 3)
168        self.assertEquals(d['z2'], 4)
169
170    def test8(self):
171        d = self.wrap(1,2, **dict(z=3, w=4))
172        self.assertEquals(d['z'], 3)
173        self.assertEquals(d['kwargs']['w'],4)
174
175class TestWrapSignature2(TestWrapSignature):
176    def setUp(self):
177        """
178        Make a function that accepts anything but has the same return value as in
179        TestWrapSignature.
180        """
181        def f(*args, **kwargs):
182            d = {}
183
184            try:
185                d['x'] = args[0]
186            except IndexError:
187                d['x'] = kwargs['x']
188
189            try:
190                d['y'] = args[1]
191            except IndexError:
192                d['y'] = kwargs['y']
193
194            try:
195                d['z'] = args[2]
196            except IndexError:
197                try:
198                    d['z'] = kwargs['z']
199                except KeyError:
200                    d['z'] = 'default'
201
202            try:
203                d['z2'] = args[3]
204            except IndexError:
205                try:
206                    d['z2'] = kwargs['z2']
207                except KeyError:
208                    d['z2'] = 'default2'
209
210            for k in 'x', 'y', 'z', 'z2':
211                if k in kwargs:
212                    del kwargs[k]
213
214            d['args'] = args[4:]
215            d['kwargs'] = kwargs
216
217            return d
218
219        self.wrap = _wrap_sig(f, ['x', 'y'], ['z', 'z2'], True, True)
Note: See TracBrowser for help on using the repository browser.