Module rpc_service

Start a server and serve JSON-RPC requests.

Workflow

Admin:

Loads user name into python shelf who are authorized to access the JSON-RPC
service before serving first request.

User:

Only the users who are added by the admin can get a JWT token by
sending a JSON-RPC request with parameters 'name' and 'jsessionid'
to '/public' endpoint.

In the subsequent requests user should pass this
token in 'x-access-token' header to '/private' endpoint for accessing
methods.

Note:

This program isn't intended to be used as it is without changing the
business logic. Basic authentication and authorization can be taken from
this program and your respective business logic can be implemented with
little ease.

Typical Usage:

$ pipenv shell
$ waitress-serve --port=$SERVER_PORT rpc_service:app

Example:

- Get a token:
$ curl -X POST -H 'Content-Type: application/json' -d '{"id":1, "jsonrpc":"2.0","params":{"name": $user, "jsessionid":$cookie_string}, "method":"get_token"}' url:port/public

For more info refer Github repo.

Expand source code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''Start a server and serve JSON-RPC requests.

# Workflow

## Admin:
    Loads user name into python shelf who are authorized to access the JSON-RPC
    service before serving first request.

## User:
    Only the users who are added by the admin can get a JWT token by
    sending a JSON-RPC request with parameters 'name' and 'jsessionid'
    to '/public' endpoint.

    In the subsequent requests user should pass this
    token in 'x-access-token' header to '/private' endpoint for accessing
    methods.

## Note:
    This program isn't intended to be used as it is without changing the
    business logic. Basic authentication and authorization can be taken from
    this program and your respective business logic can be implemented with
    little ease.

# Typical Usage:
    $ pipenv shell
    $ waitress-serve --port=$SERVER_PORT rpc_service:app

# Example:
    - Get a token:
    $ curl -X POST -H 'Content-Type: application/json' -d '{"id":1, \
"jsonrpc":"2.0","params":{"name": $user, "jsessionid":$cookie_string}, \
"method":"get_token"}' url:port/public

For more info refer [Github]( https://github.com/leelavg/cigs ) repo.
'''

import datetime
import logging
import os
import re
import shelve
import ssl
import unicodedata
from collections import namedtuple
from functools import wraps
from logging.handlers import RotatingFileHandler

import jwt
import requests
from dotenv import load_dotenv
from flask import Flask, Response, has_request_context, json, jsonify, request
from ghapi.all import GhApi
from jira import JIRA, JIRAError
from jsonrpcserver import dispatch, method
from jsonrpcserver.exceptions import ApiError
from werkzeug.exceptions import HTTPException

# TODO: Fix SSL Issue while accessing Github
# pylint: disable=protected-access
ssl._create_default_https_context = ssl._create_unverified_context


# Global setup
def _validated_env_vars():
    ''' Validates environment variables.

    Takes note of Mandatory and Optional parameters passed in '.env' file and \
    stores them in a global namedtuple

    Returns:
        True if required environment variables are validated False otherwise

    Raises:
        Exception: If not all mandatory parameters required to serve the \
        requests are not specified in '.env' file
    '''

    fields = 'JIRA_SERVER JIRA_USERNAME JIRA_PASSWORD JIRA_CERT_PATH \
    GITHUB_OWNER GITHUB_REPO POLARION_URL POLARION_REPO POLARION_USERNAME \
    POLARION_PASSWORD POLARION_PROJECT POLARION_CERT_PATH JWT_SECRET \
    JWT_EXPIRY SERVER_PORT ADMIN_USER SHELF_NAME'

    try:
        mandatory = namedtuple('Environ_man', fields)._make(
            os.getenv(field) for field in fields.split() if os.getenv(field))
    except TypeError as excep:
        raise Exception('Insufficient environment variables: ',
                        str(excep)) from excep

    # Not mandatory (optional) environment values
    fields = 'LOG_FILE LOG_SIZE LOG_BKP_COUNT'
    optional = namedtuple('Environ_opt', fields)._make(
        os.getenv(field) for field in fields.split())
    env = namedtuple('Environ', mandatory._fields + optional._fields)
    return env(*mandatory, *optional)


if not load_dotenv(dotenv_path=os.getenv('DOTENV_PATH', '.env'),
                   override=os.getenv('OVERRIDE_ENV') or False):
    raise Exception('Not able to load environment variables from file')

# Store environment variables and create Flask instance
ENV = _validated_env_vars()
app = Flask(__name__)
owner, repo = ENV.GITHUB_OWNER, ENV.GITHUB_REPO


# Logging
# From flask logging documentation
class RequestFormatter(logging.Formatter):
    '''Log requester's remote address and request resource url too in log.'''
    def format(self, record):
        if has_request_context():
            record.url = request.url
            record.remote_addr = request.remote_addr
        else:
            record.url = None
            record.remote_addr = None

        return super().format(record)


def _get_logger(logger_name):
    '''Creates the logger for the application.

    Adds FileHandler to 'jsonrpcserver.dispatcher' and 'app' logger as well.

    Args:
        logger_name (str): Name of the logger to be created and returned

    Returns:
        A logger object with the received 'logger_name'
    '''

    # Create a custom logger
    logger = logging.getLogger(logger_name)
    log_file = ENV.LOG_FILE or 'rpc-service.log'

    # Create handlers
    file_handler = RotatingFileHandler(log_file,
                                       maxBytes=ENV.LOG_SIZE or 1e+7,
                                       backupCount=ENV.LOG_BKP_COUNT or 5)
    file_handler.setLevel(logging.INFO)

    # Create and set formatter
    formatter = RequestFormatter(
        '[%(asctime)s] %(name)s %(remote_addr)s for %(url)s '
        '%(levelname)s: %(message)s')
    file_handler.setFormatter(formatter)

    # Add handlers to logger
    logger.addHandler(file_handler)

    # Log messages from 'jsonrpcserver' as well to log file
    temp_logger = logging.getLogger('jsonrpcserver.dispatcher')
    temp_logger.addHandler(file_handler)
    temp_logger.setLevel(logging.INFO)

    # Add file handler to Flask as well and log message with new formatter
    app.logger.addHandler(file_handler)

    logger.propagate = False

    return logger


LOG = _get_logger(__name__)


# Create shelf and add ADMIN_USER
def _init_shelf():
    '''Initiates python shelve.

    Creates a new shelf if one doesn't exist to store list of users.
    '''
    with shelve.open(ENV.SHELF_NAME, writeback=True) as shelf:
        if not shelf.get('users'):
            shelf['users'] = [ENV.ADMIN_USER]
            LOG.info('Created new shelf with name %s', ENV.SHELF_NAME)


_init_shelf()

# Create required REST/SOAP client connections
jira = JIRA(basic_auth=(ENV.JIRA_USERNAME, ENV.JIRA_PASSWORD),
            options={
                'server': ENV.JIRA_SERVER,
                'verify': ENV.JIRA_CERT_PATH
            })

# Create a Github api connection
ghapi = GhApi()

# Pylero SOAP client is established during import and environment variables
# has to be validated as pylero picks credentials from there and let the
# 'Exception' stop the program execution if config is not correct
# pylint: disable=wrong-import-position
# TODO: Is it possible to cache suds client?
from pylero.work_item import TestCase  # noqa

POL_MARKUP = '<span style="font-size: 10pt;line-height: 1.5;">{}</span>'


# Start of Helper functions
def token_required(func):
    '''Decorator to validate JWT token in http header.'''
    @wraps(func)
    def decorated(*args, **kwargs):
        token = None

        if 'x-access-token' in request.headers:
            token = request.headers['x-access-token']

        if not token:
            return {'message': 'Token is missing!'}, 401

        try:
            data = jwt.decode(token, ENV.JWT_SECRET, algorithms=['HS256'])
            caller = data['sub']
        except jwt.exceptions.PyJWTError as excep:
            return {'message': str(excep)}, 401

        # All the functions using token will receive 'caller' as their first
        # argument
        return func(caller, *args, **kwargs)

    return decorated


# Start of basic error handling functions
@app.errorhandler(HTTPException)
def handle_http_exception(excep):
    '''Generic Error Handler for 'HTTPException'

    Args:
        excep (Exception): Type of exception raised

    Returns:
        response JSON object
    '''
    response = excep.get_response()
    response.data = json.dumps({
        'code': excep.code,
        'name': excep.name,
        'description': excep.description,
    })
    response.content_type = 'application/json'
    return response


@app.errorhandler(Exception)
def handle_exception(excep):
    '''Generic Exception handler for web requests with a pass through for \
    HTTPException.

    Args:
        excep (Exception): Type of exception raised

    Returns:
        JSON Object and Error code as a response
    '''
    code = 500
    if isinstance(excep, HTTPException):
        code = excep.code
    return jsonify(error=str(excep)), code


# Start of methods that are served at '/public' endpoint
@method
def get_token(name, jsessionid, skip_auth=False):
    '''Creates a token and returns to the caller.

    Generates a JWT token based on the info received. Cookie string will be \
    deleted after proving the user authenticity against jira server.

    Args:
        name (str): Name of the user requesting the token
        jsessionid (str): Cookie string recevied after authenticating \
        to '/rest/auth/latest/session' of Jira instance
        skip_auth (bool): Restricted to ADMIN_USER

    Raises:
        InvalidParamsError: If either 'name' or 'jsessionid' param missing \
        from JSON-RPC request
        ApiError: If the user isn't supposed to use this JSON-RPC service

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"get_token", "params": {"name": "USERNAME",
        "jsessionid": "COOKIE_STRING"}, "id": "INTEGER/STRING"}
    '''

    with shelve.open(ENV.SHELF_NAME, flag='r') as shelf:
        if name not in shelf.get('users'):
            raise ApiError(
                'Unauthorized access', -32000,
                'Please contact admin for getting access to this service')

    if name == ENV.ADMIN_USER and skip_auth:
        # If ADMIN_USER doesn't want to be authenticated against jira
        pass
    else:
        # Authenticate user against Jira using the Cookie string
        auth_url = f'{ENV.JIRA_SERVER}/rest/auth/latest/session'
        cookies = {'JSESSIONID': jsessionid}
        resp = requests.get(auth_url,
                            cookies=cookies,
                            verify=ENV.JIRA_CERT_PATH)
        if resp:
            # Proving the identity using authenticated Jira cookie
            username = resp.json()['name']
            if name != username:
                raise ApiError(
                    'Unauthorized access', -32000,
                    f'Supplied "JSESSIONID" doesn\'t belong to {name}')

            # After authentication, logout the session which invalidates cookie
            del_ses = requests.delete(auth_url,
                                      cookies=cookies,
                                      verify=ENV.JIRA_CERT_PATH)
            if not del_ses:
                raise ApiError(
                    'Please logout of Jira once manually', -32000,
                    'Deletion of cookie failed and manual logout is needed')
        else:
            raise ApiError(
                'Invalid JSESSIONID supplied', -32000,
                f'Please contact {ENV.ADMIN_USER} for authenticating to '
                'JIRA correctly')

    # Generate and return JWT
    expiry = (datetime.datetime.utcnow() +
              datetime.timedelta(days=int(ENV.JWT_EXPIRY)))

    payload = {
        'exp': expiry,
        'sub': name,
    }
    return 'Token: ' + jwt.encode(payload, ENV.JWT_SECRET, algorithm='HS256')


# Start of methods that are served at '/private' endpoint
@method
def add_user(context, name):
    '''Adds new user to python shelve.

    Only accessible to ADMIN_USER.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}
        name (str): Name of the user to add

    Returns:
        String response on successly adding user

    Raises:
        ApiError: If user is not authorized to this resource

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"add_user", "params": {"name": "USERNAME"},
        "id": "INTEGER/STRING"}
    '''
    if context['caller'] == ENV.ADMIN_USER:
        with shelve.open(ENV.SHELF_NAME, writeback=True) as shelf:
            if name not in shelf.get('users'):
                shelf['users'].append(name)
                return f'{name} added to shelf'
        return f'{name} already exists in shelf'
    raise ApiError('Unauthorized access', -32000, 'Restricted to admin user')


@method
def get_all_users(context):
    '''Returns all the users stored in shelf.

    Only accessible to ADMIN_USER.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}

    Returns:
        All the users stored in shelf

    Raises:
        ApiError: If user is not authorized to this resource

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"get_all_users", "id": "INTEGER/STRING"}
    '''
    if context['caller'] == ENV.ADMIN_USER:
        with shelve.open(ENV.SHELF_NAME, flag='r') as shelf:
            if shelf.get('users'):
                return shelf['users']
    raise ApiError('Unauthorized access', -32000, 'Restricted to admin user')


# Business logic
# pylint: disable=line-too-long
@method
def to_done(context, issue_id):
    '''Validate and update fields across Jira, Github and Polarion using \
    ADMIN_USER credentials.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}
        issue_id (str): Jira Issue ID for fields updation

    Raises:
        InvalidParamsError: If 'issue_id' param is missing from JSON-RPC \
        request.

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"to_done", "params": {"issue_id":
        "JIRA_ISSUE_ID", "id": "INTEGER/STRING"}
    '''

    # Holds all the info required to perform below operations
    info_dict = {}

    # Jira
    try:
        issue = jira.issue(issue_id, fields='summary,comment,assignee')
    except JIRAError as err:
        raise ApiError('Jira Error', -32000,
                       f'{err.status_code}: {err.text}') from err

    if issue.fields.assignee.name != context['caller']:
        raise ApiError('Not an assignee', -3200,
                       'This issue is not assigned to you')

    summary = issue.fields.summary
    info_dict['pol_id'] = summary.split(None, 1)[0]

    comment = issue.fields.comment.comments[-1].body

    # Change unicode to normalized form
    comment = unicodedata.normalize('NFKD', comment)

    # Expects comment body as below
    # PR: 12345
    # fn: test_function_1 test_function_2
    info_dict['PR'] = re.search(r'pr: (\d+)', comment).group(1)

    # One polarion test may correspond to many 'test_' functions in worst case
    temp_fns = re.findall(r'(test_\w+)', comment)
    if not (info_dict['PR'] and temp_fns):
        raise ApiError(
            'Malformed comment body', -32000, 'Comment expression '
            'should be of form: PR: 12345\nfn: test_func_1 test_func_2')

    # Github
    # Test if PR is merged
    try:
        ghapi.pulls.check_if_merged(owner=owner,
                                    repo=repo,
                                    pull_number=info_dict['PR'])
        info_dict['pr_status'] = 'MERGED'
    # pylint: disable=broad-except
    # Can't catch ghapi specific error here
    except Exception as excep:
        if '404' in str(excep):
            raise ApiError('Github error', -32000,
                           f'PR: {info_dict["PR"]} is not merged') from excep
        else:
            raise ApiError(
                'Github error', -32000,
                f'PR: {info_dict["PR"]} doesn\'t exist in {owner}/{repo}'
            ) from excep

    # Take note of test_script and function name
    info_dict['fn_path'] = []
    files = ghapi.pulls.list_files(owner, repo, info_dict['PR'])
    for each_file in files.items:
        pr_content = str(each_file)
        all_fns = re.findall(r'def (test_\w+)', pr_content)
        fpath = re.findall(r'filename: (.*)', pr_content)[0]

        for fn in all_fns:
            # Check 'test_' function from file matches any function
            # given in Jira comment and take note of the file path
            if fn in temp_fns:
                info_dict['fn_path'].append((fn, fpath))
        if len(info_dict['fn_path']) != len(temp_fns):
            raise ApiError('Patch function doesn\'t exist', -32000,
                           f'{temp_fns} doesn\'t exist in {info_dict["PR"]}')

    # Polarion
    try:
        testcase = TestCase(work_item_id=info_dict['pol_id'])
    except Exception as excep:
        raise ApiError('Polarion error', -32000, str(excep)) from excep
    if testcase.caseautomation != 'automated':
        testcase.caseautomation = 'automated'
        setattr(testcase, 'testcase-automation_id',
                ' '.join(entry[0] for entry in info_dict['fn_path']))
        script_path = '\n'.join(
            POL_MARKUP.format(entry[1]) for entry in info_dict['fn_path'])
        testcase.automation_script = script_path
        try:
            testcase.update()
        except Exception as excep:
            raise ApiError('Polarion error', -32000, str(excep)) from excep
    else:
        raise ApiError(
            'Polarion error', -32000, f'{info_dict["id"]} is '
            'already marked as "automated" in Polarion')

    # Jira
    # Transistion to 'Done' state
    # Example: To get what transistions are possible for current Jira project
    # >>> trans = jira.transitions(issue)
    # >>> available = [(t['id'], t['name']) for t in trans]; print(available)
    # >>> [('21', 'In Progress'), ('31', 'Done'), ('51', 'To Do'), ('61', 'In
    # Review')]
    # >>> jira_state = [num[0] for num in available if num[1] == 'Done' ][0]
    try:
        jira.transition_issue(issue, '31')
    except JIRAError as err:
        raise ApiError(
            'Jira Error', -32000,
            ('Polarion fields are updated but mark Jira manually to '
             '"Done", error:' + str(err))) from err

    # Add JIRA comment with gathered info if update is successful
    body = '\n'.join(
        str(key) + ': ' + str(value) for key, value in info_dict.items()
        if key not in ('PR', 'pol_id'))
    body += f'\nGithub: {owner}/{repo}/pulls/{info_dict["PR"]}'
    body += f'\nPolarion: {ENV.POLARION_URL}/#/project/{ENV.POLARION_PROJECT}/workitem?id={info_dict["pol_id"]}'  # noqa
    body += '\nGithub status is verified and Polarion fields are updated'

    try:
        jira.add_comment(issue, body)
    except JIRAError as err:
        raise ApiError(
            'Jira Error', -32000,
            ('Polarion and Jira fields are updated but unable to add Jira '
             'comment' + str(err))) from err

    return 'Github fields are validated, Jira and Polarion fields are updated'


@app.route('/private', methods=['POST'])
@token_required
def private(caller):
    '''Generic route for private methods which requires authentication.

    ### Currently dispatches below methods:
        get_all_users: Restricted to ADMIN_USER
        add_user: Restricted to ADMIN_USER
        to_done: Available to all authenticated users

    ### Note:
        Most of the times 'caller' will be inferred from token and
        'user' from JSON-API Call params

    ### Please refer specific method docstrings for an example API call that \
        the function accepts.
    '''
    req = request.get_data().decode()
    response = dispatch(
        req,
        context={'caller': caller},
        debug=True,
    )
    return Response(str(response),
                    response.http_status,
                    mimetype='application/json')


@app.route('/public', methods=['POST'])
def public():
    '''Generic route for public methods which doesn't require authentication.

    ### Currently dispatches below methods:
        get_token: Available to users at the discretion of ADMIN_USER

    ### Note:
        Typically used to acquire a token by proving the user identity.

    ### Please refer specific method docstrings for an example API call that \
        the function accepts.
    '''
    req = request.get_data().decode()
    response = dispatch(
        req,
        debug=True,
    )
    return Response(str(response),
                    response.http_status,
                    mimetype='application/json')

Functions

def add_user(context, name)

Adds new user to python shelve.

Only accessible to ADMIN_USER.

Args

context : dict
A dict of form {'caller': 'CALLER_NAME'}
name : str
Name of the user to add

Returns

String response on successly adding user

Raises

ApiError
If user is not authorized to this resource

Example (JSON-RPC request):

{"jsonrpc": "2.0", "method":"add_user", "params": {"name": "USERNAME"},
"id": "INTEGER/STRING"}
Expand source code
@method
def add_user(context, name):
    '''Adds new user to python shelve.

    Only accessible to ADMIN_USER.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}
        name (str): Name of the user to add

    Returns:
        String response on successly adding user

    Raises:
        ApiError: If user is not authorized to this resource

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"add_user", "params": {"name": "USERNAME"},
        "id": "INTEGER/STRING"}
    '''
    if context['caller'] == ENV.ADMIN_USER:
        with shelve.open(ENV.SHELF_NAME, writeback=True) as shelf:
            if name not in shelf.get('users'):
                shelf['users'].append(name)
                return f'{name} added to shelf'
        return f'{name} already exists in shelf'
    raise ApiError('Unauthorized access', -32000, 'Restricted to admin user')
def get_all_users(context)

Returns all the users stored in shelf.

Only accessible to ADMIN_USER.

Args

context : dict
A dict of form {'caller': 'CALLER_NAME'}

Returns

All the users stored in shelf

Raises

ApiError
If user is not authorized to this resource

Example (JSON-RPC request):

{"jsonrpc": "2.0", "method":"get_all_users", "id": "INTEGER/STRING"}
Expand source code
@method
def get_all_users(context):
    '''Returns all the users stored in shelf.

    Only accessible to ADMIN_USER.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}

    Returns:
        All the users stored in shelf

    Raises:
        ApiError: If user is not authorized to this resource

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"get_all_users", "id": "INTEGER/STRING"}
    '''
    if context['caller'] == ENV.ADMIN_USER:
        with shelve.open(ENV.SHELF_NAME, flag='r') as shelf:
            if shelf.get('users'):
                return shelf['users']
    raise ApiError('Unauthorized access', -32000, 'Restricted to admin user')
def get_token(name, jsessionid, skip_auth=False)

Creates a token and returns to the caller.

Generates a JWT token based on the info received. Cookie string will be deleted after proving the user authenticity against jira server.

Args

name : str
Name of the user requesting the token
jsessionid : str
Cookie string recevied after authenticating to '/rest/auth/latest/session' of Jira instance
skip_auth : bool
Restricted to ADMIN_USER

Raises

InvalidParamsError
If either 'name' or 'jsessionid' param missing from JSON-RPC request
ApiError
If the user isn't supposed to use this JSON-RPC service

Example (JSON-RPC request):

{"jsonrpc": "2.0", "method":"get_token", "params": {"name": "USERNAME",
"jsessionid": "COOKIE_STRING"}, "id": "INTEGER/STRING"}
Expand source code
@method
def get_token(name, jsessionid, skip_auth=False):
    '''Creates a token and returns to the caller.

    Generates a JWT token based on the info received. Cookie string will be \
    deleted after proving the user authenticity against jira server.

    Args:
        name (str): Name of the user requesting the token
        jsessionid (str): Cookie string recevied after authenticating \
        to '/rest/auth/latest/session' of Jira instance
        skip_auth (bool): Restricted to ADMIN_USER

    Raises:
        InvalidParamsError: If either 'name' or 'jsessionid' param missing \
        from JSON-RPC request
        ApiError: If the user isn't supposed to use this JSON-RPC service

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"get_token", "params": {"name": "USERNAME",
        "jsessionid": "COOKIE_STRING"}, "id": "INTEGER/STRING"}
    '''

    with shelve.open(ENV.SHELF_NAME, flag='r') as shelf:
        if name not in shelf.get('users'):
            raise ApiError(
                'Unauthorized access', -32000,
                'Please contact admin for getting access to this service')

    if name == ENV.ADMIN_USER and skip_auth:
        # If ADMIN_USER doesn't want to be authenticated against jira
        pass
    else:
        # Authenticate user against Jira using the Cookie string
        auth_url = f'{ENV.JIRA_SERVER}/rest/auth/latest/session'
        cookies = {'JSESSIONID': jsessionid}
        resp = requests.get(auth_url,
                            cookies=cookies,
                            verify=ENV.JIRA_CERT_PATH)
        if resp:
            # Proving the identity using authenticated Jira cookie
            username = resp.json()['name']
            if name != username:
                raise ApiError(
                    'Unauthorized access', -32000,
                    f'Supplied "JSESSIONID" doesn\'t belong to {name}')

            # After authentication, logout the session which invalidates cookie
            del_ses = requests.delete(auth_url,
                                      cookies=cookies,
                                      verify=ENV.JIRA_CERT_PATH)
            if not del_ses:
                raise ApiError(
                    'Please logout of Jira once manually', -32000,
                    'Deletion of cookie failed and manual logout is needed')
        else:
            raise ApiError(
                'Invalid JSESSIONID supplied', -32000,
                f'Please contact {ENV.ADMIN_USER} for authenticating to '
                'JIRA correctly')

    # Generate and return JWT
    expiry = (datetime.datetime.utcnow() +
              datetime.timedelta(days=int(ENV.JWT_EXPIRY)))

    payload = {
        'exp': expiry,
        'sub': name,
    }
    return 'Token: ' + jwt.encode(payload, ENV.JWT_SECRET, algorithm='HS256')
def handle_exception(excep)

Generic Exception handler for web requests with a pass through for HTTPException.

Args

excep : Exception
Type of exception raised

Returns

JSON Object and Error code as a response

Expand source code
@app.errorhandler(Exception)
def handle_exception(excep):
    '''Generic Exception handler for web requests with a pass through for \
    HTTPException.

    Args:
        excep (Exception): Type of exception raised

    Returns:
        JSON Object and Error code as a response
    '''
    code = 500
    if isinstance(excep, HTTPException):
        code = excep.code
    return jsonify(error=str(excep)), code
def handle_http_exception(excep)

Generic Error Handler for 'HTTPException'

Args

excep : Exception
Type of exception raised

Returns

response JSON object

Expand source code
@app.errorhandler(HTTPException)
def handle_http_exception(excep):
    '''Generic Error Handler for 'HTTPException'

    Args:
        excep (Exception): Type of exception raised

    Returns:
        response JSON object
    '''
    response = excep.get_response()
    response.data = json.dumps({
        'code': excep.code,
        'name': excep.name,
        'description': excep.description,
    })
    response.content_type = 'application/json'
    return response
def private(caller)

Generic route for private methods which requires authentication.

Currently dispatches below methods:

get_all_users: Restricted to ADMIN_USER
add_user: Restricted to ADMIN_USER
to_done: Available to all authenticated users

Note:

Most of the times 'caller' will be inferred from token and
'user' from JSON-API Call params

Please refer specific method docstrings for an example API call that the function accepts.

Expand source code
@app.route('/private', methods=['POST'])
@token_required
def private(caller):
    '''Generic route for private methods which requires authentication.

    ### Currently dispatches below methods:
        get_all_users: Restricted to ADMIN_USER
        add_user: Restricted to ADMIN_USER
        to_done: Available to all authenticated users

    ### Note:
        Most of the times 'caller' will be inferred from token and
        'user' from JSON-API Call params

    ### Please refer specific method docstrings for an example API call that \
        the function accepts.
    '''
    req = request.get_data().decode()
    response = dispatch(
        req,
        context={'caller': caller},
        debug=True,
    )
    return Response(str(response),
                    response.http_status,
                    mimetype='application/json')
def public()

Generic route for public methods which doesn't require authentication.

Currently dispatches below methods:

get_token: Available to users at the discretion of ADMIN_USER

Note:

Typically used to acquire a token by proving the user identity.

Please refer specific method docstrings for an example API call that the function accepts.

Expand source code
@app.route('/public', methods=['POST'])
def public():
    '''Generic route for public methods which doesn't require authentication.

    ### Currently dispatches below methods:
        get_token: Available to users at the discretion of ADMIN_USER

    ### Note:
        Typically used to acquire a token by proving the user identity.

    ### Please refer specific method docstrings for an example API call that \
        the function accepts.
    '''
    req = request.get_data().decode()
    response = dispatch(
        req,
        debug=True,
    )
    return Response(str(response),
                    response.http_status,
                    mimetype='application/json')
def to_done(context, issue_id)

Validate and update fields across Jira, Github and Polarion using ADMIN_USER credentials.

Args

context : dict
A dict of form {'caller': 'CALLER_NAME'}
issue_id : str
Jira Issue ID for fields updation

Raises

InvalidParamsError
If 'issue_id' param is missing from JSON-RPC request.

Example (JSON-RPC request):

{"jsonrpc": "2.0", "method":"to_done", "params": {"issue_id":
"JIRA_ISSUE_ID", "id": "INTEGER/STRING"}
Expand source code
@method
def to_done(context, issue_id):
    '''Validate and update fields across Jira, Github and Polarion using \
    ADMIN_USER credentials.

    Args:
        context (dict): A dict of form {'caller': 'CALLER_NAME'}
        issue_id (str): Jira Issue ID for fields updation

    Raises:
        InvalidParamsError: If 'issue_id' param is missing from JSON-RPC \
        request.

    ### Example (JSON-RPC request):
        {"jsonrpc": "2.0", "method":"to_done", "params": {"issue_id":
        "JIRA_ISSUE_ID", "id": "INTEGER/STRING"}
    '''

    # Holds all the info required to perform below operations
    info_dict = {}

    # Jira
    try:
        issue = jira.issue(issue_id, fields='summary,comment,assignee')
    except JIRAError as err:
        raise ApiError('Jira Error', -32000,
                       f'{err.status_code}: {err.text}') from err

    if issue.fields.assignee.name != context['caller']:
        raise ApiError('Not an assignee', -3200,
                       'This issue is not assigned to you')

    summary = issue.fields.summary
    info_dict['pol_id'] = summary.split(None, 1)[0]

    comment = issue.fields.comment.comments[-1].body

    # Change unicode to normalized form
    comment = unicodedata.normalize('NFKD', comment)

    # Expects comment body as below
    # PR: 12345
    # fn: test_function_1 test_function_2
    info_dict['PR'] = re.search(r'pr: (\d+)', comment).group(1)

    # One polarion test may correspond to many 'test_' functions in worst case
    temp_fns = re.findall(r'(test_\w+)', comment)
    if not (info_dict['PR'] and temp_fns):
        raise ApiError(
            'Malformed comment body', -32000, 'Comment expression '
            'should be of form: PR: 12345\nfn: test_func_1 test_func_2')

    # Github
    # Test if PR is merged
    try:
        ghapi.pulls.check_if_merged(owner=owner,
                                    repo=repo,
                                    pull_number=info_dict['PR'])
        info_dict['pr_status'] = 'MERGED'
    # pylint: disable=broad-except
    # Can't catch ghapi specific error here
    except Exception as excep:
        if '404' in str(excep):
            raise ApiError('Github error', -32000,
                           f'PR: {info_dict["PR"]} is not merged') from excep
        else:
            raise ApiError(
                'Github error', -32000,
                f'PR: {info_dict["PR"]} doesn\'t exist in {owner}/{repo}'
            ) from excep

    # Take note of test_script and function name
    info_dict['fn_path'] = []
    files = ghapi.pulls.list_files(owner, repo, info_dict['PR'])
    for each_file in files.items:
        pr_content = str(each_file)
        all_fns = re.findall(r'def (test_\w+)', pr_content)
        fpath = re.findall(r'filename: (.*)', pr_content)[0]

        for fn in all_fns:
            # Check 'test_' function from file matches any function
            # given in Jira comment and take note of the file path
            if fn in temp_fns:
                info_dict['fn_path'].append((fn, fpath))
        if len(info_dict['fn_path']) != len(temp_fns):
            raise ApiError('Patch function doesn\'t exist', -32000,
                           f'{temp_fns} doesn\'t exist in {info_dict["PR"]}')

    # Polarion
    try:
        testcase = TestCase(work_item_id=info_dict['pol_id'])
    except Exception as excep:
        raise ApiError('Polarion error', -32000, str(excep)) from excep
    if testcase.caseautomation != 'automated':
        testcase.caseautomation = 'automated'
        setattr(testcase, 'testcase-automation_id',
                ' '.join(entry[0] for entry in info_dict['fn_path']))
        script_path = '\n'.join(
            POL_MARKUP.format(entry[1]) for entry in info_dict['fn_path'])
        testcase.automation_script = script_path
        try:
            testcase.update()
        except Exception as excep:
            raise ApiError('Polarion error', -32000, str(excep)) from excep
    else:
        raise ApiError(
            'Polarion error', -32000, f'{info_dict["id"]} is '
            'already marked as "automated" in Polarion')

    # Jira
    # Transistion to 'Done' state
    # Example: To get what transistions are possible for current Jira project
    # >>> trans = jira.transitions(issue)
    # >>> available = [(t['id'], t['name']) for t in trans]; print(available)
    # >>> [('21', 'In Progress'), ('31', 'Done'), ('51', 'To Do'), ('61', 'In
    # Review')]
    # >>> jira_state = [num[0] for num in available if num[1] == 'Done' ][0]
    try:
        jira.transition_issue(issue, '31')
    except JIRAError as err:
        raise ApiError(
            'Jira Error', -32000,
            ('Polarion fields are updated but mark Jira manually to '
             '"Done", error:' + str(err))) from err

    # Add JIRA comment with gathered info if update is successful
    body = '\n'.join(
        str(key) + ': ' + str(value) for key, value in info_dict.items()
        if key not in ('PR', 'pol_id'))
    body += f'\nGithub: {owner}/{repo}/pulls/{info_dict["PR"]}'
    body += f'\nPolarion: {ENV.POLARION_URL}/#/project/{ENV.POLARION_PROJECT}/workitem?id={info_dict["pol_id"]}'  # noqa
    body += '\nGithub status is verified and Polarion fields are updated'

    try:
        jira.add_comment(issue, body)
    except JIRAError as err:
        raise ApiError(
            'Jira Error', -32000,
            ('Polarion and Jira fields are updated but unable to add Jira '
             'comment' + str(err))) from err

    return 'Github fields are validated, Jira and Polarion fields are updated'
def token_required(func)

Decorator to validate JWT token in http header.

Expand source code
def token_required(func):
    '''Decorator to validate JWT token in http header.'''
    @wraps(func)
    def decorated(*args, **kwargs):
        token = None

        if 'x-access-token' in request.headers:
            token = request.headers['x-access-token']

        if not token:
            return {'message': 'Token is missing!'}, 401

        try:
            data = jwt.decode(token, ENV.JWT_SECRET, algorithms=['HS256'])
            caller = data['sub']
        except jwt.exceptions.PyJWTError as excep:
            return {'message': str(excep)}, 401

        # All the functions using token will receive 'caller' as their first
        # argument
        return func(caller, *args, **kwargs)

    return decorated

Classes

class RequestFormatter (fmt=None, datefmt=None, style='%', validate=True)

Log requester's remote address and request resource url too in log.

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, :meth:str.format ({}) formatting or :class:string.Template formatting in your format string.

Changed in version: 3.2

Added the style parameter.

Expand source code
class RequestFormatter(logging.Formatter):
    '''Log requester's remote address and request resource url too in log.'''
    def format(self, record):
        if has_request_context():
            record.url = request.url
            record.remote_addr = request.remote_addr
        else:
            record.url = None
            record.remote_addr = None

        return super().format(record)

Ancestors

  • logging.Formatter

Methods

def format(self, record)

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Expand source code
def format(self, record):
    if has_request_context():
        record.url = request.url
        record.remote_addr = request.remote_addr
    else:
        record.url = None
        record.remote_addr = None

    return super().format(record)