source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py @ 5935

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py@5935
Revision 5935, 5.7 KB checked in by pjkersha, 11 years ago (diff)

1.3.0 Release

Completed MyProxy? SAML Attribute assertion callout and added console script entry point.

Line 
1#!/usr/bin/env python
2"""Unit tests for NDG Security MyProxy Extensions callout for adding SAML
3Attribute Assertions to issued X.509 Certificates
4
5NERC DataGrid Project
6"""
7__author__ = "P J Kershaw"
8__date__ = "29/10/09"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = '$Id:$'
13import logging
14logging.basicConfig(level=logging.DEBUG)
15import os
16from string import Template
17from cStringIO import StringIO
18
19from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
20from sqlalchemy.ext.declarative import declarative_base
21from sqlalchemy.orm import sessionmaker
22
23from ndg.security.test.unit import BaseTestCase
24from ndg.security.server.myproxy.certificate_extapp.saml_attribute_assertion \
25    import CertExtApp, CertExtConsoleApp
26 
27from sqlalchemy.ext.declarative import declarative_base
28
29
30class CertExtAppTestCase(BaseTestCase):
31    THIS_DIR = os.path.dirname(__file__)
32    USERNAME = 'pjk'
33    OPENID_IDENTIFIER = 'philip.kershaw'
34    OPENID_TMPL = "https://openid.localhost/${userIdentifier}"
35    OPENID = Template(OPENID_TMPL).substitute(
36                                        dict(userIdentifier=OPENID_IDENTIFIER))
37    OPENID_SQL_QUERY = ("select openid_identifier from users where username "
38                        "= '${username}'") 
39   
40    DB_FILENAME = 'user.db'
41    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILENAME
42   
43    def __init__(self, *arg, **kw):
44        super(CertExtAppTestCase, self).__init__(*arg, **kw)           
45        self.startSiteAAttributeAuthority()
46       
47        if not os.path.isfile(os.path.join(CertExtAppTestCase.THIS_DIR,
48                                           CertExtAppTestCase.DB_FILENAME)):
49            self._createDb()
50       
51    def _createDb(self):
52        db = create_engine(CertExtAppTestCase.DB_CONNECTION_STR)
53       
54        metadata = MetaData()
55        table = Table('users', metadata,
56                      Column('id', Integer, primary_key=True),
57                      Column('username', String),
58                      Column('openid_identifier', String))
59        metadata.create_all(db)
60       
61        class User(declarative_base()):
62            __tablename__ = 'users'
63       
64            id = Column(Integer, primary_key=True)
65            username = Column('username', String(40))
66            openid_identifier = Column('openid_identifier', String(40))
67       
68            def __init__(self, username, openid_identifier):
69                self.username = username
70                self.openid_identifier = openid_identifier
71       
72        user = User(CertExtAppTestCase.USERNAME, 
73                    CertExtAppTestCase.OPENID_IDENTIFIER)
74       
75        Session = sessionmaker(bind=db)
76        session = Session()
77        session.add(user)
78        session.commit()
79       
80    def test01DbQuery(self):
81        myProxyCertExtApp = CertExtApp()
82        myProxyCertExtApp.connectionString = \
83                    CertExtAppTestCase.DB_CONNECTION_STR
84                   
85        myProxyCertExtApp.openIdSqlQuery = CertExtAppTestCase.OPENID_SQL_QUERY
86       
87        identifier = myProxyCertExtApp.queryOpenId(
88                                CertExtAppTestCase.USERNAME)
89        self.assert_(identifier == 
90                     CertExtAppTestCase.OPENID_IDENTIFIER)
91       
92    def test02AttributeQuery(self):
93        myProxyCertExtApp = CertExtApp()
94        myProxyCertExtApp.attributeAuthorityURI = ('http://localhost:%d'
95                                                   '/AttributeAuthority/saml' % 
96        CertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM
97        )
98        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
99        myProxyCertExtApp.userOpenID = CertExtAppTestCase.OPENID
100                               
101        assertion = myProxyCertExtApp.attributeQuery()
102        print(assertion)
103       
104    def test03End2End(self):
105        myProxyCertExtApp = CertExtApp()
106       
107        myProxyCertExtApp.connectionString = \
108                    CertExtAppTestCase.DB_CONNECTION_STR
109                   
110        myProxyCertExtApp.openIdSqlQuery = (
111                            "select openid_identifier from "
112                            "users where username = '%s'" %
113                            CertExtAppTestCase.USERNAME)
114       
115        myProxyCertExtApp.identityUriTemplate = \
116                            CertExtAppTestCase.OPENID_TMPL
117                           
118        myProxyCertExtApp.attributeAuthorityURI = ('http://localhost:%d'
119                                                   '/AttributeAuthority/saml' % 
120        CertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM
121        )
122        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
123       
124        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
125        self.assert_(assertion)
126        print(assertion)
127
128    def test04FromConfigFile(self):
129        configFilePath = os.path.join(CertExtAppTestCase.THIS_DIR, 'config.ini')
130        myProxyCertExtApp = CertExtApp.fromConfigFile(configFilePath)
131        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
132        self.assert_(assertion)
133        print(assertion)
134
135    def test05ConsoleApp(self):
136        configFilePath = os.path.join(CertExtAppTestCase.THIS_DIR, 'config.ini')
137        import sys
138        sys.argv = [
139            None, 
140            "-f",
141            configFilePath,
142            "-u",
143            CertExtAppTestCase.USERNAME
144        ]
145        try:
146            stdOut = sys.stdout
147            sys.stdout = StringIO()
148           
149            CertExtConsoleApp.run()
150            output = sys.stdout.getvalue()
151        finally:
152            sys.stdout = stdOut
153       
154        self.assert_(output)       
155        print(output)
156       
Note: See TracBrowser for help on using the repository browser.