Handle the action request to produce or consume events.
Processes the action specified in the request by calling either
action_produce
or action_consume
based on the action type.
Parameters:
request : ActionRequest
The action request object containing the details of the request.
auth : AuthState
The authentication state object containing identity and
authentication information of the requester.
Returns:
ActionCallbackReturn
The result of the action.
Source code in action_provider/main.py
| def action_run(
request: ActionRequest,
auth: AuthState,
) -> ActionCallbackReturn:
"""Handle the action request to produce or consume events.
Processes the action specified in the request by calling either
`action_produce` or `action_consume` based on the action type.
Parameters:
----------
request : ActionRequest
The action request object containing the details of the request.
auth : AuthState
The authentication state object containing identity and
authentication information of the requester.
Returns:
-------
ActionCallbackReturn
The result of the action.
"""
caller_id = auth.effective_identity
request_id = request.request_id
full_request_id = f'{caller_id}:{request_id}'
logger.info(f'Run endpoint is called with request_id = {full_request_id}')
logger.debug(f'new request = {request}')
prev_request = _get_request_from_dynamo(full_request_id)
if not prev_request:
# New action
action_status = perform_action(full_request_id, request, auth)
return action_status
logger.debug(f'old request = {prev_request}')
if prev_request['request'] != request.json():
raise ActionConflict(
f'Request {request_id} already present with different param. ',
)
prev_action_id = prev_request['action_id']
prev_status, _ = _get_status_request(prev_action_id)
if prev_status.status in (
ActionStatusValue.SUCCEEDED,
ActionStatusValue.FAILED,
):
return prev_status
# Unfinished action
action_status = perform_action(full_request_id, request, auth)
return action_status
|