source: cows_wps/trunk/cows_wps/utils/output_file_splitter.py @ 5615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/utils/output_file_splitter.py@7575
Revision 5615, 6.1 KB checked in by spascoe, 10 years ago (diff)

COWS WPS package copied from
 http://proj.badc.rl.ac.uk/svn/dcip/cows-wps/trunk.

This is a stripped down version of the DDP WPS. Some features are
removed and others have been deactivated until we reimplement them in a
more generic way.

Line 
1"""
2output_file_splitter.py
3=======================
4
5Holds code used to make decisions on splitting and zipping output files
6depending on their size etc.
7
8"""
9
10# Standard library imports
11import os
12
13# Local imports
14from cows_wps.utils import zip_utils
15from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
16
17import logging
18log = logging.getLogger(__name__)
19
20# Local variables
21
22"""
Design notes: how file splitting and zipping fit together.

1. getOutputsConfig
2. Execute until all files are in outputs_list.
3. Decide whether to zip:

   * YES: work out the size if all files are zipped together, using the
     do_zip and no_zip rules to decide which files to zip.
   * NO: ignore zipping.

This yields a set of outputs that depends on whether they are zipped.
37"""
38
39
class OutputFileSplitter:
    """
    Estimate the volume of an output file and, when it exceeds a size
    limit, partition it into smaller files along a single "split" dimension.

    Constructor arguments (example values):

          * split_dim = "n_runs" - the dimension that you want to split by;
            it must be one of ``dimensions``.
          * dimensions = ("duration", "n_runs") - names of all dimensions.
          * split_values = (100, 50, 20, 10, 2, 1) - candidate split sizes,
            tried in the order given; the first one that fits is used.
          * size_maps = {"frequency": {"daily": 1, "hourly": 24}} - maps
            non-int/float dimension values onto numeric multipliers.
          * unit_size = 4 (for a float) x whatever - size of one element.
          * size_limit (optional) - maximum file size in megabytes
            (multiplied by 2**20); when omitted the configured
            "max_file_volume" volume rule is used.

    TODO: some kind of time handling option for getting a start/end/interval
    using DateTimeManager.py.
    """

    def __init__(self, split_dim, dimensions, split_values, size_maps=None,
                 unit_size=4, size_limit=None):
        """
        Store the splitting options and load the format/zip/compression
        rules from the parsed outputs rules configuration.

        :raises Exception: if ``split_dim`` is not one of ``dimensions``.
        """
        self.split_dim = split_dim
        self.dimensions = dimensions  # sequence of dimension names
        if self.split_dim not in self.dimensions:
            raise Exception("split_dim argument must be in dimensions list, "
                            "%r is not." % (self.split_dim,))

        # Local rules
        self.split_values = split_values
        # Use a fresh dict per instance rather than a shared mutable default.
        if size_maps is None:
            size_maps = {}
        self.size_maps = size_maps
        self.unit_size = unit_size

        # Rules parsed from the outputs rules config file
        rules = outputs_rules_config_dict
        self.format_bloaters = rules["format_bloaters"]
        self.zip_rules = rules["zip_rules"]
        self.compression_ratios = rules["compression_ratios"]

        # Fall back to the configured maximum file volume unless the caller
        # supplied an explicit limit.
        # NOTE(review): the original referenced an undefined name `rule_dict`
        # here; assumed to mean outputs_rules_config_dict - confirm that the
        # parsed config provides "volume_rules" / "max_file_volume".
        if size_limit is None:
            size_limit = rules["volume_rules"]["max_file_volume"]

        # Megabytes -> bytes
        self.size_limit = size_limit * (2 ** 20)

    def _calculateVolume(self, zip_compression=1, format="csv", **args):
        """
        Estimate the size of a file described by the given dimension values.

        The estimate is ``unit_size`` multiplied by every value in ``args``
        (values of dimensions listed in ``size_maps`` are first mapped to a
        number), then by the bloat factor for ``format`` and by
        ``zip_compression``. The result is truncated to an int. It is
        compared against ``size_limit`` (bytes) by ``getSplits``.
        """
        size = self.unit_size
        for key, value in args.items():
            if key in self.size_maps:
                value = self.size_maps[key][value]
            size = size * value

        size = size * self.format_bloaters[format] * zip_compression
        return int(size)

    def getSplits(self, format="csv", **args):
        """
        Work out how to split output along ``split_dim`` so that each file
        fits within ``size_limit``.

        ``args`` holds a value for each dimension; the value given for
        ``split_dim`` is the total extent to be split.

        Returns a list of tuples:
            [(split_start, split_end, file_size, zipped_or_not), (...), ...]
        where split_start/split_end are inclusive 1-based indices along
        ``split_dim`` that together cover 1..total, file_size is the
        estimated size of a full split (an upper bound for a shorter final
        split) and zipped_or_not is True when the zip rules say this format
        is zipped.

        :raises Exception: if no value in ``split_values`` gives a file
            within ``size_limit``.
        """
        args["format"] = format
        total = args[self.split_dim]

        # Do we zip, and if so, is this format one of those that get zipped?
        can_zip = False
        if self.zip_rules["enable_zip"] == 1 and format in self.zip_rules["do_zip"]:
            can_zip = True
            zip_compression = self.compression_ratios[format]
            # Was a stray stdout print; route through logging instead.
            logging.getLogger(__name__).debug(
                "Zip enabled: compression ratio %s for format %s",
                zip_compression, format)
            args["zip_compression"] = zip_compression

        # Does the whole output fit into a single file?
        size = self._calculateVolume(**args)
        if size <= self.size_limit:
            return [(1, total, size, can_zip)]

        for split in self.split_values:
            args[self.split_dim] = split
            size = self._calculateVolume(**args)
            if size <= self.size_limit:
                # Stop is total + 1 so the final index is covered; the last
                # split is clamped so it never runs past the end.
                return [(start, min(start + split - 1, total), size, can_zip)
                        for start in range(1, total + 1, split)]

        raise Exception("Could not calculate sensible file size splits.")
130               
131   
if __name__ == "__main__":

    # Demo 1: WXGEN example, split on the number of runs.
    # Single-argument print(...) calls print identically on Python 2 and 3.
    print("\n===Testing class interface...WXGEN example===\n")
    split_dim = "n_runs"
    dimensions = ("duration", "n_runs", "frequency")
    size_maps = {"frequency": {"hourly": 24, "daily": 1}}
    unit_size = 365.25 * 10 * 4
    split_values = (100, 50, 10, 5, 3, 2, 1)
    x = OutputFileSplitter(split_dim, dimensions, split_values,
                           unit_size=unit_size, size_maps=size_maps)

    wg_tests = [(30, 100, "daily"),
                (70, 100, "daily"),
                (100, 100, "daily"),
                (100, 400, "daily"),
                (100, 1000, "daily"),
                (30, 100, "hourly"),
                (100, 1000, "hourly")]

    for (dur, n, freq) in wg_tests:
        args = {"duration": dur, "n_runs": n, "frequency": freq, "format": "nc"}
        print("Inputs: %s" % ((dur, n, freq),))
        # Keyword unpacking replaces the deprecated apply() builtin.
        resp = x.getSplits(**args)
        print("\t\tOutputs: %s" % (resp,))

    # Demo 2: SAMPLED DATA example, split on the number of variables.
    print("\n===Testing class interface...SAMPLED DATA example===\n")
    split_dim = "n_vars"
    dimensions = ("n_vars", "n_meaning_periods", "sample_size")
    split_values = (10, 5, 3, 2, 1)
    unit_size = 4
    x = OutputFileSplitter(split_dim, dimensions, split_values, unit_size=unit_size)

    sd_tests = [(1, 1, 100, "nc"),
                (1, 10, 100, "csv"),
                (10, 10, 100, "csv"),
                (10, 17, 1000, "csv"),
                (100, 17, 10000, "csv")]

    for (nv, nmp, ss, format) in sd_tests:
        args = {"n_vars": nv, "n_meaning_periods": nmp, "sample_size": ss, "format": format}
        print("Inputs: %s" % ((nv, nmp, ss, format),))
        resp = x.getSplits(**args)
        print("\t\tOutputs: %s" % (resp,))
174
175   
Note: See TracBrowser for help on using the repository browser.