buildbot-transifex/buildbot_transifex/webhook.py
shekhar-cis 89305ec602
All checks were successful
buildbot/multibuild_parent Build done.
buildbot/travis_bionic Build done.
Update Webhook response variable
2023-01-06 20:49:37 +05:30

176 lines
5.9 KiB
Python

"""Transifex webhook handler """
import os
import base64
import time
import json
import hmac
import hashlib
from subprocess import call
from base64 import b64encode
import requests
from buildbot.process.properties import Properties
from buildbot.util import bytes2unicode, unicode2bytes
from buildbot.www.hooks.base import BaseHookHandler
from twisted.internet import defer
from twisted.python import log
from dateutil.parser import parse as dateparse
# Names of the HTTP request headers this module works with.
# NOTE(review): _HEADER_USER_AGENT is not referenced anywhere in the visible code.
_HEADER_USER_AGENT = 'User-Agent'
# Header holding the signature sent with the request; it is compared against
# the HMAC computed locally from the configured secret.
_HEADER_SIGNATURE = 'X-TX-Signature'
# Header whose value is used as the URL component of the signed message.
_HEADER_URL_PATH = 'X-TX-Url'
# Header whose value is used as the date component of the signed message.
HTTP_DATE = 'date'
# Name of the payload field that carries the event name.
_EVENT_KEY = 'event'
# Module-level dict that getChanges() fills with 'branch', 'revision',
# 'properties' and 'changes' entries. Because it lives at module scope it is
# shared by every request handled in this process.
transifex_request_data = {}
class TransifexHandler(BaseHookHandler):
    """Change-hook handler that turns Transifex webhook deliveries into changes.

    ``getChanges`` parses the JSON request body, verifies the request
    signature when a secret is configured, and dispatches to the
    ``process_<event>`` method named after the payload's ``event`` value.
    """

    # A delivery produces a change only when the payload's project and
    # resource contain these markers.
    PROJECT_FILTER = 'pybitmessage-test'
    RESOURCE_FILTER = 'messagespot'
    # Repository and branch reported for accepted deliveries.
    DEFAULT_REPOSITORY = "https://github.com/Bitmessage/PyBitmessage"
    DEFAULT_BRANCH = "v0.6"

    def __init__(self, master, secret=None, options=None, transifex_dict=None):
        """Store the hook configuration.

        Args:
            master: the Buildbot master this hook belongs to.
            secret: shared secret (or renderable) used to verify signatures.
            options: the dialect options dict from the master configuration.
            transifex_dict: optional overrides; a ``'branch'`` entry replaces
                the default branch of generated changes.
        """
        # Buildbot instantiates hook handlers as ``klass(master, options)``,
        # so with two positional arguments the options land in the ``secret``
        # slot. Options may be a dict or a plain ``True``.
        if options is None and isinstance(secret, (dict, bool)):
            options = secret if isinstance(secret, dict) else {}
            secret = None
        if not options:
            options = {}
        super().__init__(master, options)
        self.secret = secret
        self.master = master
        self.options = options
        self.transifex_dict = transifex_dict if transifex_dict is not None else {}
        # Set by getChanges() once the secret has been rendered.
        self.rendered_secret = None

    def returnMessage(self, status=False, message="Unimplemented"):
        """Build a JSON response body plus its content-type header list.

        Returns:
            ``[body, headers]`` where ``body`` is a JSON string with
            ``status`` ("OK"/"FAIL") and ``message`` fields.
        """
        output = json.dumps({
            "status": "OK" if status else "FAIL",
            "message": message,
        })
        return [output, [('Content-type', 'application/json')]]

    def verifyTransifexSignature(
        self, request, content, signature=None, header_signature=None
    ):
        """Check the signature header against the locally computed signature.

        The expected value is the base64-encoded HMAC-SHA256, keyed with the
        rendered secret, of ``POST``, the ``X-TX-Url`` header, the ``date``
        header and the hex MD5 of the body, joined by newlines.

        Args:
            request: the incoming request; its headers are read.
            content: raw request body as bytes.
            signature: accepted for call compatibility and not compared. The
                previous implementation required the header to equal both
                this value and the computed one, which are HMACs of different
                messages, so the check could never pass.
            header_signature: the signature received with the request.

        Returns:
            True when the received signature matches, False otherwise
            (including when the secret or a required header is missing).

        NOTE(review): Transifex appears to document this SHA-256 scheme under
        the ``X-TX-Signature-V2`` header -- confirm that ``_HEADER_SIGNATURE``
        names the header actually being sent.
        """
        url_path = request.getHeader(_HEADER_URL_PATH)
        gmt_date = request.getHeader(HTTP_DATE)
        required = (self.rendered_secret, header_signature, url_path, gmt_date)
        if any(value is None for value in required):
            return False

        # MD5 is mandated by the signing scheme; it is not used for security
        # on its own, only as an input to the HMAC.
        content_md5 = hashlib.md5(content).hexdigest()
        msg = b'\n'.join([
            b'POST',
            unicode2bytes(url_path),
            unicode2bytes(gmt_date),
            unicode2bytes(content_md5),
        ])
        expected = base64.b64encode(
            hmac.new(
                key=unicode2bytes(self.rendered_secret),
                msg=msg,
                digestmod=hashlib.sha256,
            ).digest()
        )
        # Constant-time comparison avoids leaking how many bytes matched.
        return hmac.compare_digest(expected, unicode2bytes(header_signature))

    def process_translation_completed(self, payload, transifex_dict, event_type, codebase):
        """Build the change list for a ``translation_completed`` delivery.

        Args:
            payload: decoded webhook payload (a dict).
            transifex_dict: optional overrides; ``'branch'`` is honoured.
            event_type: the payload's event name.
            codebase: codebase to attach to the change, or None to omit it.

        Returns:
            A list with one change dict, or an empty list when the delivery
            is not for the watched project/resource.
        """
        project = str(payload.get('project', 'None'))
        resource = str(payload.get('resource', 'None'))
        language = payload.get('language', 'None')

        watched = (
            self.PROJECT_FILTER in project
            and self.RESOURCE_FILTER in resource
            and 'translation_completed' in str(event_type)
        )
        if not watched:
            return []

        targets = self._transform_variables(payload, transifex_dict)
        target = targets[payload.get('project', 'None')]
        branch = target['branch']
        if isinstance(transifex_dict, dict) and transifex_dict.get('branch'):
            branch = transifex_dict['branch']

        # Only change fields go at the top level; Transifex-specific values
        # (resource, language, event) travel as properties.
        change = {
            'author': 'buildbot-transifex',
            'repository': target['repository'],
            'branch': branch,
            'project': project,
            'properties': {
                'branch': branch,
                'revision': '',
                'language': language,
                'resource': resource,
                'project': project,
                'event': event_type,
            },
        }
        if codebase is not None:
            change['codebase'] = codebase
        return [change]

    def _transform_variables(self, payload, transifex_dict):
        """Map the payload's project to its repository and branch.

        Returns:
            ``{project: {"repository": ..., "branch": ...}}`` where
            ``project`` is the payload's ``project`` value (``'None'`` when
            absent). ``transifex_dict`` is currently not consulted.
        """
        project = payload.get('project', 'None')
        return {
            project: {
                "repository": self.DEFAULT_REPOSITORY,
                "branch": self.DEFAULT_BRANCH,
            }
        }

    @defer.inlineCallbacks
    def getChanges(self, request):
        """Parse a webhook request and return the changes it describes.

        Returns (through the Deferred):
            ``(changes, 'transifex')`` where ``changes`` is a list of change
            dicts, empty for events without a ``process_<event>`` method.

        Raises:
            ValueError: the body is not a JSON object, or a secret is
                configured and the request signature does not verify.
        """
        # Prefer a secret from the dialect options, but keep the one given to
        # the constructor when the options do not define any.
        secret = self.secret
        if isinstance(self.options, dict):
            secret = self.options.get('secret', secret)

        try:
            content = request.content.read()
            content_text = bytes2unicode(content)
            payload = json.loads(content_text)
        except Exception as exception:
            raise ValueError('Error loading JSON: ' + str(exception))
        if not isinstance(payload, dict):
            raise ValueError('Error loading JSON: payload is not an object')

        if secret is not None:
            p = Properties()
            p.master = self.master
            self.rendered_secret = yield p.render(secret)
            header_signature = bytes2unicode(
                request.getHeader(_HEADER_SIGNATURE))
            verified = self.verifyTransifexSignature(
                request, content, header_signature=header_signature
            )
            if not verified:
                # Reject instead of silently accepting unauthenticated input.
                raise ValueError('Invalid Transifex signature')

        event_type = payload.get(_EVENT_KEY, "None")
        project = payload.get("project", 'None')

        # The module-level dict is still refreshed so anything inspecting it
        # keeps seeing the latest request's values.
        branch = self.DEFAULT_BRANCH
        revision = ""
        transifex_request_data['branch'] = branch
        transifex_request_data['revision'] = revision
        transifex_request_data["properties"] = {
            "branch": branch,
            "revision": revision
        }
        transifex_request_data["changes"] = {
            "author": "buildbot-transifex",
            "repository": project,
        }

        log.msg("Received event '{}' from transifex".format(event_type))
        codebase = ""
        changes = []
        handler_function = getattr(self, 'process_{}'.format(event_type), None)
        if not handler_function:
            log.msg("Ignoring transifex event '{}'".format(event_type))
        else:
            changes = handler_function(
                payload, self.transifex_dict, event_type, codebase
            )
        return (changes, 'transifex')
# Buildbot's change-hook loader looks the dialect name up in this module and
# instantiates the handler itself, so the name is bound to the class. The
# previous call referenced an undefined name and failed at import time.
transifex = TransifexHandler