Add autoupdate.py

This commit is contained in:
2021-09-25 13:37:10 +00:00
parent eff38d65a4
commit b2feec3c51

648
autoupdate.py Normal file
View File

@@ -0,0 +1,648 @@
#!/usr/bin/env python3
import os
import glob
import sys
import re
import fileinput
import argparse
import textwrap
import subprocess
import json
import shutil
import requests
import semver
import git
import ruamel.yaml
# We need to disable .netrc authentication for requests, otherwise it will
# pick up injected oAuth crendentials from drone.
class NullAuth(requests.auth.AuthBase):
def __call__(self, r):
return r
repos = {
"nextcloud": [
{
"name": "nextcloud",
"type": "k8s",
"registry": "docker.io",
"container": "nextcloud",
"changelog": "https://nextcloud.com/changelog/",
},
],
"gitea": [
{
"name": "gitea",
"type": "k8s",
"registry": "docker.io",
"container": "gitea/gitea",
"changelog": "https://github.com/go-gitea/gitea/blob/master/CHANGELOG.md",
},
],
"pulumi": [
{
"type": "group",
"name": "pulumi",
"elements": [
{
"name": "pulumi",
"type": "ci",
"registry": "docker.io",
"container": "pulumi/pulumi-nodejs",
"changelog": "https://github.com/pulumi/pulumi/blob/master/CHANGELOG.md",
"primary": True,
},
{
"type": "npm",
"path": "./pulumi/",
},
]
},
],
"kubespray": [
{
"name": "kubespray",
"type": "submodule",
"path": "./ansible/third-party/kubespray",
"prefix": "v",
},
],
"restic": [
{
"name": "restic",
"type": "manual",
"regex": '(?<=restic_version: ")([^"]*)(?=")',
"file": "./ansible/group_vars/all.yml",
"prefix": "v",
"source": {
"type": "github",
"repo": "restic/restic",
},
},
],
"ansible": [
{
"name": "ntp",
"type": "submodule",
"path": "./ansible/roles/geerlingguy.ntp",
},
],
"cert-manager": [
{
"name": "cert-manager",
"type": "bespoke",
"function": "bespoke_cert_manager",
},
],
}
def removeprefix(string, prefix):
if string.startswith(prefix):
return string[len(prefix):]
return string
def removesuffix(string, suffix):
if string.endswith(suffix):
return string[:-len(suffix)]
return string
def bespoke_cert_manager(action, **kwargs):
if action == 'local_version':
files = glob.glob("./k8s/third-party/cert-manager/*")
assert(len(files) == 1)
file = files[0]
match = re.match('cert-manager\.v(.*)\.yaml', os.path.basename(file))
assert(match is not None)
try:
version = match.group(1)
except IndexError:
assert(False)
return version
elif action == 'remote_version':
tags = requests.get(f"https://api.github.com/repos/jetstack/cert-manager/releases").json()
latest = None
for tag in tags:
try:
version = semver.VersionInfo.parse(tag['tag_name'].lstrip('v'))
except ValueError:
# Shitty naming of tags, "v1.6.0-alpha.0" is not valid semver
continue
if version.prerelease is not None:
continue
if latest is None or latest < version:
latest = version
return latest
elif action == 'update':
old_version = kwargs['old_version']
new_version = kwargs['new_version']
old_file = f"./k8s/third-party/cert-manager/cert-manager.v{old_version}.yaml"
new_file = f"./k8s/third-party/cert-manager/cert-manager.v{new_version}.yaml"
os.remove(old_file)
with requests.get(f"https://github.com/jetstack/cert-manager/releases/download/v{new_version}/cert-manager.yaml", stream=True) as r:
with open(new_file, 'wb') as f:
shutil.copyfileobj(r.raw, f)
link = "./k8s/manifests/cert-manager/cert-manager.yaml"
os.remove(link)
os.symlink(
f"../../third-party/cert-manager/cert-manager.v{new_version}.yaml",
link,
)
elif action == 'get_paths':
return [
"./k8s/third-party/cert-manager",
"./k8s/manifests/cert-manager/cert-manager.yaml",
]
else:
raise NotImplementedError()
bespoke_functions = {
"bespoke_cert_manager": bespoke_cert_manager,
}
def get_current_version(appname, config):
type = config['type']
name = config['name']
if type == 'k8s':
url = f"{config['registry']}/{config['container']}"
directory = f'./k8s/manifests/{appname}'
for manifest in glob.glob(f'{directory}/*'):
with open(manifest, 'r') as f:
for line in f.readlines():
if 'image:' in line and url in line:
regex = f'^.*image: .*{re.escape(url)}:([^"\'\n]*)'
match = re.match(regex, line)
if match is None:
return None
try:
return match.group(1)
except IndexError:
return None
elif type == 'pulumi':
url = f"{config['registry']}/{config['container']}"
with open('./pulumi/versions.yml') as f:
versions = ruamel.yaml.YAML(typ='safe').load(f)
return versions['apps'][appname][name]['tag']
elif type == 'ci':
url = f"{config['registry']}/{config['container']}"
with open('./.drone.yml', 'r') as f:
for line in f.readlines():
if 'image:' in line and url in line:
regex = f'^.*image: .*{re.escape(url)}:([^"\'\n]*)'
match = re.match(regex, line)
if match is None:
return None
try:
return match.group(1)
except IndexError:
return None
elif type == 'manual':
with open(config['file'], 'r') as f:
for line in f.readlines():
match = re.match('^.*' + config['regex'], line)
if match is None:
continue
try:
return match.group(1)
except IndexError:
return None
return None
elif type == 'submodule':
submodule = git.Repo(config['path'])
submodule.git.fetch(tags=True)
version = submodule.git.describe(abbrev=0, always=True, tags=True)
return version
elif type == 'bespoke':
return bespoke_functions[config['function']](action="local_version")
else:
raise NotImplementedError()
def update_version(old_version, new_version, appname, config):
type = config['type']
name = config['name']
if type == 'k8s':
url = f"{config['registry']}/{config['container']}"
directory = f'./k8s/manifests/{appname}'
for manifest in glob.glob(f'{directory}/*'):
changed = False
new_content = []
with open(manifest, 'r') as f:
for line in f.readlines():
if 'image:' in line and url in line:
new_content.append(line.replace(old_version, new_version))
changed = True
else:
new_content.append(line)
if changed is True:
with open(manifest, 'w') as f:
for line in new_content:
f.write(line)
elif type == 'pulumi':
url = f"{config['registry']}/{config['container']}"
with open('./pulumi/versions.yml') as f:
versions = ruamel.yaml.load(f, ruamel.yaml.RoundTripLoader)
versions['apps'][appname][name]['tag'] = new_version
with open('./pulumi/versions.yml', 'w') as f:
ruamel.yaml.dump(versions, f, Dumper=ruamel.yaml.RoundTripDumper)
elif type == 'ci':
url = f"{config['registry']}/{config['container']}"
changed = False
new_content = []
with open('./.drone.yml', 'r') as f:
for line in f.readlines():
if 'image:' in line and url in line:
new_content.append(line.replace(old_version, new_version))
changed = True
else:
new_content.append(line)
if changed is True:
with open('./.drone.yml', 'w') as f:
for line in new_content:
f.write(line)
elif type == "manual":
changed = False
new_content = []
with open(config['file'], 'r') as f:
for line in f.readlines():
match = re.match('^.*' + config['regex'], line)
if match is not None:
replace = re.sub(config['regex'], new_version, line)
new_content.append(replace)
changed = True
else:
new_content.append(line)
if changed is True:
with open(config['file'], 'w') as f:
for line in new_content:
f.write(line)
elif type == 'submodule':
submodule = git.Repo(config['path'])
submodule.git.checkout(new_version)
elif type == 'bespoke':
return bespoke_functions[config['function']](
action="update",
old_version=old_version,
new_version=new_version
)
else:
raise NotImplementedError()
def commit_and_pr(appname, branchname, paths, message, config, body=''):
assert(repo.is_dirty())
current_branch = repo.head.ref
author_name = os.environ['AUTHOR_NAME']
author_email = os.environ['AUTHOR_EMAIL']
os.environ['GIT_AUTHOR_NAME'] = author_name
os.environ['GIT_AUTHOR_EMAIL'] = author_email
os.environ['GIT_COMMITTER_NAME'] = author_name
os.environ['GIT_COMMITTER_EMAIL'] = author_email
out = repo.git.ls_remote('--heads', 'origin', branchname)
if out != '':
print("Remote branch already exists. Skipping.")
return
repo.git.checkout('-b', branchname)
for path in paths:
repo.git.add(path)
repo.git.commit('--message', message)
repo.git.push("origin", branchname)
gitea_owner = os.environ['GITEA_OWNER']
gitea_repo = os.environ['GITEA_REPO']
gitea_token = os.environ['GITEA_TOKEN']
url = f"{os.environ['GITEA_URL']}/api/v1/repos/{gitea_owner}/{gitea_repo}/pulls"
if "changelog" in config:
if body == '':
body += '\n'
body += f"Changelog: {config['changelog']}"
if "notes" in config:
if body == '':
body += '\n'
body += config['notes']
response = requests.post(url,
data={
"title": message,
"base": "master",
"head": branchname,
"body": body,
},
headers={
"Authorization": f"token {gitea_token}",
},
auth=NullAuth()
)
if not response.ok:
print("Gitea request failed:")
print(response.status_code)
print(response.content)
sys.exit(1)
repo.git.checkout(current_branch)
parser = argparse.ArgumentParser()
parser.add_argument('--apps', nargs='*')
parser.add_argument('--check', action='store_true')
parser.add_argument('--disable-clean-git-check', action='store_true')
args = parser.parse_args()
repo = git.Repo('.')
if not args.disable_clean_git_check:
if repo.head.ref != repo.heads.master:
print("This must only be run on the master branch!")
sys.exit(1)
if repo.is_dirty():
print("There are uncommited changes!")
sys.exit(1)
updates_found = False
def handle_app(appname, config, group=False):
if config['type'] == 'group':
files = []
allbody = ''
has_changes = False
for element in config['elements']:
(has_updates, filelist, message, version, body) = handle_app(appname, element, group=True)
if not has_updates:
if element.get('primary') is True:
has_changes = False
break
else:
continue
has_changes = True
files.extend(filelist or [])
if element.get('primary') is True:
primary_version = version
primary_message = message
if body or 'changelog' in element:
if allbody:
allbody += '\n\n---\n\n'
if 'changelog' in element:
allbody += f"Changelog: {element['changelog']}\n"
if body:
allbody += body
if not args.check and has_changes:
branchname = f'updates/{appname}/{config["name"]}/{primary_version}'
commit_and_pr(appname, branchname, files, primary_message, config, allbody)
return
if config['type'] == 'npm':
subprocess.check_call('npm ci >/dev/null 2>&1', cwd=config['path'], shell=True)
outdated = json.loads(subprocess.run('npm outdated --json --dev', shell=True, cwd=config['path'], stdout=subprocess.PIPE).stdout)
if not outdated:
return False, None, None, None, None
for packagename, packageinfo in outdated.items():
print(f"Found new version for {packagename}: {packageinfo['latest']}")
if args.check:
return True, None, None, None, None
body = ''
body += '|Package|Current Version|New Version|\n'
body += '|---|---|---|\n'
for packagename, packageinfo in outdated.items():
latest = packageinfo['latest']
body += f'|`{packagename}`|`{packageinfo["current"]}`|`{latest}`|\n'
subprocess.run(f'npm install --save-exact {packagename}@{latest}', shell=True, cwd=config['path'])
if group is False:
message = f"{appname}: Update npm modules"
branchname = f'updates/{appname}/npm'
commit_and_pr(appname, branchname, [config['path']], message, config, body)
return True, None, None, None, None
else:
return True, [config['path']], None, None, body
current_version = get_current_version(appname, config)
if current_version is None:
raise Exception(f"Could not get current version of {appname}/{config['name']}")
if 'prefix' in config:
current_version = removeprefix(current_version, config['prefix'])
if 'suffix' in config:
current_version = removesuffix(current_version, config['suffix'])
if config.get('force_patch_number', False):
current_version += '.0'
try:
current_version = semver.VersionInfo.parse(current_version)
except ValueError:
print(f"Current version of {appname}/{config['name']} is not valid semver: {current_version}")
sys.exit(1)
print(f"Current version of {appname}/{config['name']}: {current_version}")
latest_remote = None
if config['type'] == 'manual':
if config['source']['type'] == 'http':
url = config['source']['url']
version = requests.get(url).text
if 'prefix' in config:
version = removeprefix(version, config['prefix'])
if 'suffix' in config:
version = removesuffix(version, config['suffix'])
if config.get('force_patch_number', False):
version += '.0'
version = semver.VersionInfo.parse(version)
if version.prerelease is not None:
return False, None, None, None, None
if config.get('skip_zero_patch', False) and version.patch == 0:
return False, None, None, None, None
latest_remote = version
elif config['source']['type'] == 'github':
tags = requests.get(f"https://api.github.com/repos/{config['source']['repo']}/releases").json()
latest = None
for tag in tags:
version = tag['tag_name']
if 'prefix' in config:
version = removeprefix(version, config['prefix'])
if 'suffix' in config:
version = removesuffix(version, config['suffix'])
if config.get('force_patch_number', False):
version += '.0'
version = semver.VersionInfo.parse(version)
if version.prerelease is not None:
continue
if config.get('skip_zero_patch', False) and version.patch == 0:
continue
if latest is None or latest < version:
latest = version
latest_remote = latest
else:
raise NotImplementedError()
message = f"{appname}: Update {config['name']} to v{latest_remote}"
elif config['type'] == 'submodule':
submodule = git.Repo(config['path'])
tags = submodule.git.ls_remote('--tags', 'origin').split('\n')
latest = None
for tag in tags:
version = removeprefix(tag.split('\t')[1], 'refs/tags/')
if 'prefix' in config:
version = removeprefix(version, config['prefix'])
if 'suffix' in config:
version = removesuffix(version, config['suffix'])
if config.get('force_patch_number', False):
version += '.0'
try:
version = semver.VersionInfo.parse(version)
if version.prerelease is not None:
continue
if config.get('skip_zero_patch', False) and version.patch == 0:
continue
except ValueError:
continue
if latest is None or latest < version:
latest = version
latest_remote = latest
message = f"{appname}: Update submodule {config['name']} to v{latest_remote}"
elif config['type'] == 'bespoke':
latest_remote = bespoke_functions[config['function']](action="remote_version")
message = f"{appname}: Update to v{latest_remote}"
elif config['type'] in ('pulumi', 'k8s', 'ci'):
url = f"{config['registry']}/{config['container']}"
if config['registry'] == 'docker.io':
tag_list = requests.get(f"https://registry.hub.docker.com/v1/repositories/{config['container']}/tags").json()
latest = None
for tag in tag_list:
version = tag['name']
if 'prefix' in config:
version = removeprefix(version, config['prefix'])
if 'suffix' in config:
version = removesuffix(version, config['suffix'])
if config.get('force_patch_number', False):
version += '.0'
try:
version = semver.VersionInfo.parse(version)
if version.prerelease is not None:
continue
if config.get('skip_zero_patch', False) and version.patch == 0:
continue
except ValueError:
continue
if latest is None or latest < version:
latest = version
latest_remote = latest
else:
raise NotImplementedError()
message = f"{appname}: Update image {url} to v{latest_remote}"
else:
raise NotImplementedError()
if latest_remote is None:
print("Did not receive any valid tags from the remote. Bailing")
sys.exit(1)
if latest_remote > current_version:
print(f"New version found: {latest_remote}")
else:
print(f"Latest version on remote: {latest_remote}")
print("Up to date, nothing to do!")
if latest_remote < current_version:
print("Warning: Looks like the current version is newer than on the remote")
return False, None, None, None, None
if args.check:
return True, None, None, None, None
print("Updating version ...")
current_version_full = str(current_version)
latest_full = str(latest_remote)
if config.get('force_patch_number', False):
current_version_full = removesuffix(current_version_full, '.0')
latest_full = removesuffix(latest_full, '.0')
if 'prefix' in config:
current_version_full = config['prefix'] + current_version_full
latest_full = config['prefix'] + latest_full
if 'suffix' in config:
current_version_full = current_version_full + config['suffix']
latest_full = latest_full + config['suffix']
update_version(current_version_full, latest_full, appname, config)
print("Done")
if config['type'] == 'k8s':
paths = [f'./k8s/manifests/{appname}']
elif config['type'] == 'pulumi':
paths = [f'./pulumi/versions.yml']
elif config['type'] == 'ci':
paths = [f'./.drone.yml']
elif config['type'] == 'manual':
paths = [config['file']]
elif config['type'] == 'submodule':
paths = [config['path']]
elif config['type'] == 'bespoke':
paths = bespoke_functions[config['function']](action="get_paths")
else:
raise NotImplementedError()
if group is False:
branchname = f'updates/{appname}/{config["name"]}/{latest_remote}'
commit_and_pr(appname, branchname, paths, message, config)
return True, None, None, None, None
else:
return True, paths, message, latest_remote, None
for appname, configs in repos.items():
if args.apps is not None:
if appname not in args.apps:
continue
for config in configs:
updates_found = handle_app(appname, config) or updates_found
if args.check and updates_found:
sys.exit(2)