"""Wrapped client for the configured GitLab bot.""" import logging import random import re import gitlab from gitlab_bot.models import GitlabConfig log = logging.getLogger(__name__) class GitlabBot(object): """Bot for doing GitLab stuff. Might be used by the IRC bot, or daemons, or whatever.""" COUNTER_RESET_FORMAT = "Review counter reset." COUNTER_START_FORMAT = "Please obtain {0:d} reviews." NEW_REVIEWER_FORMAT = "{0:d} of {1:d} reviewed. Assigning to {2:s} to review." REVIEWS_DONE_FORMAT = "Reviews conducted. Assigning to {0:s} to approve." NOTE_COMMENT = [ "The computer is your friend.", "Thank you for complying.", ] def __init__(self): """Initialize the actual GitLab client.""" config = GitlabConfig.objects.first() self.client = gitlab.Gitlab(config.url, private_token=config.token) self.client.auth() def random_counter_reset_message(self): return "{0:s} {1:s} {2:s}".format(self.COUNTER_RESET_FORMAT, self.COUNTER_START_FORMAT, random.choice(self.NOTE_COMMENT)) def random_new_reviewer_message(self): return "{0:s} {1:s}".format(self.NEW_REVIEWER_FORMAT, random.choice(self.NOTE_COMMENT)) def random_reviews_done_message(self): return "{0:s} {1:s}".format(self.REVIEWS_DONE_FORMAT, random.choice(self.NOTE_COMMENT)) def get_code_review_acceptance_count(self, merge_request): """Walk the provided merge request and determine the number of code review signoffs in the comments.""" if type(merge_request) != gitlab.objects.ProjectMergeRequest: raise TypeError("merge_request must be a ProjectMergeRequest object") approve_count = 0 approvers = list() reset_found = False send_reset = False notes = sorted(self.client.project_mergerequest_notes.list(project_id=merge_request.project_id, merge_request_id=merge_request.id, all=True), key=lambda x: x.id) for note in notes: if not note.system: # note that we can't ignore note.system = True + merge_request.author, that might have been # a new push or something, so we only ignore normal notes from the author if note.author == merge_request.author: log.debug("skipping note from the merge request author") elif note.author.username == self.client.user.username: log.debug("saw a message from myself, maybe i already nagged about the reset") if note.body.find(self.COUNTER_RESET_FORMAT) >= 0: log.debug("...yup") send_reset = False else: log.debug("...nope") else: if note.body.find("LGTM") >= 0: log.debug("merge request '%s', note '%s' has a LGTM", merge_request.title, note.id) approve_count += 1 if reset_found: log.debug("ignoring the previous reset, since we have a counter") reset_found = False send_reset = False approvers.append(note.author.username) log.debug("approval count set to %d", approve_count) else: log.debug("merge request '%s', note '%s' does not have a LGTM", merge_request.title, note.id) else: log.debug("merge request '%s', note '%s' is a system message", merge_request.title, note.id) if re.match(r'Added \d+ commit', note.body): log.debug("setting the counter to 0, '%s' looks like a push!", note.body) approve_count = 0 approvers = list() reset_found = True send_reset = True else: log.debug("leaving the counter as it is, i don't think '%s' is a push", note.body) return approve_count, approvers, send_reset def scan_project_for_reviews(self, project): project_obj = self.client.projects.get(project.project_id) if not project_obj: return for merge_request in project_obj.mergerequests.list(state='opened'): log.debug("scanning merge request '%s'", merge_request.title) # check to see if the merge request needs a reviewer or a merge approve_count, approvers, send_reset = self.get_code_review_acceptance_count(merge_request) if approve_count < project.code_reviews_necessary: log.debug("%s needs a code review", merge_request.title) if merge_request.assignee is not None: log.debug("%s currently assigned to %s", merge_request.title, merge_request.assignee.username) excluded_candidates = approvers + [merge_request.author.username] candidates = [x for x in project.code_reviewers.split(',') if x not in excluded_candidates] # if the current assignee is in candidates, then we don't need to do anything if merge_request.assignee is None or merge_request.assignee.username not in candidates: if len(candidates) > 0: new_reviewer = random.choice(candidates) log.debug("%s is the new reviewer", new_reviewer) # find the user object for the new reviewer new_reviewer_obj = self.client.users.get_by_username(new_reviewer) # create note for the update assign_msg = {'body': self.random_new_reviewer_message().format(approve_count, project.code_reviews_necessary, new_reviewer)} self.client.project_mergerequest_notes.create(assign_msg, project_id=project_obj.id, merge_request_id=merge_request.id) # assign the merge request to the new reviewer self.client.update(merge_request, assignee_id=new_reviewer_obj.id) else: log.warning("no reviewers left to review %s, doing nothing", merge_request.title) else: log.info("%s is already assigned to a candidate reviewer", merge_request.title) # regardless of our decision to assign or not, if we're supposed to send the reset # message, we should if send_reset: log.info("adding reset message to %s", merge_request.title) reset_msg = {'body': self.random_counter_reset_message().format(project.code_reviews_necessary)} self.client.project_mergerequest_notes.create(reset_msg, project_id=project_obj.id, merge_request_id=merge_request.id) else: log.debug("%s has enough code reviews, it needs someone to merge it", merge_request.title) if merge_request.assignee is not None: log.debug("%s currently assigned to %s", merge_request.title, merge_request.assignee.username) excluded = [merge_request.author.username] candidates = [x for x in project.code_review_final_merge_assignees.split(',') if x not in excluded] if merge_request.assignee is None or merge_request.assignee.username not in candidates: if len(candidates) > 0: new_approver = random.choice(candidates) log.debug("%s is the new approver", new_approver) # find the user object for the new reviewer new_approver_obj = self.client.users.get_by_username(new_approver) # create note for the update assign_msg = {'body': self.random_reviews_done_message().format(new_approver)} self.client.project_mergerequest_notes.create(assign_msg, project_id=project_obj.id, merge_request_id=merge_request.id) # assign the merge request to the new reviewer self.client.update(merge_request, assignee_id=new_approver_obj.id) else: log.warning("no approvers left to approve %s, doing nothing", merge_request.title)