1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#!/usr/bin/env python3
import os
import sys
import argparse
import hashlib
import re
import yaml
# Base name of the running script; used in diagnostics, in the banner
# below, and as one of the checksummed source files.
whoami = os.path.basename(sys.argv[0])

# C++ comment header written at the top of every generated file.
BANNER = '\n'.join([
    '//',
    f'// This file is automatically generated by {whoami}.',
    '// Edits will be automatically overwritten if the build is',
    '// run in maintainer mode.',
    '//',
])
def warn(*args, **kwargs):
    """Print a diagnostic message to standard error.

    Positional and keyword arguments are forwarded to ``print``. The
    ``file`` keyword must not be supplied because it is fixed to
    ``sys.stderr``.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
class Main:
    """Command-line driver that generates QPDFJob files from ``job.yml``.

    Two mutually exclusive modes are supported:

    * ``--check``: compare the checksums recorded in ``SUMS`` with the
      current checksums of the sources and generated files, and exit
      with an error message if they differ.
    * ``--generate``: rewrite the generated files from ``job.yml`` and
      then record fresh checksums in ``SUMS``.

    All paths are relative to the current working directory.
    """

    # Input files whose contents are checksummed.
    SOURCES = [whoami, 'job.yml']
    # Generated output files, keyed by the kind of content they hold.
    DESTS = {
        'decl': 'libqpdf/qpdf/auto_job_decl.hh',
        'init': 'libqpdf/qpdf/auto_job_init.hh',
    }
    # File in which the checksums of SOURCES and DESTS are recorded.
    SUMS = 'job.sums'

    # Keys permitted at the top level of job.yml; both are read by the
    # generators, so both are also required.
    _TOP_KEYS = frozenset(['choices', 'options'])
    # Keys permitted in each entry of the 'options' list.
    _OPTION_KEYS = frozenset([
        'table', 'bare', 'positional',
        'optional_parameter', 'required_parameter',
        'required_choices', 'from_table',
    ])

    def main(self, args=sys.argv[1:], prog=whoami):
        """Parse command-line arguments and run the selected mode.

        :param args: argument list, without the program name
        :param prog: program name shown in usage messages
        """
        options = self.parse_args(args, prog)
        self.top(options)

    def parse_args(self, args, prog):
        """Return the parsed options; exactly one mode flag is required."""
        parser = argparse.ArgumentParser(
            prog=prog,
            description='Generate files for QPDFJob',
        )
        mxg = parser.add_mutually_exclusive_group(required=True)
        # --check only compares checksums; it never writes anything.
        mxg.add_argument('--check',
                         help='check whether generated files are up to date',
                         action='store_true', default=False)
        mxg.add_argument('--generate',
                         help='generate files from sources',
                         action='store_true', default=False)
        return parser.parse_args(args)

    def top(self, options):
        """Dispatch to ``check`` or ``generate`` based on *options*."""
        if options.check:
            self.check()
        elif options.generate:
            self.generate()
        else:
            sys.exit(f'{whoami} unknown mode')

    def get_hashes(self):
        """Return ``{path: sha256 hex digest}`` for sources and outputs.

        Files that do not exist are omitted from the result.
        """
        hashes = {}
        for path in sorted([*self.SOURCES, *self.DESTS.values()]):
            try:
                with open(path, 'rb') as f:
                    hashes[path] = hashlib.sha256(f.read()).hexdigest()
            except FileNotFoundError:
                pass
        return hashes

    def check(self):
        """Exit with an error unless ``SUMS`` matches the current hashes."""
        hashes = self.get_hashes()
        old_hashes = {}
        try:
            with open(self.SUMS, 'r') as f:
                for line in f:
                    # Only "<name> <hash>" lines are entries; the
                    # "# Generated by ..." header has more fields and
                    # therefore does not match.
                    m = re.match(r'^(\S+) (\S+)\s*$', line)
                    if m:
                        old_hashes[m.group(1)] = m.group(2)
        except (OSError, ValueError):
            # A missing, unreadable, or undecodable sums file is
            # treated the same as a checksum mismatch.
            old_hashes = None
        if old_hashes != hashes:
            sys.exit(f'{whoami}: auto job inputs have changed')

    def update_hashes(self):
        """Write the current checksums to ``SUMS``."""
        hashes = self.get_hashes()
        with open(self.SUMS, 'w') as f:
            print(f'# Generated by {whoami}', file=f)
            for k, v in hashes.items():
                print(f'{k} {v}', file=f)

    def generate(self):
        """Regenerate all output files from ``job.yml``."""
        warn(f'{whoami}: regenerating auto job files')
        with open('job.yml', 'r') as f:
            data = yaml.safe_load(f)
        self.validate(data)
        self.generate_decl(data)
        self.generate_init(data)
        # Update hashes last to ensure that this will be rerun in the
        # event of a failure.
        self.update_hashes()
        # DON'T ADD CODE TO generate AFTER update_hashes

    def check_keys(self, what, d, exp):
        """Exit with an error if *d* is not a dict or has unknown keys.

        :param what: label used in the error message
        :param d: value to check
        :param exp: set of permitted keys
        """
        if not isinstance(d, dict):
            sys.exit(f'{what} is not a dictionary')
        extra = set(d) - exp
        if extra:
            sys.exit(f'{what}: unknown keys = {extra}')

    def validate(self, data):
        """Exit with an error message if *data* is not usable.

        Checks that the top level and each option entry contain only
        known keys, and that every key the generators read without a
        fallback ('choices', 'options', and each option's 'table') is
        present, so that bad input is reported before any output file
        is rewritten.
        """
        self.check_keys('top', data, self._TOP_KEYS)
        missing = self._TOP_KEYS - set(data)
        if missing:
            sys.exit(f'top: missing keys = {sorted(missing)}')
        if not isinstance(data['choices'], dict):
            sys.exit('choices is not a dictionary')
        if not isinstance(data['options'], list):
            sys.exit('options is not a list')
        for o in data['options']:
            self.check_keys('option', o, self._OPTION_KEYS)
            if 'table' not in o:
                sys.exit('option: missing key = table')

    def to_identifier(self, label, prefix, const):
        """Convert *label* into an identifier and prepend *prefix*.

        Every character other than an ASCII letter or digit becomes an
        underscore. If *const* is true the result is upper-cased;
        otherwise it is lower-cased and each underscore followed by a
        lowercase letter is replaced by that letter in upper case.
        """
        identifier = re.sub(r'[^a-zA-Z0-9]', '_', label)
        if const:
            identifier = identifier.upper()
        else:
            identifier = identifier.lower()
            identifier = re.sub(r'_([a-z])', lambda x: x.group(1).upper(),
                                identifier)
        return prefix + identifier

    def generate_decl(self, data):
        """Write the 'decl' file: one string constant per option table.

        The 'main' and 'help' tables get no constant.
        """
        with open(self.DESTS['decl'], 'w') as f:
            print(BANNER, file=f)
            for o in data['options']:
                table = o['table']
                if table in ('main', 'help'):
                    continue
                i = self.to_identifier(table, 'O_', True)
                print(f'static constexpr char const* {i} = "{table}";',
                      file=f)

    def generate_init(self, data):
        """Write the 'init' file: one 0-terminated array per choice set."""
        with open(self.DESTS['init'], 'w') as f:
            print(BANNER, file=f)
            for k, v in data['choices'].items():
                print(f'char const* {k}_choices[] = {{', file=f, end='')
                for i in v:
                    print(f'"{i}", ', file=f, end='')
                print('0};', file=f)
if __name__ == '__main__':
    try:
        # Run from the directory containing this script so that the
        # relative paths used by Main resolve against it.
        os.chdir(os.path.dirname(os.path.realpath(__file__)))
        Main().main()
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C with the conventional status for an
        # interrupted process. sys.exit is used rather than the
        # site-provided exit(), which is not guaranteed to exist.
        sys.exit(130)
|