source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py @ 5937

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py@5937
Revision 5937, 5.6 KB checked in by pjkersha, 11 years ago (diff)
Line 
1#!/usr/bin/env python
2"""Unit tests for NDG Security MyProxy Extensions callout for adding SAML
3Attribute Assertions to issued X.509 Certificates
4
5NERC DataGrid Project
6"""
7__author__ = "P J Kershaw"
8__date__ = "29/10/09"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = '$Id:$'
13import logging
14logging.basicConfig(level=logging.DEBUG)
15import os
16from string import Template
17from cStringIO import StringIO
18
19from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
20from sqlalchemy.ext.declarative import declarative_base
21from sqlalchemy.orm import sessionmaker
22
23from ndg.security.test.unit import BaseTestCase
24from ndg.security.server.myproxy.certificate_extapp.saml_attribute_assertion \
25    import CertExtApp, CertExtConsoleApp
26 
27from sqlalchemy.ext.declarative import declarative_base
28
29
30class CertExtAppTestCase(BaseTestCase):
31    THIS_DIR = os.path.dirname(__file__)
32    USERNAME = 'pjk'
33    OPENID_IDENTIFIER = 'philip.kershaw'
34    OPENID_TMPL = "https://openid.localhost/${userIdentifier}"
35    OPENID = Template(OPENID_TMPL).substitute(
36                                        dict(userIdentifier=OPENID_IDENTIFIER))
37    OPENID_SQL_QUERY = ("select openid_identifier from users where username "
38                        "= '${username}'") 
39   
40    DB_FILENAME = 'user.db'
41    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILENAME
42   
43    def __init__(self, *arg, **kw):
44        super(CertExtAppTestCase, self).__init__(*arg, **kw)           
45        self.startSiteAAttributeAuthority()
46       
47        if not os.path.isfile(os.path.join(CertExtAppTestCase.THIS_DIR,
48                                           CertExtAppTestCase.DB_FILENAME)):
49            self._createDb()
50       
51    def _createDb(self):
52        db = create_engine(CertExtAppTestCase.DB_CONNECTION_STR)
53       
54        metadata = MetaData()
55        table = Table('users', metadata,
56                      Column('id', Integer, primary_key=True),
57                      Column('username', String),
58                      Column('openid_identifier', String))
59        metadata.create_all(db)
60       
61        class User(declarative_base()):
62            __tablename__ = 'users'
63       
64            id = Column(Integer, primary_key=True)
65            username = Column('username', String(40))
66            openid_identifier = Column('openid_identifier', String(40))
67       
68            def __init__(self, username, openid_identifier):
69                self.username = username
70                self.openid_identifier = openid_identifier
71       
72        user = User(CertExtAppTestCase.USERNAME, 
73                    CertExtAppTestCase.OPENID_IDENTIFIER)
74       
75        Session = sessionmaker(bind=db)
76        session = Session()
77        session.add(user)
78        session.commit()
79       
80    def test01DbQuery(self):
81        myProxyCertExtApp = CertExtApp()
82        myProxyCertExtApp.connectionString = \
83                    CertExtAppTestCase.DB_CONNECTION_STR
84                   
85        myProxyCertExtApp.openIdSqlQuery = CertExtAppTestCase.OPENID_SQL_QUERY
86       
87        identifier = myProxyCertExtApp.queryOpenId(CertExtAppTestCase.USERNAME)
88        self.assert_(identifier == CertExtAppTestCase.OPENID_IDENTIFIER)
89       
90    def test02AttributeQuery(self):
91        myProxyCertExtApp = CertExtApp()
92        myProxyCertExtApp.attributeAuthorityURI = (
93                            'http://localhost:%d/AttributeAuthority/saml' % 
94                            CertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM)
95        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
96        myProxyCertExtApp.userOpenID = CertExtAppTestCase.OPENID
97                               
98        assertion = myProxyCertExtApp.attributeQuery()
99        print(assertion)
100       
101    def test03End2End(self):
102        myProxyCertExtApp = CertExtApp()
103       
104        myProxyCertExtApp.connectionString = \
105                                        CertExtAppTestCase.DB_CONNECTION_STR
106                   
107        myProxyCertExtApp.openIdSqlQuery = ("select openid_identifier from "
108                                            "users where username = '%s'" %
109                                            CertExtAppTestCase.USERNAME)
110       
111        myProxyCertExtApp.identityUriTemplate = CertExtAppTestCase.OPENID_TMPL
112                           
113        myProxyCertExtApp.attributeAuthorityURI = (
114                        'http://localhost:%d/AttributeAuthority/saml' % 
115                        CertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM)
116        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
117       
118        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
119        self.assert_(assertion)
120        print(assertion)
121
122    def test04FromConfigFile(self):
123        configFilePath = os.path.join(CertExtAppTestCase.THIS_DIR, 'config.ini')
124        myProxyCertExtApp = CertExtApp.fromConfigFile(configFilePath)
125        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
126        self.assert_(assertion)
127        print(assertion)
128
129    def test05ConsoleApp(self):
130        configFilePath = os.path.join(CertExtAppTestCase.THIS_DIR, 'config.ini')
131        import sys
132        sys.argv = [
133            None, 
134            "-f",
135            configFilePath,
136            "-u",
137            CertExtAppTestCase.USERNAME
138        ]
139        try:
140            stdOut = sys.stdout
141            sys.stdout = StringIO()
142           
143            CertExtConsoleApp.run()
144            output = sys.stdout.getvalue()
145        finally:
146            sys.stdout = stdOut
147       
148        self.assert_(output)       
149        print(output)
150       
Note: See TracBrowser for help on using the repository browser.