source: cows_wps/trunk/process_modules/async_test.py @ 7014

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/async_test.py@7014
Revision 7014, 3.4 KB checked in by astephen, 10 years ago (diff)

renamed ddp_process_support to process_support

Line 
1#!/usr/bin/env python
2
3
4"""
5async_test.py
6=============
7
8Holds dummy class Async used to simulate how an asynchronous job
9might work. Includes testing of packaging up outputs as zip files and
10splitting them into large chunks.
11
12"""
13
14import os
15import sys
16import copy
17import shutil
18import getopt
19import random
20import time
21import operator
22
23from cows_wps.utils import case_sensitive_ordered_config_parser as cp
24from cows_wps.process_handler.context.process_status import STATUS
25from cows_wps.process_handler.fileset import FileSet, zipFileSet, FLAG
26from cows_wps.process_handler.process_support import writeMetadataFile, processOutputs
27import cows_wps.process_handler.process_support as process_support
28
29
class Async(object):
    """Dummy process used to simulate how an asynchronous job might work.

    ``__call__`` writes ``num_runs`` fixed-size files into
    ``context.outputDir``, registers each one in a ``FileSet`` and reports
    progress through ``context.setStatus``.  ``dryRun`` builds the same
    listing but contains no code that writes the dummy data files.
    """

    # Size in bytes of every simulated output file (2 MiB).
    FILE_SIZE = 2 * (2**20)

    def __init__(self, num_runs=10):
        """Store how many output files one job produces.

        num_runs -- number of files written by ``__call__`` and listed by
                    ``dryRun`` (defaults to 10).
        """
        self.num_runs = num_runs

    def _output_file_name(self, path_name, index, extension):
        """Return the name of output number ``index``, e.g. ``name_001.nc``."""
        return "%s_%.3d.%s" % (path_name, index, extension)

    def _file_entry(self, output_file, index):
        """Build the ``FileSet`` entry that describes output number ``index``."""
        description = 'Async output %s of %s' % (index, self.num_runs)
        return FileSet(FLAG.DATA, output_file, self.FILE_SIZE, description)

    def __call__(self, context):
        """Run the simulated job.

        Reads ``CallFileThis`` (base file name) and ``DataOutputFormat``
        (file extension) from ``context.inputs``, writes the dummy files one
        per second, then hands the resulting file set and the inputs to the
        ``process_support`` helpers and marks the job as completed.
        """
        #!TODO: Start time and end time should be managed by WPS or execute wrapper
        startTime = time.time()

        path_name = context.inputs['CallFileThis']
        extension = context.inputs['DataOutputFormat']
        #!TODO: Replace ddpParameters with entries in context.config.

        context.log.info('Parameters: CallFileThis=%s, DataOutputFormat=%s' % (path_name, extension))
        fileSet = FileSet()
        context.log.info('FileSet instantiated')

        # The payload is identical for every file, so build it only once.
        payload = "B" * self.FILE_SIZE

        for i in range(1, self.num_runs + 1):
            output_file = self._output_file_name(path_name, i, extension)

            # ``with`` guarantees the handle is closed even if the write fails.
            with open(os.path.join(context.outputDir, output_file), "w") as out_handle:
                out_handle.write(payload)

            context.log.info("Wrote: " + output_file)
            fileSet.contents.append(self._file_entry(output_file, i))

            # Progress is reported as the integer percentage of runs finished.
            context.setStatus(STATUS.STARTED, "Job is now running", int((100. / self.num_runs) * i))

            # Pause so that the job behaves like a slow asynchronous process.
            time.sleep(1)

        # NOTE: unlike dryRun, the value returned by processOutputs is not
        # stored in context.outputs['FileSet'] here.
        processOutputs(context, fileSet)

        writeMetadataFile(context, context.inputs.items())

        context.setStatus(STATUS.COMPLETED, "Process is complete")

        completionTime = time.time()
        process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)

    def dryRun(self, context):
        """Describe the outputs a real run would produce.

        Builds the same ``FileSet`` listing as ``__call__``, stores the
        result of ``processOutputs(..., dryRun=True)`` in
        ``context.outputs['FileSet']``, marks the request as accepted and
        records an estimated duration.
        """
        path_name = context.inputs['CallFileThis']
        extension = context.inputs['DataOutputFormat']

        fileSet = FileSet()

        for i in range(1, self.num_runs + 1):
            output_file = self._output_file_name(path_name, i, extension)
            fileSet.contents.append(self._file_entry(output_file, i))

        context.outputs['FileSet'] = processOutputs(context, fileSet, dryRun=True)

        writeMetadataFile(context, None, dryRun=True)

        # Estimate passed to updateJobDetailsAfterDryRun; presumably in
        # seconds -- TODO confirm against process_support.
        duration = 10 * self.num_runs + 4

        context.setStatus(STATUS.ACCEPTED, "Process execution request has been accepted", 0)

        process_support.updateJobDetailsAfterDryRun(context, duration)
Note: See TracBrowser for help on using the repository browser.