source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py @ 5929

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py@5929
Revision 5929, 5.4 KB checked in by pjkersha, 11 years ago (diff)

Working unit tests for MyProxy? SAML Attribute assertion callout. TODO: add console script entry point.

Line 
1#!/usr/bin/env python
2"""Unit tests for NDG Security MyProxy Extensions callout for adding SAML
3Attribute Assertions to issued X.509 Certificates
4
5NERC DataGrid Project
6"""
7__author__ = "P J Kershaw"
8__date__ = "29/10/09"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = '$Id:$'
13import logging
14logging.basicConfig(level=logging.DEBUG)
15import os
16from string import Template
17
18from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
19from sqlalchemy.ext.declarative import declarative_base
20from sqlalchemy.orm import sessionmaker
21
22from ndg.security.test.unit import BaseTestCase
23from ndg.security.server.myproxy.certificate_extapp.saml_attribute_assertion \
24    import SamlAssertionMyProxyCertExtApp
25 
26from sqlalchemy.ext.declarative import declarative_base
27
28
29class SamlAssertionMyProxyCertExtAppTestCase(BaseTestCase):
30    THIS_DIR = os.path.dirname(__file__)
31    USERNAME = 'pjk'
32    OPENID_IDENTIFIER = 'philip.kershaw'
33    OPENID_TMPL = "https://openid.localhost/${userIdentifier}"
34    OPENID = Template(OPENID_TMPL).substitute(
35                                        dict(userIdentifier=OPENID_IDENTIFIER))
36   
37    DB_FILENAME = 'user.db'
38    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILENAME
39   
40    def __init__(self, *arg, **kw):
41        super(SamlAssertionMyProxyCertExtAppTestCase, self).__init__(*arg, 
42                                                                     **kw)           
43        self.startSiteAAttributeAuthority()
44        if not os.path.isfile(os.path.join(
45                        SamlAssertionMyProxyCertExtAppTestCase.THIS_DIR,
46                        SamlAssertionMyProxyCertExtAppTestCase.DB_FILENAME)):
47            self._createDb()
48       
49    def _createDb(self):
50        db = create_engine(
51                    SamlAssertionMyProxyCertExtAppTestCase.DB_CONNECTION_STR)
52       
53        metadata = MetaData()
54        table = Table('users', metadata,
55                      Column('id', Integer, primary_key=True),
56                      Column('username', String),
57                      Column('openid_identifier', String))
58        metadata.create_all(db)
59       
60        class User(declarative_base()):
61            __tablename__ = 'users'
62       
63            id = Column(Integer, primary_key=True)
64            username = Column('username', String(40))
65            openid_identifier = Column('openid_identifier', String(40))
66       
67            def __init__(self, username, openid_identifier):
68                self.username = username
69                self.openid_identifier = openid_identifier
70       
71        user = User(SamlAssertionMyProxyCertExtAppTestCase.USERNAME, 
72                    SamlAssertionMyProxyCertExtAppTestCase.OPENID_IDENTIFIER)
73       
74        Session = sessionmaker(bind=db)
75        session = Session()
76        session.add(user)
77        session.commit()
78       
79    def test01DbQuery(self):
80        myProxyCertExtApp = SamlAssertionMyProxyCertExtApp()
81        myProxyCertExtApp.connectionString = \
82                    SamlAssertionMyProxyCertExtAppTestCase.DB_CONNECTION_STR
83                   
84        myProxyCertExtApp.openIdSqlQuery = (
85                            "select openid_identifier from "
86                            "users where username = '%s'" %
87                            SamlAssertionMyProxyCertExtAppTestCase.USERNAME)
88       
89        identifier = myProxyCertExtApp.queryOpenId(
90                                SamlAssertionMyProxyCertExtAppTestCase.USERNAME)
91        self.assert_(identifier == 
92                     SamlAssertionMyProxyCertExtAppTestCase.OPENID_IDENTIFIER)
93       
94    def test02AttributeQuery(self):
95        myProxyCertExtApp = SamlAssertionMyProxyCertExtApp()
96        myProxyCertExtApp.attributeAuthorityURI = ('http://localhost:%d'
97                                                   '/AttributeAuthority/saml' % 
98        SamlAssertionMyProxyCertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM
99        )
100        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
101        myProxyCertExtApp.userOpenID = \
102                                SamlAssertionMyProxyCertExtAppTestCase.OPENID
103                               
104        assertion = myProxyCertExtApp.attributeQuery()
105        print(assertion)
106       
107    def test03End2End(self):
108        myProxyCertExtApp = SamlAssertionMyProxyCertExtApp()
109       
110        myProxyCertExtApp.connectionString = \
111                    SamlAssertionMyProxyCertExtAppTestCase.DB_CONNECTION_STR
112                   
113        myProxyCertExtApp.openIdSqlQuery = (
114                            "select openid_identifier from "
115                            "users where username = '%s'" %
116                            SamlAssertionMyProxyCertExtAppTestCase.USERNAME)
117       
118        myProxyCertExtApp.identityUriTemplate = \
119                            SamlAssertionMyProxyCertExtAppTestCase.OPENID_TMPL
120                           
121        myProxyCertExtApp.attributeAuthorityURI = ('http://localhost:%d'
122                                                   '/AttributeAuthority/saml' % 
123        SamlAssertionMyProxyCertExtAppTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM
124        )
125        myProxyCertExtApp.issuerDN = "/O=Site A/CN=Authorisation Service"
126       
127        assertion = myProxyCertExtApp(
128                                SamlAssertionMyProxyCertExtAppTestCase.USERNAME)
129        self.assert_(assertion)
130        print(assertion)
131       
Note: See TracBrowser for help on using the repository browser.