source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/test_saml_attribute_assertion.py@7077
Revision 7077, 5.4 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for NDG Security MyProxy Extensions callout for adding SAML
3Attribute Assertions to issued X.509 Certificates
4
5NERC DataGrid Project
6"""
7__author__ = "P J Kershaw"
8__date__ = "29/10/09"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = '$Id:$'
13import logging
14logging.basicConfig(level=logging.DEBUG)
15
16import os
17from string import Template
18from cStringIO import StringIO
19
20from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
21from sqlalchemy.ext.declarative import declarative_base
22from sqlalchemy.orm import sessionmaker
23
24from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
25from ndg.security.test.unit import BaseTestCase
26from ndg.security.server.myproxy.certificate_extapp.saml_attribute_assertion \
27    import CertExtApp, CertExtConsoleApp
28
29
30class CertExtAppTestCase(BaseTestCase):
31    THIS_DIR = os.path.dirname(__file__)
32    OPENID_SQL_QUERY = ("select openid from users where username = "
33                        "'${username}'") 
34    INI_FILEPATH = os.path.join(THIS_DIR, 'config.ini')
35   
36    def __init__(self, *arg, **kw):
37        super(CertExtAppTestCase, self).__init__(*arg, **kw)           
38        self.startSiteAAttributeAuthority(withSSL=True, 
39                port=CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
40        self.initDb()
41       
42    def test01DbQuery(self):
43        myProxyCertExtApp = CertExtApp()
44        myProxyCertExtApp.connectionString = \
45                    CertExtAppTestCase.DB_CONNECTION_STR
46                   
47        myProxyCertExtApp.openIdSqlQuery = CertExtAppTestCase.OPENID_SQL_QUERY
48       
49        openid = myProxyCertExtApp.queryOpenId(CertExtAppTestCase.USERNAME)
50        self.assert_(openid == CertExtAppTestCase.OPENID_URI)
51       
52    def test02AttributeQuery(self):
53        myProxyCertExtApp = CertExtApp()
54        myProxyCertExtApp.attributeQuery.issuerName = \
55                                        "/CN=Authorisation Service/O=Site A"
56        myProxyCertExtApp.attributeQuery.subjectIdFormat = \
57                                        EsgSamlNamespaces.NAMEID_FORMAT                               
58        myProxyCertExtApp.attributeQuery.subjectID = \
59                                        CertExtAppTestCase.OPENID_URI
60                                       
61        myProxyCertExtApp.attributeQuery.sslCACertDir = \
62                                                CertExtAppTestCase.CACERT_DIR
63        myProxyCertExtApp.attributeQuery.sslCertFilePath = \
64                        os.path.join(CertExtAppTestCase.PKI_DIR, 'test.crt')
65        myProxyCertExtApp.attributeQuery.sslPriKeyFilePath = \
66                        os.path.join(CertExtAppTestCase.PKI_DIR, 'test.key')
67        myProxyCertExtApp.attributeQuery.sslValidDNs = \
68                                                CertExtAppTestCase.SSL_CERT_DN
69                               
70        response = myProxyCertExtApp.attributeQuery.send(
71                uri=CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI)
72        print(response)
73       
74    def test03End2End(self):
75        myProxyCertExtApp = CertExtApp()
76       
77        myProxyCertExtApp.connectionString = \
78                                        CertExtAppTestCase.DB_CONNECTION_STR
79                   
80        myProxyCertExtApp.openIdSqlQuery = ("select openid from users where "
81                                            "username = '%s'" %
82                                            CertExtAppTestCase.USERNAME)
83
84        myProxyCertExtApp.attributeAuthorityURI = \
85                    CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI
86        myProxyCertExtApp.attributeQuery.issuerName = \
87                            "/CN=Authorisation Service/O=Site A"
88
89        myProxyCertExtApp.attributeQuery.subjectIdFormat = \
90                                        EsgSamlNamespaces.NAMEID_FORMAT                                       
91        myProxyCertExtApp.attributeQuery.sslCACertDir = \
92                                                CertExtAppTestCase.CACERT_DIR
93        myProxyCertExtApp.attributeQuery.sslCertFilePath = \
94                        os.path.join(CertExtAppTestCase.PKI_DIR, 'test.crt')
95        myProxyCertExtApp.attributeQuery.sslPriKeyFilePath = \
96                        os.path.join(CertExtAppTestCase.PKI_DIR, 'test.key')
97        myProxyCertExtApp.attributeQuery.sslValidDNs = \
98                                                CertExtAppTestCase.SSL_CERT_DN
99       
100        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
101        self.assert_(assertion)
102        print(assertion)
103
104    def test04FromConfigFile(self):
105        myProxyCertExtApp = CertExtApp.fromConfigFile(
106                                            CertExtAppTestCase.INI_FILEPATH)
107        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
108        self.assert_(assertion)
109        print(assertion)
110
111    def test05ConsoleApp(self):
112        import sys
113        sys.argv = [
114            None, 
115            "-f", CertExtAppTestCase.INI_FILEPATH, 
116            "-u", CertExtAppTestCase.USERNAME
117        ]
118        try:
119            stdOut = sys.stdout
120            sys.stdout = StringIO()
121           
122            CertExtConsoleApp.run()
123            output = sys.stdout.getvalue()
124        finally:
125            sys.stdout = stdOut
126       
127        self.assert_(output)       
128        print(output)
129       
Note: See TracBrowser for help on using the repository browser.