source: ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py @ 4997

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py@4997
Revision 4997, 9.4 KB checked in by cbyrom, 11 years ago (diff)

Add better error handling + add general keyword catcher to avoid
issues with slight differences between the search clients + extend test data and test suites - handling proxies better.

Line 
'''
A test class for the AtomClient class.

@author C Byrom, Tessella Feb 2009
'''
6
7import unittest,os, time
8from ndg.common.src.clients.xmldb.eXist.atomclient import *
9import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
10import ndg.common.unittests.testconstants as tc
11from ndg.common.unittests.testutils import testUtils as tu
12from ndg.common.src.models import AtomState
13from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
14import ndg.common.src.lib.utilities as utils
15
class TestCase(unittest.TestCase):
    '''
    Tests for the AtomClient class, exercised against the eXist DB named
    in the test constants (tc.EXIST_DB / tc.EXIST_DBCONFIG_FILE).

    Paths of documents stored by a test are collected in self.createdAtoms
    and deleted again in tearDown.
    '''

    def setUp(self):
        '''
        set up data used in the tests.
        '''
        self.client = AtomClient(dbHostName = tc.EXIST_DB,
                                 configFileName = tc.EXIST_DBCONFIG_FILE,
                                 setUpDB = True)
        # paths to the docs created by a test - in order to then delete them
        self.createdAtoms = []
        self.utils = tu(tc.EXIST_DBCONFIG_FILE)

    def tearDown(self):
        '''
        Delete every document registered in self.createdAtoms.
        '''
        for path in self.createdAtoms:
            # delete the test file, in case it has been used in a test
            self.utils.deleteDoc(path)

    # ------------------------------------------------------------------
    # private helpers (names do not start with 'test', so are not collected)
    # ------------------------------------------------------------------

    def _createTrackedAtom(self, xmlString):
        '''
        Create an atom in eXist via the test utilities and register its
        path for deletion in tearDown.

        @param xmlString: atom XML handed to testUtils.createAtomInEXist
        @return: the atom object returned by createAtomInEXist
        '''
        atom = self.utils.createAtomInEXist(xmlString)
        # register straight away so the doc is cleaned up even if a later
        # assertion in the calling test fails
        self.createdAtoms.append(atom.getFullPath())
        return atom

    def _deleteFirstSchemaDoc(self):
        '''
        Delete the first schema listed in client.resources.xsd from the
        schemas collection and check that it is then reported as new.

        @return: full path of the deleted schema doc
        '''
        # list() keeps this working where keys() returns a view, not a list
        fileName = list(self.client.resources.xsd.keys())[0] + '.xsd'
        pathName = dc.SCHEMAS_COLLECTION_PATH + '/' + fileName
        self.client.deleteDoc(pathName)
        self.assertEqual(True, self.client.isNewDoc(pathName))
        return pathName

    def _getFeedAndEnsureProviderCollection(self, atom):
        '''
        Fetch the current provider level feed for the atom's provider and,
        if the provider collection does not exist yet, create it together
        with its feed.

        @param atom: atom whose ME.providerID selects the provider collection
        @return: tuple (providerCollection, feed) where feed is the value
        returned by getAtomFeed before any collection/feed was created
        '''
        providerCollection = dc.PROVIDER_FEED_PATH + atom.ME.providerID
        feedClient = self.client.feedClient
        feed = feedClient.getAtomFeed(providerCollection)

        if self.client.isNewCollection(providerCollection):
            self.client.createCollections([providerCollection])
            feedClient.createAtomFeed(
                providerCollection,
                feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE % atom.ME.providerID)
        return providerCollection, feed

    # ------------------------------------------------------------------
    # construction
    # ------------------------------------------------------------------

    def testInit(self):
        self.assertTrue(self.client)
        self.assertTrue(self.client.client)

    # ------------------------------------------------------------------
    # schema compliance
    # ------------------------------------------------------------------

    def testValidCheckAtomSchemaCompliance(self):
        atom = self._createTrackedAtom(tc.xmlString)
        errors = self.client.checkAtomSchemaCompliance(atom.getFullPath())
        self.assertEqual([], errors)

    def testInvalidCheckAtomSchemaCompliance(self):
        atom = self._createTrackedAtom(tc.invalidXmlString)
        errors = self.client.checkAtomSchemaCompliance(atom.getFullPath())
        self.assertEqual(1, len(errors))
        self.assertTrue(errors[0].startswith(" Invalid content was found starting with element 'contributor'"))

    def testInvalidEmailCheckAtomSchemaCompliance(self):
        atom = self._createTrackedAtom(tc.invalidEmailXmlString)
        errors = self.client.checkAtomSchemaCompliance(atom.getFullPath())
        self.assertEqual(2, len(errors))
        self.assertTrue("of element 'email' is not valid" in errors[1])

    def testValidCheckAtomSchemaComplianceWithKeyword(self):
        # the atom object is passed by keyword with an empty path
        atom = self.utils.createAtom(tc.xmlString)
        self.assertEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))

    def testInValidCheckAtomSchemaCompliance(self):
        # NB, name differs from testInvalidCheckAtomSchemaCompliance only by
        # case; this one checks an atom object whose author name is blanked
        atom = self.utils.createAtom(tc.xmlString)
        atom.author.name = ''
        self.assertNotEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))

    # ------------------------------------------------------------------
    # collection paths
    # ------------------------------------------------------------------

    def testGetAtomFileCollectionPath(self):
        atom = self._createTrackedAtom(tc.xmlString)
        path = self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, atom.ME.providerID)
        self.assertTrue(atom.getFullPath().startswith(path))

    def testGetAtomFileCollectionPathInvalidDatasetID(self):
        atom = self._createTrackedAtom(tc.xmlString)
        self.assertEqual(None, self.client._AtomClient__getAtomFileCollectionPath(tc.INVALID_FILE, atom.ME.providerID))

    def testGetAtomFileCollectionPathInvalidProviderID(self):
        atom = self._createTrackedAtom(tc.xmlString)
        self.assertEqual(None, self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, tc.INVALID_FILE))

    # ------------------------------------------------------------------
    # publication state
    # ------------------------------------------------------------------

    def testGetAtomPublicationState(self):
        atom = self._createTrackedAtom(tc.xmlString)
        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
        self.assertEqual(AtomState.WORKING_STATE, state)

    def testChangeAtomPublicationState(self):
        atom = self._createTrackedAtom(tc.xmlString)
        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
        self.client.changeAtomPublicationState(atom, AtomState.OLD_STATE)
        try:
            state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
            self.assertEqual(AtomState.OLD_STATE, state)
        finally:
            # always put the atom back into the working state, even when
            # the check above fails
            self.client.changeAtomPublicationState(atom, AtomState.WORKING_STATE)

    # ------------------------------------------------------------------
    # atom creation
    # ------------------------------------------------------------------

    def testCreateNoneAtom(self):
        self.assertRaises(ValueError, self.client.createAtom, None)

    def testCreateNotAtom(self):
        self.assertRaises(ValueError, self.client.createAtom, [1,2,4])

    def testCreateAtom(self):
        atom = self.utils.createAtom(tc.xmlString)
        createdAtom = self.client.createAtom(atom)
        # check before dereferencing, so a missing atom is reported as a
        # test failure rather than an AttributeError
        self.assertNotEqual(None, createdAtom)
        self.createdAtoms.append(createdAtom.getFullPath())

    # ------------------------------------------------------------------
    # DB set up
    # ------------------------------------------------------------------

    def testSetUpEXistDB(self):
        pathName = self._deleteFirstSchemaDoc()

        # only the construction matters here; the new client is not used
        AtomClient(dbHostName = tc.EXIST_DB,
                   configFileName = tc.EXIST_DBCONFIG_FILE,
                   setUpDB = True)

        self.assertEqual(False, self.client.isNewDoc(pathName))

    def testNotSetUpEXistDB(self):
        pathName = self._deleteFirstSchemaDoc()

        # only the construction matters here; the new client is not used
        AtomClient(dbHostName = tc.EXIST_DB,
                   configFileName = tc.EXIST_DBCONFIG_FILE,
                   setUpDB = False)

        self.assertEqual(True, self.client.isNewDoc(pathName))

    # ------------------------------------------------------------------
    # publishing
    # ------------------------------------------------------------------

    def testRunAsynchAtomPublish(self):
        createdAtom = self._createTrackedAtom(tc.xmlString)
        self.assertEqual(False, self.client.isNewDoc(createdAtom.getFullPath()))
        createdAtom.state = AtomState.PUBLISHED_STATE
        providerCollection, feed = \
            self._getFeedAndEnsureProviderCollection(createdAtom)

        self.client._AtomClient__runAsynchAtomPublish(createdAtom)
        # pause before re-reading the feed, since the publish was started
        # asynchronously
        time.sleep(2)
        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
        self.assertNotEqual(feed, feed2)

    def testPublishAtom(self):
        createdAtom = self._createTrackedAtom(tc.xmlString)
        createdAtom.state = AtomState.PUBLISHED_STATE
        providerCollection, feed = \
            self._getFeedAndEnsureProviderCollection(createdAtom)

        self.client._AtomClient__publishAtom(createdAtom)
        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
        self.assertNotEqual(feed, feed2)

    # ------------------------------------------------------------------
    # DIF generation
    # ------------------------------------------------------------------

    def testCreateDIFDocumentFromAtom(self):
        atom = self.utils.createAtom(tc.xmlString)
        atom.atomTypeID = VTD.DE_TERM
        atom = self.client.createAtom(atom)
        self.createdAtoms.append(atom.getFullPath())
        filePath = self.client._createDIFDocumentFromAtom(atom, dc.DIF_COLLECTION_PATH)
        # the DIF doc is stored as well, so needs deleting in tearDown too
        self.createdAtoms.append(filePath)
        self.assertEqual(False, self.client.isNewDoc(filePath))

    def testTransformAtomIntoDIF(self):
        atom = self.utils.createAtom(tc.xmlString)
        atom.atomTypeID = VTD.DE_TERM
        atom = self.client.createAtom(atom)
        self.createdAtoms.append(atom.getFullPath())
        doc = self.client._transformAtomIntoDIF(atom)
        self.assertNotEqual(None, doc)
        self.assertTrue(doc.find('DIF') > -1)
        self.assertTrue(doc.find(atom.datasetID) > -1)

    # ------------------------------------------------------------------
    # collection data
    # ------------------------------------------------------------------

    def testNotLoadCollectionData(self):
        self.assertFalse(self.utils.ac.atomCollections)

    def testLoadCollectionData(self):
        createdAtom = self._createTrackedAtom(tc.xmlString)
        filePath = createdAtom.getFullPath()
        newClient = AtomClient(dbHostName = tc.EXIST_DB,
                               configFileName = tc.EXIST_DBCONFIG_FILE,
                               loadCollectionData = True)
        self.assertTrue(newClient.atomCollections)
        expectedPath = newClient.atomCollections[createdAtom.datasetID] + \
                       '/' + createdAtom.atomName
        self.assertEqual(filePath, expectedPath)
215
216           
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.