-
Notifications
You must be signed in to change notification settings - Fork 0
4. API Overview
This wiki page is designed to present the public API of XCOFDK for Python by:
- remarks on the source code documentation:
- an overview of the API organized by currently available subsystems:
- a collection of hands-on information on selected parts of the API:
For a general description of the architecture and responsibility of the subsystems refer to the wiki page 3.-Architecture.
Content:
- 4. API Overview
- 4.1 Source Code Documentation
- 4.2 xmt - Subsystem Multithreading
- 4.3 rtecfg - Subsystem RTE Configuration
- 4.4 fwctrl - Subsystem Framework Control
- 4.5 xmsg - Subsystem Messaging
- 4.6 xmp - Subsystem Multiprocessing
- 4.7 Miscellaneous
The source code documentation of the public API of XCOFDK for Python is a comprehensive guide to the interfaces of the framework.
It is designed to serve as a single-sourced documentation of the API through public subpackages of package xcofdk:
-
subpacakge xcofdk.fwcom:
providing common and frequently used definitions, -
subpacakge xcofdk.fwapi:
the actual public API composed of the subpackages below:- subpacakge xcofdk.fwapi.apiif:
collection of public interface classes, - subpacakge xcofdk.fwapi.fwctrl:
providing API functions to control the framework, e.g. start or stop of the framework, or error reporting via logging, - subpacakge xcofdk.fwapi.rtecfg:
providing API functions for pre-start configuration of the framework, - subpacakge xcofdk.fwapi.xmt:
providing both multithreading and life-cycle management, - subpacakge xcofdk.fwapi.xmsg:
providing the subsystem of messaging, - subpacakge xcofdk.fwapi.xmp:
providing the subsystem of multiprocessing.
- subpacakge xcofdk.fwapi.apiif:
NOTE:
The source code documentation intensively uses doctest lines with a leading '>>>' for both:
- (highlighting of) embedded code snippet inside docstrings.
Modern IDE editors, e.g. PyCharm (Community Edition), are able to display them as expected,- cross referencing to existing documentation provided at some other place, e.g. to the respective API
documentation of the parent class.
Again, modern IDE editors are able to generate and display the corresponding hyperlink, too.By no means, however, such lines are designed for any kind of interactive example or (regression) testing,
as otherwise commonly intended when using Python developer tool doctest . Especially, as none of the
modules of the package xcofdk is designed to be executed as the main module '__main__'.
This is also true for the respective main module of the provided examples where the purpose of embedded
code snippet (if any) is basically to illustrate sample code or usage hints only, but never any kind of doctest-ing.
The public API of the subsystem multithreading provides all necessary interface classes and functions to create and run the task model of an application, that is a set of task instances which are designed according to the task abstraction specified by the architecture of XCOFDK. Hence, application tasks are the major subject of this subsystem.
The task classes are avaiable via two subpackages enabling convenient import of them:
-
xcofdk.fwapi
interface and concrete classes for rapid construction (RC), e.g.:
from xcofdk.fwapi import IRCTask
from xcofdk.fwapi import SyncTask -
xcofdk.fwapi.xmt
interface and abstract classes for subclassing (SC), e.g.:
from xcofdk.fwapi.xmt import IXTask
from xcofdk.fwapi.xmt import XTask
The class diagram below illustrates available task classes:
-
class diagram Task Classes:
-
RC classes:
SyncTask
AsyncTask
SyncCommTask
AsyncCommTask
MessageDrivenTask
XFSyncTask
XFAsyncTask
XFSyncCommTask
XFAsyncCommTask
XFMessageDrivenTask
-
SC abstract classes:
XTask
XMainTask
NOTE:
- The interface class ITask is common for all application tasks representing almost the complete API of a task.
- The interface classes IRCTask and IRCCommTask are specific for RC classes listed above.
- The interface class IXTask is specific for user-defined classes derived from SC abstract classes listed above.
- Most of the public API presented in this wiki page will focus on RC classes as the API of the subsystem
multithreading is either identical or quite similar regardless of the used approach for task construction
(i.e. rapid construction (RC) or subclassing (SC), respectively).- However, the API will also be discussed separately for both construction approaches wherever necessary.
The phased execution frame (or 3-PhXF for short) of a task instance to be created is specified by passing corresponding parameter(s) to the constructor of the respective task class explained in subsections below.
The task classes of this group are derived from either one of the classes RCTask or RCCommTask, both defined in module rctask.py:
- class SyncTask
- class AsyncTask
- class SyncCommTask
- class AsyncCommTask
- class MessageDrivenTask
The latter three classes are designed for tasks capable of full communication, that is tasks with support for an external message queue enabling them to be the destination of messages (see 3.1.5.1 General Pattern of Messaging). The corresponding parameters to specifiy the callback function(s) for the desired execution frame phase(s) are as follows:
| Parameter | : | Execution Frame Phase |
|---|---|---|
| runCallback_ | : | run phase |
| setupCallback_ | : | setup phase (optional) |
| teardownCallback_ | : | teardown phase (optional) |
| procExtMsgCallback_ | : | (sub-)phase for processing of external messages |
The code snippet below demonstrates the general prototypes of a callback function to be passed via the parameters above:
from xcofdk.fwapi import IMessage
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import IRCTask
from xcofdk.fwapi import IRCCommTask
def MySetup(*args_, **kwargs_) -> EExecutionCmdID:
# a callback function for setup phase.
pass
def MyRun(*args_, **kwargs_) -> EExecutionCmdID:
# a callback function for run phase.
pass
def MyTeardown() -> EExecutionCmdID:
# a callback function for teardown phase.
pass
def MyMsgProcessor(xmsg_ : IMessage) -> EExecutionCmdID:
# a callback function for (sub-)phase of external message processing.
pass
def MySetupByRef(myTsk_ : IRCTask | IRCCommTask, *args_, **kwargs_) -> EExecutionCmdID:
# same as 'MySetup()' above with 'myTsk_' refers to the task instance.
pass
def MyRunByRef(myTsk_ : IRCTask | IRCCommTask, *args_, **kwargs_) -> EExecutionCmdID:
# same as 'MyRun()' above with 'myTsk_' refers to the task instance.
pass
def MyTeardownByRef(myTsk_ : IRCTask | IRCCommTask) -> EExecutionCmdID:
# same as 'MyTeardown()' above with 'myTsk_' refers to the task instance.
pass
def MyMsgProcessorByRef(myTsk_ : IRCTask | IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID:
# same as 'MyMsgProcessor()' above with 'myTsk_' refers to the task instance.
passwith:
- EExecutionCmdID described below,
- the positional arguments args_ and/or keywork arguments kwargs_ (if any) are passed accordingly when the task instance is started (see ITask.Start()),
-
myTsk_ refers to the task instance (the callback function was specified for) currently being executed
(see IRCTask, IRCCommTask).
This type of callback function must be specified whenever the optional parameter bRefToCurTaskRequired_ passed to the constructor, e.g. SyncTask.__init__(), is set to True.
The task classes of this group have a leading XF in their name, also referred to as XF task classes. They are also derived from either one of the classes RCTask or RCCommTask, both defined in module rctask.py:
- class XFSyncTask
- class XFAsyncTask
- class XFSyncCommTask
- class XFAsyncCommTask
- class XFMessageDrivenTask
Again, the latter three classes are designed for tasks capable of full communication. The class (instance) is always specified by passing below paraemter to the constructor:
| Parameter | : | Execution Frame Phase |
|---|---|---|
|
phasedXFCallback_ |
: |
run phase setup phase (optional) teardown phase (optional) (sub-)phase for processing of external messages |
Here, the callback function phasedXFCallback_ must be a class (instance) with the class design is execution-frame-like (or XF-like for short). Such a class is exepcted to provide one or more pre-defined instance methods as depicted below:
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import IMessage
from xcofdk.fwapi import IRCTask
from xcofdk.fwapi import IRCCommTask
class MyExecutionFrameLikeClass:
def __init__(self, *args_, **kwargs_):
pass
# optional
def SetUpTask(self, *args_, **kwargs_) -> EExecutionCmdID:
pass
# mandatory, except for task instances created via class 'XFMessageDrivenTask'
def RunTask(self, *args_, **kwargs_) -> EExecutionCmdID:
pass
# optional
def TearDownTask(self) -> EExecutionCmdID:
pass
# optional, except for task instances created via class 'XFMessageDrivenTask'
def ProcessExternalMessage(self, xmsg_ : IMessage) -> EExecutionCmdID:
pass
# or if the reference to the task instance is required:
class MyExecutionFrameLikeClassByRef:
def __init__(self, *args_, **kwargs_):
pass
# optional
def SetUpTask(self, myTsk_ : IRCTask | IRCCommTask, *args_, **kwargs_) -> EExecutionCmdID:
pass
# mandatory, except for task instances created via class 'XFMessageDrivenTask'
def RunTask(self, myTsk_ : IRCTask | IRCCommTask, *args_, **kwargs_) -> EExecutionCmdID:
pass
# optional
def TearDownTask(self, myTsk_ : IRCTask | IRCCommTask) -> EExecutionCmdID:
pass
# optional, except for task instances created via class 'XFMessageDrivenTask'
def ProcessExternalMessage(self, myTsk_ : IRCTask | IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID:
pass
#...with EExecutionCmdID described below, args_, kwargs_ and myTsk_ explained in section 4.2.2.1 RC Classes for passing Callback Functions. Also, note that the parameter phasedXFCallback_ can be specified:
- either by an instance of the specified XF-like class
(e.g. an instance of class MyExecutionFrameLikeClassByRef above), - or by that XF-like class itself (see task template below).
Regardless of how phasedXFCallback_ is specified, the actual instance of the XF-like class is later accessible via task property IRCTask.phasedXFInstance once the task is started.
The use case of a user-defined XF-like class passed to an XF task class as callback function (see section 4.2.2.2 RC Classes for passing Class Instances) is especially valueable whenever a considerable amount of instances of a given XF task class is needed with the application wishes to be released from the otherwise necessary pre-creation or pre-management of the individual task instances to be started.
When a task instance created this way, e.g. _tsk shown below, is started, the framwork will create a new default instance of the specified XF-like class, e.g. MyExecutionFrameLikeClass2() below, and use that instance as if it were passed to the constructor of the XF task class, e.g. XFAsyncTask, instead:
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import IMessage
from xcofdk.fwapi import IRCTask
from xcofdk.fwapi import IRCCommTask
from xcofdk.fwapi import XFAsyncTask
class MyExecutionFrameLikeClass2:
def __init__(self, **kwargs_):
pass
# followed by the pre-defined instance methods as shown for class
# 'MyExecutionFrameLikeClass' or 'MyExecutionFrameLikeClassByRef' above
#...
_tsk = XFAsyncTask(MyExecutionFrameLikeClass2)
_tsk.Start()
#...In other words, given an XF task class T1 and a user-defined XF-like class T2, the terms:
-
task template:
referts to the general construct of T1(T2 [, ...]), -
task template specialization:
refers to a given specialization of the task template above by use of concrete classes for T1 and T2,
(e.g. as shown for the task instance _tsk in code snippet above), -
auto-instantiation of task template:
refers to the semantic of the Start() operation of a task template specialization explained above
(with the auto-created XF-like class instance is accessible through the supplementary property IRCTask.phasedXFInstance).
However, unless otherwise stated, the terms task template and task template specialization are used interchangeably for easier readability. Also, with regard to the full availability of task templates, note that as of current release of xcofdk-py:
- a given task template (specialization) can be started only once. That is, even though its associated callback function given as an instance of the specified XF-like class is auto-created by the framework when that task template is started, it effectively is still a single application task to be created in advance,
- work is in progress to make a task template (once created) can be used to create a new instance of the specified XF task class each time that task template is started via its specialized Start() operation.
The abstract classes XTask and XMainTask are available for the purpose of subclassing. Here, the callback functions of the execution frame are pre-defined instance methods of class XTask, which are slightly differently named, but basically much like the execution frame of RC classes for passing class instances.
The code snippet below illustrates the respective instance methods to be overriden and implemented by a derived class to specify the desired exectuion frame of an instance of that class:
from xcofdk.fwcom import override
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import IMessage
from xcofdk.fwapi.xmt import XTask
from xcofdk.fwapi.xmt import XTaskProfile
class MyDerivedTask(XTask):
def __init__(self, taskProfile_ : XTaskProfile =None, *args_, **kwargs_):
super().__init__(taskProfile_=taskProfile_)
pass
# optional (if configured by task profile)
@override
def SetUpXTask(self, *args_, **kwargs_) -> EExecutionCmdID:
pass
# mandatory, except for task instances configured to support blocking external queue
@override
def RunXTask(self, *args_, **kwargs_) -> EExecutionCmdID:
pass
# optional (if configured by task profile)
@override
def TearDownXTask(self) -> EExecutionCmdID:
pass
# optional (if configured to support non-blocking external queue),
# except for task instances configured to support blocking external queue
@override
def ProcessExternalMessage(self, xmsg_ : IMessage) -> EExecutionCmdID:
pass
# followed by task- and/or application-specific extensions or customizations
# planned for this task class
#...with:
- EExecutionCmdID described below,
- the positional arguments args_ and/or keywork arguments kwargs_ (if any) are passed accordingly when the task instance is started (see XTask.Start()),
- XTaskProfile explained in more detail below.
In conformity with the specification of 3-PhXF the specified callback functions are expected to use pre-defined command IDs of the Enum class EExecutionCmdID as return value in order to determine the desired program flow from within the execution frame phase at hand. The semantic of the IDs is exactly the same as specified by the architecture (see also the activity diagram Phased Execution Flow):
-
EExecutionCmdID.CONTINUE:
if returned from within the setup phase, then continue with the run phase,
if returned from within a cyclic run phase, then continue with the next iteration,
if returned from within a messaging (sub-)phase, then continue with the next message,
same meaning as for EExecutionCmdID.STOP (see below) otherwise. -
EExecutionCmdID.STOP:
if returned from within the run phase, then continue with the teardown phase (if configured),
stop the execution indicating a normal and complete termination otherwise. -
EExecutionCmdID.CANCEL:
cancel the execution indicating a normal, but incomplete termination.
Note that canceling a task is reserved for use by applications only. The framework does not ever use it, as it is designed to provide applications with their own logical and/or functional attribution of a normal termination which they may wish to rather consider incomplete. -
EExecutionCmdID.ABORT:
abort the execution indicating an abnormal termination.
Alternatively, the static method EExecutionCmdID.FromBool() may also be used with below mapping based on the boolean value passed to:
- True -> EExecutionCmdID.CONTINUE
- False -> EExecutionCmdID.STOP
- None -> EExecutionCmdID.ABORT
Instances of class XTaskProfile are used for the configuration of task instances (see general discussion in 3.1.1.3 Task Configuration). The bullet points below illustrates the related class diagram followed by a few remarks:
-
class diagram of XTaskProfile which implements its interface class ITaskProfile:
-
a task instance is always associated with its own instance of XTaskProfile,
-
once a task instance is created, its associated task profile is a read-only copy of the task profile instance used to create that task instance and cannot be modified anymore (see ITaskProfile.isFrozen),
-
for RC classes the task profile is auto-created by the framework based on the corresponding optional parameters passed to the constructor, e.g. for the asynchronous task _mainTsk as shown below:
- setupCallback_
- aliasName_ (see ITask.aliasName())
- bMainTask_ (see IRCTask.isMainTask())
from xcofdk.fwapi import AsyncTask def MySetup(startParam_ : int =None) -> EExecutionCmdID: pass def MyRun() -> EExecutionCmdID: pass _mainTsk = AsyncTask(MyRun, setupCallback_=MySetup, aliasName_='MainTask', bMainTask_=True) #...
-
for user-defined classes derived from SC abstract classes:
-
the task profile is either auto-created by the framework or created by the application and passed to the constructor of the derived class. For more convenient, there are also two staticmethods available as shorthand for creation of task profile instances:
Once such a SC task instance is created, its task profile is accessible via the property IXTask.taskProfile. The code snippet below demonstrates a typical example:
from xcofdk.fwcom import override from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi.xmt import XTask from xcofdk.fwapi.xmt import XTaskProfile # user-defined task class derived from 'XTask' class MyServiceTask(XTask): def __init__(self, taskProfile_ : XTaskProfile =None): super().__init__(taskProfile_=taskProfile_) @override def RunXTask(self) -> EExecutionCmdID: pass #... # task profile will be auto-created by the framework _srvAA = MyServiceTask() # task profile created and configured by the application using property setters _tp = XTaskProfile() _tp.aliasName = 'ServiceBB' _tp.runPhaseFrequencyMS = 60 # and passed to the task constructor _srvBB = MyServiceTask(taskProfile_=_tp) #...
-
whenever a task profile instance is created, all its properties are assigned with their respective default value. Applications may then opt to modify the default accordingly:
from xcofdk.fwapi import xlogif from xcofdk.fwapi.xmt import XTaskProfile _tp_ = XTaskProfile() xlogif.LogInfo(f'A default task profile instance:\n{_tp_}') # the output of the log entry above would look like: # [09:32:55.333 XINF] A default task profile instance: # Task profile : # aliasName : None # isMainTask : False # isSyncTask : False # isPrivilegedTask : False # isRunPhaseEnabled : True # isSetupPhaseEnabled : False # isTeardownPhaseEnabled : False # isCyclicRunPhase : True # isInternalQueueEnabled : False # isExternalQueueEnabled : False # isExternalQueueBlocking : False # runPhaseFrequencyMS : 100 # runPhaseMaxProcessingTimeMS : 50 # this task profile is supposed to be used for applications's main task # (i.e. the singleton of class 'XMainTask') _tp_.isMainTask = True #...
-
The public API of the subsystem multithreading related to creating new task instances is one of the few operations which can be done even before the runtime environment (RTE) is available, i.e. started. The following subsections present the API of task instantiation in detail.
The implementation of XCOFDK as presented by xcofdk-py currently supports instances of Python class threading.Thread as host thread instances which provide the execution context of a task instanae behind the scenes. Accordingly, GUI frameworks, e.g. tkinter, based on the supported host threads are supported, too.
Consequently, as the subsystem multithreading represents the foundation of the framework of xcofdk-py, the application code of a Python program supposed to use the API is expected to be running within the execution context of a supported host thread at runtime, especially when the RTE is started (see subsystem fwctrl). For the majority of Python applications this is always the case, except for programs using other (GUI) frameworks with their own threading concept and related classes.
Available RC classes for synchronous tasks are as follows:
The code snippet below demonstrates the instantiation of a synchronous task, i.e. _syncTsk, with its phased execution frame specified by the callback functions MyRun and MySetup passed to the constructor:
from xcofdk.fwapi import SyncTask
def MySetup(someParam_ : list) -> EExecutionCmdID:
pass
def MyRun() -> EExecutionCmdID:
pass
# create a single-cycle sync. task
_syncTsk = SyncTask(MyRun, setupCallback_=MySetup, aliasName_='MySyncService')
#...Available RC classes for asynchronous tasks are as follows:
The code snippet below demonstrates the instantiation of an asynchronous task, i.e. _asyncTsk, with its phased execution frame specified by the callback functions MyRun and MyTeardown passed to the constructor:
from xcofdk.fwapi import AsyncTask
def MyRun(someParam_ : list) -> EExecutionCmdID:
pass
def MyTeardown() -> EExecutionCmdID:
pass
# create a single-cycle async. task
_asyncTsk = AsyncTask(MyRun, teardownCallback_=MyTeardown, aliasName_='AsyncSrv_')
#...For single-cycle tasks, also referred to as non-cyclic tasks, the run phase is executed only once. The listing below points out a few remarks with regard to instantiation of single-cycle tasks:
- RC tasks (be synchronous or asynchronous) are configured non-cyclic by default,
- also, whenever the optional parameter runCallbackFrequency_ passed to the constructor is set to 0, the task instance to be created will be configured non-cyclic,
- for message driven tasks, however, runCallbackFrequency_ is not available, as they always are non-cyclic and message driven by definition.
For cylic tasks the run phase is executed cyclically in a frequency specified by the optional parameter runCallbackFrequency_ passed to the constructor. The listing below points out a few remarks with regard to instantiation of cyclic tasks:
- the value specified by runCallbackFrequency_ must be either a positive integer number, i.e. int, or a positive floating-point number, i.e. float,
- the unit of the frequency is taken as millisecond [ms] if an integer number passed to, as second [s] otherwise,
- for message driven tasks, however, runCallbackFrequency_ is not available, as they always are non-cyclic and message driven by definition,
- the code snippet below demonstrates how to create a cyclic task instance:
from xcofdk.fwapi import SyncTask from xcofdk.fwapi import AsyncTask def MySyncRun(someParam_ : dict) -> EExecutionCmdID: pass def MyAsyncRun(someParam_ : dict, optParam_ : float =None) -> EExecutionCmdID: pass # create a cyclic sync. task with the frequency of the run phase set to 100 [ms] _syncTsk2 = SyncTask(MySyncRun, aliasName_='CyclicSyncService', runCallbackFrequency_=100) # create a cyclic async. task with the frequency of the run phase set to 60 [ms] _asyncTsk2 = AsyncTask(MyAsyncRun, aliasName_='CyclicAsyncSrv_', runCallbackFrequency_=60) #...
All task instances can send messages (see 3.1.5.1 General Pattern of Messaging), provided the designated destination is capable of receiving messages. The related task operations to send a message are as follows:
The receiver task must be configured to support an external queue which may be blocking or non-blocking. While SC task instances must be configured via the property setter ITaskProfile.isExternalQueueEnabled of their task profile in order to be able to receive messages, below classes are available for RC tasks to achieve the same:
Hence, the term full communication task refers to a task instance capable of both sending and receiving messages. When instantiating an RC task for full communication, the constructor paraemter to be used depends on the specification of the phased exectuion frame:
-
PhXF specified by callback functions:
with the mandatory parameter procExtMsgCallback_ of the RC task classes below shall be used:The code snippet below demonstrates an example with messages delivered to the task _syncCommTsk are each presented to the task by the callback function MyMsgProcessor() which is called by the framework before the run phase MyRun() is going to be executed next:
from xcofdk.fwapi import IMessage from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import SyncCommTask def MyRun(myTsk_ : IRCCommTask, customerID_ : int) -> EExecutionCmdID: pass def MyMsgProcessor(myTsk_ : IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID: pass # create a cyclic sync. full-comm task _syncCommTsk = SyncCommTask(MyRun, MyMsgProcessor, bRefToCurTaskRequired_=True, runCallbackFrequency_=80) #...
-
PhXF specified by class (instance):
with the mandatory parameter phasedXFCallback_ of the RC task classes below shall be used:The code snippet below demonstrates an example with messages delivered to the task _asyncCommTsk are each presented to the task by the instance method ProcessExternalMessage() which is called by the framework before the run phase MyRun() is going to be executed next:
from xcofdk.fwapi import IMessage from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import XFAsyncCommTask class MyXFLikeClass: def __init__(self, someParam_ : int): pass def RunTask(self, myTsk_ : IRCCommTask, customerID_ : int) -> EExecutionCmdID: pass def ProcessExternalMessage(self, myTsk_ : IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID: pass # create PhXF instance to be passed to task's constructor _myXF = MyXFLikeClass(4711) # create a cyclic async. full-comm task passing '_myXF' to the constructor _asyncCommTsk = XFAsyncCommTask(_myXF, bRefToCurTaskRequired_=True, runCallbackFrequency_=80) #...
However, note that:
- in case of non-cyclic tasks, a continuous processing of queued messages (if any) also requires the receiver tasks to trigger the related procedure themselves (see section 4.2.7.2 Triggering Message Processing),
- for message driven tasks the run phase is differently defined.
Message driven tasks are full communication tasks with four specific features:
- the run phase is only given by the callback for processing external messages,
- the underlying external queue for received messages is configured to be blocking
(see IRCCommTask.isExternalQueueBlocking and ITaskProfile.isExternalQueueBlocking).
Hence, their run phase is in waiting mode as long as there are no new messages to be processed, so it is blocked.
But, as soon as new messages are received, the specified callback is repeatedly called for the number of currently queued messages, - they need a separate polling mechanism to ensure that they can be stopped (if still running) in the course of framework's coordinated shutdown,
- accordingly, they are always non-cyclic, asynchronous tasks.
Avaiable RC classes for message driven tasks are:
-
MessageDrivenTask: with the mandatory constructor parameter procExtMsgCallback_ specifies both the run phase and the callback function for processing of delivered messages. The code snippet below demonstrates an example:
from xcofdk.fwapi import IMessage from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import MessageDrivenTask def MyMsgProcessor(myTsk_ : IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID: pass # create a message driven task with default polling frequency of 100 [ms] _mdTsk = MessageDrivenTask(MyMsgProcessor, aliasName_='JobServer', bRefToCurTaskRequired_=True) #...
-
XFMessageDrivenTask: with the instance method ProcessExternalMessage() of the mandatory constructor parameter phasedXFCallback_ specifies both the run phase and the callback for processing of delivered messages. The code snippet below demonstrates an example:
from xcofdk.fwapi import IMessage from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import XFMessageDrivenTask class MyXFLikeClass: def __init__(self, initParam_ : int): pass def MySetUp(self, myTsk_ : IRCCommTask, startParam_ : float) -> EExecutionCmdID: pass def ProcessExternalMessage(self, myTsk_ : IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID: pass # create PhXF instance to be passed to task's constructor _myXF = MyXFLikeClass(4711) # create a message driven task with polling frequency set to 200 [ms] _mdTsk = XFMessageDrivenTask(_myXF, aliasName_='XFJobServer', bRefToCurTaskRequired_=True, pollingFrequency_=200) #...
As mentioned in section Task Classes, the interface class ITask is common for all task instances. It also provides most of the API available for tasks, while its derived classes IRCTask, IRCCommTask and IXTask each provide a few additional API specific to RC tasks or SC tasks, respectively.
The complete API of a task instance is a rich set of attributes and functions covering both task properties for data abstraction and task operations for operational abstraction. But in practice, an application normally uses only a relatively small subset of the whole API depending on its (functional) requirements. This subsection discusses available task properties, while the operational abstraction is presented in subsection 4.2.7 Task Operations.
NOTE:
The subsections 4.2.6 Task Properties and 4.2.7 Task Operations present only a selection of the API of
task instances.
For the complete API refer to the respectivve source code documentation of iftask.py, ifrctask.py and
ifxtask.py.
The general task properties include below API:
-
isSyncTask:
returns True if the task instance is configured to be executed synchronously, False otherwise. -
isFirstRunPhaseIteration:
returns True if it is the initial iteration of the run phase being executed right now, False otherwise (see also task operation Start()). -
taskUID:
returns the auto-generated unique ID of the task if started, None otherwise. -
taskName:
returns the auto-generated string object representing unique name of this instance if started, task's alias name (see below) otherwise. -
aliasName:
returns the alias name of the task based on the parameter aliasName_ passed to the constructor.
The alias name is an arbitrary, non-empty and printable string literal without spaces, it optionally may end with a trailing '_':-
if None was passed to, then:
- Tsk_<INST_NO> for RC tasks without full communication support,
- CTsk_<INST_NO> for RC tasks with full communication support,
- XTsk_<INST_NO> for SC tasks without full communication support,
- CXTsk_<INST_NO> for SC tasks with full communication support,
with:
- INST_NO is the unique instance number of the task (see taskCompoundUID below),
- C stands for capable of full communication,
-
otherwise, the alias name which was passed to the constructor,
with the same rule regarding a trailing underscore, i.e. '_', goes here, too.
-
-
taskCompoundUID:
returns an auto-generated namedtuple object with below immutable fields:- uid, unique ID of the task (see taskUID above),
- instNo, unique, 1-based integer value representing the instance number auto-assigned to the task.
-
currentRunPhaseIterationNo:
returns -1 if the run phase of the task has not been entered yet, 0-based current iteration number otherwise.
Note that for non-cyclic tasks (if started) the value returned will always be 0.
The general description of task states is presented in the sections 3.1.1.1 Task Execution Frame and 3.1.1.5 Task Execution State. A subset of task states applications may frequently use is as follows (see also the state diagram Task State Machine):
-
isAttachedToFW:
returns True if the task is (still) attached to the framework, False otherwise.
Once correctly constructed a task remains attached to the framework until it is terminated (see isTerminated below) or requested to be detached from the framework (see task operation DetachFromFW()). -
isStarted:
returns True if the task has been started (see task operation Start()), False otherwise. -
isRunning:
returns True if the task is started (see isStarted above), but not terminated or terminating (see isTerminated and isTerminating below), False otherwise.
In other words, a task is in this state as soon as it enters its 3-PhXF, that is its setup phase (if configured) or run phase. And, it remains in this state as long as the run phase is not left. -
isTerminated:
returns True if one of the task properties isDone or isCanceled or isFailed below resolves to True, False otherwise. -
isDone:
returns True if the task has finished its execution upon normal termination, False otherwise.
A task particularly reaches this state upon a request to Stop(), or if it intentionally requested to do so by returning EExecutionCmdID.STOP out of its 3-PhXF. An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import SyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: xlogif.LogInfo(f'Entered run phase of {myTsk_.aliasName}: isRunning={myTsk_.isRunning}') return EExecutionCmdID.STOP def Main(): fwapi.StartXcoFW() # create and start a non-cyclic sync. task _syncTsk = SyncTask(RunCB, aliasName_='SyncTask', bRefToCurTaskRequired_=True) _syncTsk.Start() _msg = f'{_syncTsk.aliasName} terminated:\n\tisDone={_syncTsk.isDone} ' _msg += f'isCanceled={_syncTsk.isCanceled} isFailed={_syncTsk.isFailed}' xlogif.LogInfo(_msg) fwapi.JoinXcoFW() return 0 # When executed, the output of 'Main()' should look like below: # [14:23:13.879 XINF][Tsk_501001] Entered run phase of SyncTask: isRunning=True # [14:23:13.880 XINF][Tsk_501001] SyncTask terminated: # isDone=True isCanceled=False isFailed=False
-
isCanceled:
returns True if the task has finished its execution due to a Cancel() request (or if it intentionally requested to do so by returning EExecutionCmdID.CANCEL out of its 3-PhXF), False otherwise. An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import SyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: xlogif.LogInfo(f'Entered run phase of {myTsk_.aliasName}: isRunning={myTsk_.isRunning}') return EExecutionCmdID.CANCEL def Main(): fwapi.StartXcoFW() # create and start a non-cyclic sync. task _syncTsk = SyncTask(RunCB, aliasName_='SyncTask', bRefToCurTaskRequired_=True) _syncTsk.Start() _msg = f'{_syncTsk.aliasName} terminated:\n\tisDone={_syncTsk.isDone} ' _msg += f'isCanceled={_syncTsk.isCanceled} isFailed={_syncTsk.isFailed}' xlogif.LogInfo(_msg) fwapi.JoinXcoFW() return 0 # When executed, the output of 'Main()' should look like below: # [14:35:33.692 XINF][Tsk_501001] Entered run phase of SyncTask: isRunning=True # [14:35:33.692 XINF][Tsk_501001] SyncTask terminated: # isDone=False isCanceled=True isFailed=False
-
isFailed:
returns True if the task has finished its execution upon abnormal termination, False otherwise.
A task reaches this state if and only if the execution of its 3-PhXF causes a qualified fatal error, or if it intentially requests the framework to abort its execution by returning EExecutionCmdID.ABORT out of its 3-PhXF (see also the discussion on the special task state aborting in section 3.1.1.5 Task Execution State). An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import SyncTask from xcofdk.fwapi import AsyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: xlogif.LogInfo(f'Entered run phase of {myTsk_.aliasName}: isRunning={myTsk_.isRunning}') return EExecutionCmdID.ABORT def Main(bSyncTask_ =False): fwapi.StartXcoFW() # create and start a non-cyclic (a)sync. task if bSyncTask_: _tsk = SyncTask(RunCB, aliasName_='MySyncTask', bRefToCurTaskRequired_=True) else: _tsk = AsyncTask(RunCB, aliasName_='MyAsyncTask', bRefToCurTaskRequired_=True) _tsk.Start() _msg = f'{_tsk.aliasName} terminated:\n\tisDone={_tsk.isDone} ' _msg += f'isCanceled={_tsk.isCanceled} isFailed={_tsk.isFailed}' xlogif.LogInfo(_msg) fwapi.JoinXcoFW() return 0 # When executed, the output of 'Main()' should look like below: # [14:38:14.421 XINF][Tsk_501001] Entered run phase of MyAsyncTask: isRunning=True # [14:38:14.553 XINF][MainThread] MyAsyncTask terminated: # isDone=False isCanceled=False isFailed=True # # -------------------------------------------------------------------------------- # Resulted LC status : FAILED # ----- LcState : LcStopped , TMgrStopped , FwMainStopped , XTaskFailed # ----- FRC error code : -10023 # ----- # ----- FRC: [UID:501001][COMP:XTask][EC:-10023][Tsk_501001:501001] # ----- [14:38:14.422 FTL:-10023][Tsk_501001] [FwThrd][Tsk_501001][#xc=0] 3-PhXF callback function (of run-phase) returned ABORT. # ----- [Callstack]: # ...
-
isTerminating:
returns True if one of the task properties isStopping or isCanceling below resolves to True, False otherwise. -
isStopping:
returns True as soon as the task leaves its running state (see isRunning above) due to a Stop() request (or by EExecutionCmdID.STOP as described above), False otherwise (see also section 3.1.1.5.1 Asymmetric State Transition)). An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import SyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: xlogif.LogInfo(f'Entered run phase of {myTsk_.aliasName}: isRunning={myTsk_.isRunning}') return EExecutionCmdID.STOP def TeardownCB(myTsk_ : IRCTask) -> EExecutionCmdID: _msg = f'Entered teardown phase of {myTsk_.aliasName}:\n\tisRunning={myTsk_.isRunning} ' _msg += f'isStopping={myTsk_.isStopping} isCanceling={myTsk_.isCanceling}' xlogif.LogInfo(_msg) return EExecutionCmdID.STOP def Main(): fwapi.StartXcoFW() # create and start a non-cyclic sync. task _syncTsk = SyncTask(RunCB, teardownCallback_=TeardownCB, aliasName_='SyncTask', bRefToCurTaskRequired_=True) _syncTsk.Start() _msg = f'{_syncTsk.aliasName} terminated:\n\tisDone={_syncTsk.isDone} ' _msg += f'isCanceled={_syncTsk.isCanceled} isFailed={_syncTsk.isFailed}' xlogif.LogInfo(_msg) fwapi.JoinXcoFW() return 0 # When executed, the output of 'Main()' should look like below: # [15:08:31.808 XINF][Tsk_501001] Entered run phase of SyncTask: isRunning=True # [15:08:31.810 XINF][Tsk_501001] Entered teardown phase of SyncTask: # isRunning=False isStopping=True isCanceling=False # [15:08:31.811 XINF][Tsk_501001] SyncTask terminated: # isDone=True isCanceled=False isFailed=False
-
isCanceling:
returns True as soon as the task leaves its running state (see isRunning above) due to a Cancel() request (or by EExecutionCmdID.CANCEL as described above), False otherwise (see also section 3.1.1.5.1 Asymmetric State Transition)). An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import SyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: xlogif.LogInfo(f'Entered run phase of {myTsk_.aliasName}: isRunning={myTsk_.isRunning}') return EExecutionCmdID.CANCEL def TeardownCB(myTsk_ : IRCTask) -> EExecutionCmdID: _msg = f'Entered teardown phase of {myTsk_.aliasName}:\n\tisRunning={myTsk_.isRunning} ' _msg += f'isStopping={myTsk_.isStopping} isCanceling={myTsk_.isCanceling}' xlogif.LogInfo(_msg) return EExecutionCmdID.STOP def Main(): fwapi.StartXcoFW() # create and start a non-cyclic sync. task _syncTsk = SyncTask(RunCB, teardownCallback_=TeardownCB, aliasName_='SyncTask', bRefToCurTaskRequired_=True) _syncTsk.Start() _msg = f'{_syncTsk.aliasName} terminated:\n\tisDone={_syncTsk.isDone} ' _msg += f'isCanceled={_syncTsk.isCanceled} isFailed={_syncTsk.isFailed}' xlogif.LogInfo(_msg) fwapi.JoinXcoFW() return 0 # When executed, the output of 'Main()' should look like below: # [15:46:27.424 XINF][Tsk_501001] Entered run phase of SyncTask: isRunning=True # [15:46:27.426 XINF][Tsk_501001] Entered teardown phase of SyncTask: # isRunning=False isStopping=False isCanceling=True # [15:46:27.428 XINF][Tsk_501001] SyncTask terminated: # isDone=False isCanceled=True isFailed=False
-
isAborting:
[DEPRECATED] This task state property is still present for backward compatibily reason only, so it will always return False.
In addition to the general task properties, below additional properties are available, too:
-
ITask
- isErrorFree
- isFatalErrorFree
- currentError (see also section 4.2.9 Current Task Error)
- IRCTask
-
IRCCommTask
-
isExternalQueueBlocking
(see also sections 4.2.5.6 Full Communication Tasks and 4.2.5.6.1 Message Drive Tasks)
-
isExternalQueueBlocking
- IXTask
This section presents available task operations as part of the operational abstraction of task instances.
When starting a task, e.g. _tsk shown below, the start parameter(s) to be passed to _tsk.Start() must correspond to the signature of the callback functions used to specify the phased execution frame (3-PhXF) of _tsk with its execution type synchronous or asynchronous doesn't matter. A few examples below illustrate possible common use cases:
-
no start parameter(s) required:
from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import SyncTask def RunCB() -> EExecutionCmdID: # do some work #... return EExecutionCmdID.STOP # create a non-cyclic sync. task _tsk = SyncTask(RunCB)
Here, there is nothing to be passed to when later starting the task, i.e. _tsk.Start().
-
start parameter(s) required by setup phase:
from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import SyncTask def SetupCB(startParamA_: int, startParamB_ : list =None) -> EExecutionCmdID: # do some initialization stuff #... return EExecutionCmdID.CONTINUE def RunCB() -> EExecutionCmdID: # do some work #... return EExecutionCmdID.STOP # create a non-cyclic sync. task _tsk = SyncTask(RunCB, setupCallback_=SetupCB)
Here, when later starting the task, the positional parameter startParamA_ must be passed to (optionally followed by startParamB_), e.g. _tsk.Start(4711) or _tsk.Start(98765, startParamB_=[1, 2, 3]).
-
start parameter(s) required by run phase:
from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import AsyncTask def RunCB(myTsk_ : IRCTask, startParamA_: int, startParamB_ : list =None) -> EExecutionCmdID: if myTsk_.isFirstRunPhaseIteration: # do some initialization stuff pass return EExecutionCmdID.CONTINUE if myTsk_.currentRunPhaseIterationNo < 100: # do normal work pass return EExecutionCmdID.CONTINUE # stop the task return EExecutionCmdID.STOP # create a cyclic async. task _tsk = AsyncTask(RunCB, bRefToCurTaskRequired_=True, runCallbackFrequency_=75)
Here, starting the task is much like the use case of setup phase shown above, that is the positional parameter startParamA_ must be passed to (optionally followed by startParamB_), e.g. _tsk.Start(0xBEEF) or _tsk.Start(98765, startParamB_=[-1, -2, -3]).
However, for cyclic tasks the task property isFirstRunPhaseIteration can be used to determine whether some initialization routine is necessary.
The task operation ITask.Stop() is always a non-blocking request. It simply notifies the task about the stop request and returns (see also the state diagram Task State Machine). Also note that:
- requests to stop a task which is in a transitional state of termination (see isTerminating) already are ignored by the framework,
- detaching a task from the framework (see DetachFromFW() below) causes that task to be stopped by the framework as well (if still running),
- upon initiating the shutdown sequence, the framework always asks running tasks to stop their execution via this API function, but never via the API function Cancel() below.
See alos task state property isStopping.
The task operation ITask.Cancel() is quite similar to the above-mentioned Stop() operation, except that the framework never makes any use of it. Its major purpose is to provide applications with their own logical and/or functional attribution of the execution result of a task which isTerminated, but not isFailed. Such an attribution by an application might look like below:
-
DONE as result of Stop() or EExecutionCmdID.STOP:
finished or stopped the job as expected, or -
CANCELED as result of Cancel() or EExecutionCmdID.CANCEL:
not failed, but broken for whatever application-specific reason.
See alos task state property isCanceling.
This task operation is used, whenever the application wishes to wait for the termination of a task (see isTerminated) in a blocking manner:
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import AsyncTask
def RunCB() -> EExecutionCmdID:
# do some work
#...
return EExecutionCmdID.STOP
def StartAndWaitForTask():
# create and start a non-cyclic async. task
_tsk = AsyncTask(RunCB)
_tsk.Start()
# wait for task's terminatiion for a max. wait time of 1.5 [s]
_tsk.Join(maxWaitTime_=1500)
if not _tsk.isTerminated:
#...
passwith:
- requests to join a task which isTerminated already are ignored by the framework,
- attempts to join a task from within its own 3-PhXF are ignored by the framework, too,
- otherwise, the requester is blocked (for the specified wait time) until the task isTerminated,
- the optional parameter maxWaitTime_ explained in source code documentation of ITask.Join().
As explained for isAttachedToFW, task instances are considered attached to the framework once correctly constructed. Accordingly, as soon as a task isTerminated, its task state becomes isDetachedFromFW, too.
The main purpose of detaching a task from the framework is to release the application or system resources used for that task. Note that:
- detaching is always done automatically by the framework upon termination of tasks,
- use of this API operation by intention and before task's termination, is much like requesting the framework to stop the task (if applicable),
- as soon as a task is detached from the framework, it won't be able to use the full set of its reqular API anymore. Its task state will be set in accordance to the task state right after the internal handling of the request is completed, e.g. isStopping, and not updated anymore.
-
Task-owned Data:
API functions to provide both getter and setter access to task-owned or user data (if any).
The framework provides the thread-safa and synchronized access mechanism of user data only, it never makes any use of these API functions on its own: -
Self-check:
API functions which applications can use to ensure that the task state is still up to date, especially after a time-consuming operation:The return value of the operation is:
- True if the task is not started yet or its state property isRunning resolves to True,
- False otherwise.
Also note, if called for a task which is not the currently running task, then the last available result of ITask.SelfCheck() of that task will be returned. An example:
from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCTask from xcofdk.fwapi import AsyncTask def RunCB(myTsk_ : IRCTask) -> EExecutionCmdID: # do some time-consuming work #... # perform a self-check, and go to sleep for 20 [ms] if still running if not myTsk_.SelfCheckSleep(20): # task is not running anymore (for whatever reason) return EExecutionCmdID.CANCEL if myTsk_.isCanceling else EExecutionCmdID.STOP # keep going with the run phase #... def StartMyTask(): # create and start a non-cyclic async. task _tsk = AsyncTask(RunCB, bRefToCurTaskRequired_=True) _tsk.Start() #...
-
Triggering Message Processing:
For non-cyclic tasks or after time-consuming operations, the API functions below can be used to manually trigger the processing of queued messages, that is to make the corresponding callback function of task's 3-PhXF is called for each received message:However, any call to this API function will be ignored if the task is processing messages already, especially for message driven tasks. An example:
from xcofdk.fwapi import IMessage from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import AsyncCommTask def RunCB(myTsk_ : IRCCommTask) -> EExecutionCmdID: # do some time-consuming work #... # trigger processing of all messages queued so far (if any) myTsk_.TriggerExternalQueueProcessing() # perform a self-check if not myTsk_.SelfCheck(): return EExecutionCmdID.CANCEL if myTsk_.isCanceling else EExecutionCmdID.STOP # keep going with the run phase (returning STOP if necessary) #... # NOTE: # 'MsgProcessorCB()' is called for each received message before next time 'RunCB()' is called return EExecutionCmdID.CONTINUE def MsgProcessorCB(myTsk_ : IRCCommTask, xmsg_ : IMessage) -> EExecutionCmdID: # process received message #... def StartMyTask(): # create and start a cyclic async. full-comm task _tsk = AsyncCommTask(RunCB, MsgProcessorCB, bRefToCurTaskRequired_=True, runCallbackFrequency_=100) _tsk.Start() #...
The term current running task refers to the task instance of class RCTask or XTask, respectively, which is currently being executed. There are two module functions to get access to:
-
GetCurTask()
returns a reference to the instance of IRCTask being executed right now if called out of the 3-PhXF of that RC task instance, None otherwise.
Note also that the task state of the returned task is isRunning unless the call was made out of the teardown phase. An example:from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import xlogif from xcofdk.fwapi import GetCurTask from xcofdk.fwapi import SyncTask def MyRun() -> EExecutionCmdID: _myTsk = GetCurTask() xlogif.LogInfo(f'Entered run phase of task: {_myTsk.aliasName}') return EExecutionCmdID.STOP def StartMyTask(): # create and start a non-cyclic sync. task _tsk = SyncTask(RunCB) _tsk.Start() #...
-
GetCurXTask()
similar to GetCurTask() above, but for SC task instances inherited from the abstract class XTask. Its import instruction looks like below:from xcofdk.fwapi.xmt import GetCurXTask
One of the important aspects of current running task is the fact of representing the lifecycle entity at that time as explained in 3.1.3 Lifecycle Management. Hence, with respect to error handling, current running task is particularly representing the lifecycle, too.
For a given task instance, the task property ITask.currentError returns None if that task is currently error-free, or a reference to an instance of the interface class ITaskError which represents the current error of a given task otherwise:
Accordingly and as required by the procedure of qualification of errors (specified by the architecture), there are additional properties and operations available for task instances, they are basically related to the possible resolution of current task error (if any) by applications:
- ITask.isErrorFree
- ITask.isFatalErrorFree
- ITask.ClearCurrentError()
- ITask.SetError()
- ITask.SetErrorEC()
- ITask.SetFatalError()
- ITask.SetFatalErrorEC()
Note also that:
- the above-mentioned operations will simply be ignored if not called by the current running task,
- current task error is the major subject of the wiki page 5. Error Handling designed to provide an elaborated discussion of this topic accompanied by related examples.
In accordance to the specification of the subsystem rtecfg, the API of this subsystem is available via the subpackage xcofdk.fwapi.rtecfg.
Access to the RTE configuration is provided by RtePolicyGetConfig() which returns an instance of the interface class IRteConfig representing current RTE configuration:
- isValid
- isAutoStopEnabled
- isForcedAutoStopEnabled
- isTerminalModeEnabled
- isSubSystemMessagingDisabled
- isSubSystemMultiProcessingDisabled
- isExceptionTrackingOfChildProcessesDisabled
- isExperimentalFreeThreadingBypassed
- isLogRDConsoleSinkDisabled
- isLogRDFileSinkEnabled
- isLogRDTcpSinkEnabled
As long as the framework is not started (see fwctrl.StartXcoFW() below), applications can change the default configuration of the RTE. In general, the API functions to re-configure the RTE can be grouped based on the respective RTE policy they address (see also Enum class ERtePolicyID):
-
addressing auto-stop upon shutdown:
the RTE policies of this group affect the way how the framework handles the request to be joined, i.e. fwctrl.JoinXcoFW(), or whenever the coordinated shutdown sequence is initiated. Their major purpose is to make the otherwise mandatory call to stop the framework, i.e. fwctrl.StopXcoFW() optional, except for the use case of TermialMode:-
RtePolicyEnableAutoStop():
enables RTE policy for AutoStop, that is when joining the framework, the RTE will first wait for all currently running tasks to stop before entering the shutdown sequence. This RTE policy is enabled by default, -
RtePolicyEnableForcedAutoStop():
enables RTE policy for ForcedAutoStop, that is when joining the framework, the RTE will first request all currently running tasks to stop before entering the shutdown sequence, -
RtePolicyEnableTerminalMode():
enables RTE policy for TerminalMode, that is when joining the framework, the RTE will wait for an explicit request to stop the framework, i.e. for a call to fwctrl.StopXcoFW(), before entering the shutdown sequence.
However, note that:
- the API functions of this group are mutually exclusive with any re-configuration by a different auto-stop policy will result in an invalid configuration,
- an explicit request to stop the framework remains a valid operation regardless of the configured auto-stop policy.
-
RtePolicyEnableAutoStop():
-
addressing availability of specific subsystems:
-
RtePolicyDisableSubSystemMessaging():
disables framework's subsystem for messaging, i.e. xmsg, -
RtePolicyDisableSubSystemMultiProcessing():
disables framework's subsystem for multiprocessing),
-
RtePolicyDisableSubSystemMessaging():
-
addressing multiprocessing:
-
RtePolicyDisableExceptionTrackingOfChildProcesses():
disables the ability of the framework for exception tracking of child processes,
-
RtePolicyDisableExceptionTrackingOfChildProcesses():
-
addressing the GIL issue:
-
RtePolicyBypassExperimentalFreeThreadingGuard():
allows running the framework with an experimental free-threaded CPython interpreter with GIL is disabled, see also:- system-specific function sys._is_gil_enabled(),
- CPython build configuration option --disable-gil,
NOTE:
- The framework supports the stable version 3.14.0 being the first Python version officially supporting
free-threaded (FT) Python. - Python versions 3.13 and pre-releases of 3.14.0 are considered by the framework supporting
experimental free-threaded Python only.
-
-
addressing redirection of log output:
-
RtePolicyDisableLogRDConsoleSink():
disables console output, -
RtePolicyEnableLogRDFileSink():
enables log output to the specified file sink, -
RtePolicyEnableLogRDTcpSink():
enables log output to the specified TCP connection sink.
Section 4.7.1 Redirection of Log Output explains this group in more detail.
-
RtePolicyDisableLogRDConsoleSink():
The API of the subpackage xcofdk.fwapi.fwctrl is mostly provided in accordance to the specification of the subsystem fwctrl. For more convenient, the API and/or the related modules are available via the subpackage xcofdk.fwapi, too:
-
__init__.py:
API to control the RTE, i.e. start, stop and join,
-
xlogif.py:
API to submit errors, -
curtask.py:
API to access and/or modify current task error, -
fwutil.py:
helper API functions.
The code snippet below demonstrates a common pattern of how to use the API functions available to control the RTE of the framework:
# file: main.py
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import rtecfg
from xcofdk.fwapi import AsyncTask
def RunCB() -> EExecutionCmdID:
# do whatever the main task is supposed to do
#...
return EExecutionCmdID.STOP
def Main():
# step 1:
# (optional) change default RTE configuration,
# e.g. allow running free-threaded Python
rtecfg.RtePolicyBypassExperimentalFreeThreadingGuard()
# step 2:
# start the RTE (of the framework)
fwapi.StartXcoFW()
# step 3:
# create and start the starter task,
# e.g. application's main task, here a non-cyclic async. task
_mainTsk = AsyncTask(RunCB, aliasName_='MainTask', bMainTask_=True)
_mainTsk.Start()
# step 4:
# (optional) here, no need to stop the framework
# as by default 'AutoStop' policy is enabled unless changed in step 1
#fwapi.StopXcoFW()
# step 5:
# wait for framework's termination
fwapi.JoinXcoFW()
return 0The API functions available to control the RTE of the framework are as follows:
-
StartXcoFW():
starts the RTE. The start of the framework may also happen somewhere else other than application's main module or program's entry point. Also, once started, both stopping the framework and/or waiting for its shutdown can also placed at some other proper code locatiion according to application's flow of control.It is also possible to pass start options via the optional argument fwStartOptions_. If specified, then either a space separated sequence of string literals or a list of string literals must be supplied:
Start Option : Description --log-level LOG_LEVEL : with LOG_LEVEL is one of the case-sensistive string literals below used as user (or application) log level:
trace | debug | info | warning | error
It defauls to info.--fw-log-level FW_LOG_LEVEL : with FW_LOG_LEVEL is one of the case-sensistive string literals below used as framework log level:
info | kpi | warning | error
It defaults to error.--disable-log-timestamp : defaulting to False, it disables/hides the timestamp of the output of submitted logs. --disable-log-highlighting : defaulting to False, it disables color highlighting of the console output of submitted logs. --disable-log-callstack : defaulting to False, it disables/hides the callstack of fatal errors (if any). --suppress-start-preamble : defaulting to False, it suppresses the log output of framework's start preamble. Note that the respective list of the log levels shown above are given in ascending order of their precedence. So, at runtime all user/framework log requests with a precedence lower than the specified user/framework log level will be ignored by the framework.
NOTE:
- The respective list of the log levels shown above are given in ascending order of their precedence. So, at
runtime all user/framework log requests with a precedence lower than the specified user/framework log
level will be ignored by the framework.
- Color highlighting of the log output is supported for Python versions 3.9 and higher. In general, the start
option --disable-log-highlighting should always be supplied whenever used console program of
the platform, e.g. Windows Commond Prompt cmd.exe, does not support colored output to stdout.
- The respective list of the log levels shown above are given in ascending order of their precedence. So, at
-
StopXcoFW():
an asynchronous request to stop the RTE if it is (still) running. Note that:- a request to stop the framework will also initiate its coordinated shutdown sequence if not done already,
- unless the RTE policy of TerminalMode is enabled, an explicit request to stop the
framework is optional
(see also auto-stop RTE policies).
-
JoinXcoFW():
a request to synchronously wait for the RTE to finish its execution (if still running).
Note that requests to join the RTE from within a running application task will be denied by the framework.
Also, a few additional API functions are provided for more convenient as they might be frequently used by applications:
-
JoinTasks():
request to wait for termination (of a subset) of all application tasks currently running, -
JoinProcesses():
request to wait for termination (of a subset) of all child proceesses currently running, -
TerminateProcesses():
request to terminate (a subset of) all child proceesses currently running.
The API functions for logging are provided by the module xlogif.py. There are two groups of logging functions:
-
submittig ordinary logs:
depending on the configured user log level (see start option --log-level) will produce a logging output for the message passed to:API Property/Function : Return Type LogTrace(logMsg_ : str) : - LogDebug(logMsg_ : str) : - LogInfo(logMsg_ : str) : - LogWarning(logMsg_ : str) : - -
submitting error logs:
regardless of the configured user log level, the API functions of this group will always produce a logging output. In addition, they will also trigger framework's procedure of qualification of errors, so applications are enabled to submit user or fatal errors whenever they wish to:API Property/Function : Return Type LogError(logMsg_ : str) : - LogFatal(logMsg_ : str) : - LogException(logMsg_ : str, logXcp_ : Exception) : - LogErrorEC(logMsg_ : str, errorCode_ : int) : - LogFatalEC(logMsg_ : str, errorCode_ : int) : - LogExceptionEC(logMsg_ : str, logXcp_ : Exception, errorCode_ : int) : -
The API functions provided by the module curtask.py enable applications to get access to (modify) current task error of current running task:
| API Function | : | Description |
|---|---|---|
| IsCurTaskErrorFree() | : | Returns True if current task error as returned by GetCurTaskError() (see below) resloves to None, False otherwise. |
| IsCurTaskFatalErrorFree() | : | Returns False if current task error as returned by GetCurTaskError() (see below) resloves to a qualified fatal error, True otherwise. |
| GetCurTaskError() | : | Returns current error (if any) of current running task. |
| ClearCurTaskError() | : | Request to clear current error (if any) of current running task. |
| SetCurTaskError() SetCurTaskErrorEC() |
: | Request to set or replace current running task's current error to be a user error. |
| SetCurTaskFatalError() SetCurTaskFatalErrorEC() |
: | Request to set or replace current running task's current error to be a fatal error. |
The API functions provided by the module fwutil.py are as follows:
| API Function | : | Description |
|---|---|---|
| IsFwAvailable() | : | Returns True if the RTE of the framework has been started, False otherwise. |
| IsLcFailureFree() | : | Returns False if the RTE of the framework has encountered an LC failure, True otherwise. |
| IsTaskRunning(taskUID_) | : | Returns True if the task state property ITask.isRunning of the task passed to resolves to True, False otherwise. |
| GetLcFailure() | : | Returns an instance of class LcFailure if the RTE of the framework has encountered an LC failure, None otherwise. |
| GetXcofdkVersion() | : | Returns the version of the installed XCOFDK, i.e. Python package xcofdk. |
| GetPythonVersion() | : | Returns running Python interpreter's version. |
| GetPlatform() | : | Returns the system (or OS) name of the underlying platform, in general one of the string literals below: 'Linux', 'Darwin', 'Java', 'Windows'. |
| GetAvailableCpuCoresCount() | : | Returns total number of available CPU cores on the host machine if it could be retrieved, 0 otherwise. |
Unless the messaging subsystem is disabled (see related RTE configuration API), the subpackage xcofdk.fwapi.xmsg is responsible for providing the public API required for comminication via messaging (see also sections General Pattern of Messaging and Definitions and Concepts):
-
enumerator class EXmsgPredefinedID:
providing pre-defined messaging IDs:Pre-defined ID : Description DontCare : Wildcard specifier for an endpoint or a message filter part. MainTask : Alias specifier for application's main task (if any) as destination. Broadcast : Alias specifier for the destination of a broadcast message. MinUserDefinedID : Pre-definition of the integer vlaue, i.e. 5001, of user-defined IDs (if any). -
IMessage:
whenever a reference to (a received) message object is needed. This interface class provides below properties:API Property/Function : Return Type msgUniqueID -> int msgHeader -> IMessageHeader msgPayload -> IPayload -
IMessageHeader:
whenever a reference to the header object of a message is needed. This interface class provides below properties:API Property/Function : Return Type isInternalMessage -> bool isBroadcastMessage -> bool msgLabel -> Union[IntEnum, int] msgCluster -> Union[IntEnum, int] msgSender -> int msgReceive -> int -
IPayload:
whenever a reference to the payload object of a message is needed, or if applications wish to derive user-defined payload classes by subclassing. When processing messages, the relevant API most frequently used by applications is as follows:API Property/Function : Return Type numParameters -> int IsIncludingParameter() -> bool GetParameter() -> Any DetachContainer() -> Union[dict, None] -
XMessenger.SendMessage()
API function to send a message from current running task as sender to the specified receiver task as destination.
However, the shortcuts below are the preferred API functions to be used instead: -
XMessenger.BroadcastMessage():
API function to broadcast a message from current running task as sender to all application tasks currently available, i.e. all tasks whose task state property ITask.isRunning resolves to True.
However, the shortcuts below are the preferred API functions to be used instead:
Again, for more convenient most of the API needed for (task) communication is also available through subpackage xcofdk.fwapi directly. The code snippet below shows corresponding import instructions for both RC classes and SC classes:
from xcofdk.fwapi import IMessage
from xcofdk.fwapi import IMessageHeader
from xcofdk.fwapi import IPayload
from xcofdk.fwapi import IRCTask # providing above-mentioned shortcuts for RC tasks
from xcofdk.fwapi import IRCCommTask # ditto for full-communication RC tasks
from xcofdk.fwapi.xmt import IXTask # providing above-mentioned shortcuts for SC tasks
#...The following subsections provide example code snippet demonstrating below most common features used when messaging:
It is often useful to define applicatiion-specific messaging IDs (if any) in a way, so that defined IDs can easily be shared by sender and/or receiver tasks.
In general, application-specific messaging IDs may be either plain integer values or some properly defined integer-based enumerator members. Regardless of how they are defined, however, their respective integer value must always be larger than or equal to the pre-defined IntEnum member EXmsgPredefinedID.MinUserDefinedID. The code snippet below shows an example:
# file : appmsgdefs.py
from enum import auto
from enum import unique
from enum import IntEnum
from xcofdk.fwcom import EXmsgPredefinedID
@unique
class EAppMsgID(IntEnum):
# first enum with the min. value of user defined IDs set properly
DontCare = EXmsgPredefinedID.MinUserDefinedID.value
# clusters
CL_JOBS = auto()
# job labels
LBL_JobRequest = auto()
LBL_JobReply = auto()
LBL_JobQuit = auto()
LBL_JobQuitACK = auto()
# misc. labels
LBL_Jingling = auto()
LBL_MagicMagic = auto()
# payload keys
PK_JobID = auto()Regardless of capability for full communication, any possible task instance can send messages. The code snippet below shows a regular task, i.e. a task which cannot be the destinaion of any message, sending some messages:
# file : magictask.py
from xcofdk.fwcom import EExecutionCmdID
from xcofdk.fwapi import GetCurTask
from .appmsgdefs import EAppMsgID
# run phase callback function of magic task: a non-cyclic, async. task
def MagicTaskRunCB(jobServerTID_ : int) -> EExecutionCmdID:
_myTsk = GetCurTask()
# do some work
pass
# broadcast a message with a mandatory label supplied
_myTsk.BroadcastMessage(EAppMsgID.LBL_Jingling)
# send a message to the job server passed in via direct addressing:
# - with no message filter info supplied,
# - but supplying a payload agreed on by a trusted contract
_myTsk.SendMessage(jobServerTID_, msgPayload_={'trusted-key-its-me' : 'dont-care'})
# send a message to the main task via alias addressing
_myTsk.SendMessage(EXmsgPredefinedID.MainTask, msgLabelID_=EAppMsgID.LBL_MagicMagic)
# do some more work
pass
# done, stop task
return EExecutionCmdID.STOPFor tasks capable of full communication use of XF-like classes is the recommended way to specify their 3-PhXF in favor of better readability and maintenance, even though use of callback functions will perfectly work, too. The code snippets below demonstrate two full-communication tasks:
-
main task:
# file : maintask.py from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IMessage from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import xlogif from xcofdk.fwapi import AsyncTask from .appmsgdefs import EAppMsgID from .magictask import MagicTaskRunCB # XF-like class of the main task: a cyclic, sync. full-comm task class FCMainTaskCB: def __init__(self): self.__js = EAppMsgID.DontCare def RunTask(self, myTsk_: IRCCommTask, jobServerTID_: int) -> EExecutionCmdID: # first iteration? if myTsk_.isFirstRunPhaseIteration: # create and start the magic task: a non-cyclic, async. task _magictsk = AsyncTask(MagicTaskRunCB, aliasName_='MagicTask') _magictsk.Start(jobServerTID_) # job server acknowledged request to quit? elif self.__js == EAppMsgID.LBL_JobQuitACK: # done, stop own task xlogif.LogInfo(f'[{myTsk_.aliasName}] Going to stop...') return EExecutionCmdID.STOP # already requested job server to quit? elif self.__js == EAppMsgID.LBL_JobQuit: # nothing to do, wait for ACK return EExecutionCmdID.CONTINUE # enough job requests sent already? if myTsk_.currentRunPhaseIterationNo > 100: # request job server to quit self.__js = EAppMsgID.LBL_JobQuit myTsk_.SendMessage(jobServerTID_, msgLabelID_=EAppMsgID.LBL_JobQuit, msgClusterID_=EAppMsgID.CL_JOBS) else: # send a new job request myTsk_.SendMessage(jobServerTID_, msgLabelID_=EAppMsgID.LBL_JobRequest, msgClusterID_=EAppMsgID.CL_JOBS) return EExecutionCmdID.CONTINUE def ProcessExternalMessage(self, myTsk_: IRCCommTask, xmsg_: IMessage) -> EExecutionCmdID: if xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_Jingling: # process jingling broadcast pass elif xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_MagicMagic: # process magic-magic message pass elif xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_JobQuitACK: # job server acknowledge quit request self.__js = EAppMsgID.LBL_JobQuitACK elif xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_JobReply: # process job reply pass else: # ignore all other messages pass return EExecutionCmdID.CONTINUE
-
server task:
# file : servertask.py from collections import namedtuple from typing import List from xcofdk.fwcom import EExecutionCmdID from xcofdk.fwapi import IMessage from xcofdk.fwapi import IRCCommTask from xcofdk.fwapi import xlogif from .appmsgdefs import EAppMsgID # data type representing a job request JobRequest = namedtuple('JobRequest', ['ownerTID', 'jobID']) # XF-like class of the server task: a cyclic, async. full-comm task class FCJobServerCB: def __init__(self): self.__jobID = 0 self.__bQuit = False def RunTask(self, myTsk_: IRCCommTask) -> EExecutionCmdID: # requested to quit the service? if self.__bQuit: #TBD: # - wait for some additional iterations before stopping own task. # - this is to make sure, that all messages, which are still on their way to # be delivered to the server, would not cause a 'failed-send-operation' error. # xlogif.LogInfo(f'[{myTsk_.aliasName}] Going to stop...') return EExecutionCmdID.STOP # proccess jobs still pending _finished = self.__ProcessPendingJobs() # notify all job owners about their finished job for _jr in _finished: # prepare payload of the reply message _pl = {EAppMsgID.PK_JobID: _jr.jobID} # send reply for the finished job myTsk_.SendMessage(_jr.ownerTID, EAppMsgID.LBL_JobReply, EAppMsgID.CL_JOBS, _pl) return EExecutionCmdID.CONTINUE def ProcessExternalMessage(self, myTsk_: IRCCommTask, xmsg_: IMessage) -> EExecutionCmdID: _pl = xmsg_.msgPayload if (_pl is not None) and _pl.IsIncludingParameter('trusted-key-its-me'): xlogif.LogInfo(f'[{myTsk_.aliasName}] Received trusted message from task {xmsg_.msgHeader.msgSender}.') elif self.__bQuit: # service is going to quit, so ignore the message pass elif xmsg_.msgHeader.msgCluster != EAppMsgID.CL_JOBS: # ignroe anything else other than message cluster 'CL_JOBS' pass elif xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_JobQuit: # acknowledge request for quit self.__bQuit = True myTsk_.SendMessage(xmsg_.msgHeader.msgSender, EAppMsgID.LBL_JobQuitACK, EAppMsgID.CL_JOBS) elif xmsg_.msgHeader.msgLabel != EAppMsgID.LBL_JobRequest: # unexpected message other than a new job request, ignore it pass else: # store the job request received _jr = JobRequest(xmsg_.msgHeader.msgSender, self.__jobID) self.__StoreJobRequest(_jr) self.__jobID += 1 if (self.__jobID % 20) == 0: xlogif.LogInfo(f'[{myTsk_.aliasName}] Got {self.__jobID} job requests so far.') return EExecutionCmdID.CONTINUE def __StoreJobRequest(self, newReq_ : JobRequest): # tbd: stores a new job request pass def __ProcessPendingJobs(self) -> List[JobRequest]: # tbd: returns list of finished jobs return []
With the example task model (consisting of MagicTaskRunCB, FCMainTaskCB and FCJobServerCB) given above, a possible example program to run the task model is depicted below:
# file : main.py
from xcofdk import fwapi
from xcofdk.fwapi import rtecfg
from xcofdk.fwapi import XFSyncCommTask
from xcofdk.fwapi import XFAsyncCommTask
from .maintask import FCMainTaskCB
from .servertask import FCJobServerCB
def Main() -> int:
# optional: disable subsystem xmp
rtecfg.RtePolicyDisableSubSystemMultiProcessing()
# start the framework
fwapi.StartXcoFW()
# create and start the job server: a cyclic, async. full-comm task
_jobsrv = XFAsyncCommTask( phasedXFCallback_=FCJobServerCB(), aliasName_='JobSrv'
, bRefToCurTaskRequired_=True, runCallbackFrequency_=50)
_jobsrv.Start()
# create and start the main task: a cyclic, sync. full-comm task
_mainTsk = XFSyncCommTask( phasedXFCallback_=FCMainTaskCB(), aliasName_='MainTask', bMainTask_=True
, bRefToCurTaskRequired_=True, runCallbackFrequency_=100)
_mainTsk.Start(_jobsrv.taskUID)
# wait for framework's termination
fwapi.JoinXcoFW()
return 0
if __name__ == "__main__":
exit(Main())When started, the ouput of the program main.py should look like below:
$> python3.12 -m main
[10:23:12.638 XINF][CTsk_501001] [JobSrv] Received trusted message from task 501003.
[10:23:14.589 XINF][CTsk_501001] [JobSrv] Got 20 job requests so far.
[10:23:16.714 XINF][CTsk_501001] [JobSrv] Got 40 job requests so far.
[10:23:18.780 XINF][CTsk_501001] [JobSrv] Got 60 job requests so far.
[10:23:20.904 XINF][CTsk_501001] [JobSrv] Got 80 job requests so far.
[10:23:22.956 XINF][CTsk_501001] [JobSrv] Got 100 job requests so far.
[10:23:23.171 XINF][CTsk_501001] [JobSrv] Going to stop...
[10:23:23.244 XINF][CTsk_501002] [MainTask] Going to stop...With regard to both the design and the implementation presented above as common messaging scenario, there are a few important details to be pointed out:
- a complete detailed design should also take into account the necessary linkage between a request, e.g. by a requestID, and its associated job, i.e. jobID,
- also, in case of asynchronous processing of received job requests, the server should immediately send an acknowledge message back to the requestor,
- in general, a successful send operation requires that the specified destination is in running state (see task state property isRunning),
- in the example above, both tasks MainTask and JobSrv use a handshake mechanism (via message label LBL_JobQuitACK) to prevent blindly sending messages to each other while the respective destination is not in running state anymore,
- such a handshaking, however, is insufficient whenever there are more than one single client communicating with the server task(s). As a result, failed send operations will typically happen (for some time) when a server task has decided to ramp down,
- therefore, the use of a broadcast message (as depicted below) might be the proper solution instead of handshaking:
# file : servertask.py #... class FCJobServerCB: #... def ProcessExternalMessage(self, myTsk_: IRCCommTask, xmsg_: IMessage) -> EExecutionCmdID: #... elif xmsg_.msgHeader.msgLabel == EAppMsgID.LBL_JobQuit: # broadcast server downtime self.__bQuit = True myTsk_.BroadcastMessage(EAppMsgID.LBL_JobSrvDowntime, msgClusterID_=EAppMsgID.CL_JOBS) #... return EExecutionCmdID.CONTINUE #...
In accordance to the specification of both payload abstraction and message marshaling, applications are provided with two alternatives to pass the desired payload object (if any) when sending messages:
-
default payload objects:
which are instances of class XPayload implemented on top of ordinary dictionary objects, i.e. instances of the built-in Python type dict. The framework always uses instances of this class whenever an ordinary dict instance is passed as message payload.Subclassing the interface IPayload, it provides supplementary API functions below useful when constructing a payload instance:
API Property/Function : Return Type isMarshalingRequired -> bool payloadContainer -> dict SetParameter() -> bool UpdatePayloadContainer() -> bool For example, the MagicTaskRunCB shown above could have sent its trusted message using an instance of class XPayload, too:
# file : magictask.py #... from xcofdk.fwapi.xmsg import XPayload # run phase callback function of magic task: a non-cyclic, async. task def MagicTaskRunCB(jobServerTID_ : int) -> EExecutionCmdID: _myTsk = GetCurTask() #... # send a message to the job server... _pl = XPayload() _pl.SetParameter('trusted-key-its-me', 'dont-care') _myTsk.SendMessage(jobServerTID_, msgPayload_=_pl) #... # done, stop task return EExecutionCmdID.STOP
-
custom payload objects:
which are instances of application-specific classes derived from the interface class IPayload. This way, applications are given most flexibility to construct their own, specific payload classes. An example of such a custom payload class is presented by class CustomPayload used by the basic RC example exampleB21:- class CustomPayload:
# file : custompayload.py from xcofdk.fwcom import override from xcofdk.fwapi import IPayload class CustomPayload(IPayload): __slots__ = [...] def __init__(self, ...): super().__init__() #... # -------------------------------------------------------------------------- # custom API # -------------------------------------------------------------------------- @property def countReceived(self): return self.__cntRcv @property def countSent(self): return self.__cntSnd @property def countFailure(self): return self.__cntErr @property def countOutOfOrder(self): return self.__cntOutOfOrder # -------------------------------------------------------------------------- # API inherited from IPayload # -------------------------------------------------------------------------- @IPayload.isValidPayload.getter def isValidPayload(self) -> bool: return self.__bSer is not None #...
- custom payload construction in
XFServiceTask.__PostMessage():
# file : servicetask.py #... from xuserapp.basic.xmt.xmtmisc.custompayload import CustomPayload class XFServiceTask: #... def __PostMessage(self) -> bool: #... payload = CustomPayload( self.__ctrRcv, self.__ctrSnd, self.__ctrErrSnd , self.__ctrOutOfOrder, bSkipSer_=False) #... return res
- custom payload usage when processing in
XFMainTask.ProcessExternalMessage():
# file : maintask.py #... from xuserapp.basic.xmt.xmtmisc.custompayload import CustomPayload class XFMainTask: #... def ProcessExternalMessage(self, xmsg_ : IMessage) -> EExecutionCmdID: #... payload = xmsg_.msgPayload if not isinstance(payload, CustomPayload): xlogif.LogWarning('[mtGuiAppMain] Message...') return EExecutionCmdID.CONTINUE paramSnt = payload.countSent paramRcv = payload.countReceived paramCntErrSnd = payload.countFailure paramOutOfOrder = payload.countOutOfOrder #... return EExecutionCmdID.CONTINUE
- class CustomPayload:
Unless the multiprocessing subsystem is disabled (see related RTE configuration API), the subpackage xcofdk.fwapi.xmp is responsible for providing the public API required for child processes, see process abstraction and general requirements specified by the architecture:
-
enumerator class EProcessStartMethodID:
providing symbolic IDs for process start methods:Pre-defined ID : Description SystemDefault : Refers to the current process start method determined upon start of the framework. Spawn : Refer to the official documentation of Contexts and start methods. Fork : ditto. -
enumerator class EXmpPredefinedID:
mostly providing pre-defined constants related to supplied data of child processes:Pre-defined ID : Description MinSuppliedDataSize : Currently set to 4:
min. size of a byte stream provided as supplied data.MaxSuppliedDataSize : Currently set to 0x7FF0.0000 or 2,146,435,072:
max. size of a byte stream provided as supplied data.DefaultSuppliedDataMaxSize : Currently set to 10240:
default value taken as max. size of a byte stream provided as supplied data,
which is large enough for a byte stream of a list of up to 1022 integer values
each set to sys.maxsize. -
utility class XmpUtil:
providing common, auxiliary API functions:API Function : Return Type IsCurrentStartMethod(startMethodID_ : EProcessStartMethodID) -> bool IsValidStartMethodName(startMethod_ : str) -> bool GetDefinedStartMethdsNameList() -> list GetSystemDefaultStartMethodID() -> EProcessStartMethodID GetCurrentStartMethodID() -> EProcessStartMethodID GetCurrentStartMethodName() -> str MapStartMethodToID(startMethod_ : str) -> EProcessStartMethodID CurrentProcessStartMethodAsString() -> str -
simple child process class XProcess:
enables applications to create and run simple child processes (see below). -
child process exception classes PTException and PTWrappedException:
represent possible exceptions while execution of child processes (see below).
Instances of class XProcess represent (simple) child processes each associated with a host process object, that is an instance of Python class multiprocessing.Process. The signature of the constructor of class XProcess consists of below formal parameters:
-
target_:
the callable object to be executed by the associated host process when this instance is started.
When called , the callback function is expected to accept positional and/or keyword arguments (if any). -
aliasName_:
the alias name to be assigned to the child process.
If specified, an arbitrary non-empty and printable string literal without spaces which optionally may have a trailing '_', otherwise 'Prc_' will be auto-assigned.
Also, much like the alias name of task instances (see ITask.aliasName), the unique instance number of the child process to be created will be appended to an alias name with a trailing '_'. -
name_:
name of the host process to be created and associated with.
If not specified, it will be auto-generated (see host process attribute Process.name). -
maxSuppliedDataSize_:
default value used as maximum length of a byte stream representing the serialization of a data object (if any) returned by the callback function target_ above.
If not specified, DefaultSuppliedDataMaxSize will be assumed. Otherwise the value passed to is expected to be within the integer range below:
[MinSuppliedDataSize .. MaxSuppliedDataSize]
Note that child processes can be created only if the framework has been started already. The code snippet below demonstrates a simple example:
# file : main.py
from os import getpid
from time import sleep
from typing import List
from xcofdk import fwapi
from xcofdk.fwapi import rtecfg
from xcofdk.fwapi import xlogif
from xcofdk.fwapi import XProcess
def _ProcCB(idx_ : int) -> List[int]:
# do some work
sleep(2.0)
_rs = idx_ * 1000
return [ _ee+getpid() for _ee in range(_rs, _rs+1000) ]
def _StartProcesses():
_CNT = 3
_procs = [ XProcess(_ProcCB) for _ii in range(_CNT) ]
for _ii in range(_CNT):
_procs[_ii].Start(_ii)
# wait for all running child processes to terminate
fwapi.JoinProcesses()
for _pp in _procs:
_sd = _pp.processSuppliedData
_msg = f'[{_pp.aliasName}-PID:{_pp.processPID}] '
_msg += f'Got (de-serialized) supplied data, size={len(_sd)}:\n\t'
_msg += ' '.join([str(_ee) for _ee in _sd[:5]]) + '...'
xlogif.LogInfo(_msg)
def Main() -> int:
# optional: disable subsystem xmsg
rtecfg.RtePolicyDisableSubSystemMessaging()
# start the framework
fwapi.StartXcoFW()
# start a few child processes
_StartProcesses()
# wait for framework's termination
fwapi.JoinXcoFW()
return 0
if __name__ == "__main__":
exit(Main())When started, the ouput of the program main.py should look like below:
$> python3.12 -m main
[10:30:04.373 XINF][MainThread] [Prc_1-PID:8576] Got (de-serialized) supplied data, size=1000:
8576 8577 8578 8579 8580...
[10:30:04.373 XINF][MainThread] [Prc_2-PID:8577] Got (de-serialized) supplied data, size=1000:
9577 9578 9579 9580 9581...
[10:30:04.373 XINF][MainThread] [Prc_3-PID:8578] Got (de-serialized) supplied data, size=1000:
10578 10579 10580 10581 10582...The general child process properties of an instance of class XProcess include below API:
| API Property | : | Description |
|---|---|---|
| isAttachedToFW | : | Returns True if this instance is (still) attached to the framework, False otherwise. |
| aliasName | : | Returns (auto-generted) alias name of this instance (see constructor parameter aliasName_). |
| processPID | : | Returns PID of the associated host process of this instance if started, None otherwise. |
| processName | : | Returns (auto-generted) name of this instance (see constructor parameter name_). |
| processExitCode | : | Returns None if not started or not terminated yet, 0 upon successful termination, an integer value otherwise. |
| processExitCodeAsStr | : | Returns the exit code (if available) of this instance as a string object, e.g. SIGTERM, None otherwise. |
| processSuppliedData | : | Returns the (application-specific) data (if any) supplied by this instance through the returned value of the target callback function target_ passed to the constructor. |
| processException | : | Returns: - None if exception tracking of child processes is disabled via RTE configuration, - None if this instance is not started, - None if this instance is terminated upon successful termination, - None if no exception was raised while execution of the target callback function, - that raised exception otherwise. |
The general child process state properties of an instance of class XProcess are as follows:
| API Property | : | Description |
|---|---|---|
| isStarted | : | Returns True if this instance has been started, False otherwise. |
| isRunning | : | Returns: - False if this instance is not started yet, - False if the associated host process is not alive anymore or terminated already by providing an exit code, - False if this instance has been requested to terminate, - True otherwise. |
| isDone | : | Returns True if this instance has finished its execution upon normal termination indicated by an exit code of 0, False otherwise. |
| isFailed | : |
True if this instance has finished its execution upon abnormal termination indicated by an exit code other than 0, False otherwise. |
| isTerminated | : | Returns: - False if this instance is not started yet, - False as long as this instance is running, - True otherwise. |
| isTerminatedByCmd | : | Returns: - False if this instance is not started yet, - False as long as this instance is running, - False if this instance is terminated already with or without having a request to terminate it by intention, - True after a request to terminate this instance by intention with the framework was not able to establish the exit code and/or the aliveness of the associated host process. |
The general child process operations of an instance of class XProcess are as follows:
| API Function | : | Description |
|---|---|---|
| Start() | : | Starts the host process associated to this instance making the target callback function target_ passed to the constructor is called. Also, positional and/or keyword arguments expected by target_ (if any) can be passed, too. |
| Join() | : | Initiates a synchronous waiting for the termination of this instance, possibly for a maximum amount of time (if specified). |
| Terminate() | : | Requests the framework to intenially terminate the host process associated to this innstance. Note that the framework never terminates a child process on its own behalf. |
| DetachFromFW() | : | Detaches this instance from the framework. Main purpose of this API functions is releasing application or system resources used for. |
Unless exception tracking of child processes is disabled via RTE configuration, and whenever an exception is raised while execution of the target callback function target_ by the associated host process, the child process property XProcess.processException returns a reference to an instance of either one exception classes below (with PT stands for (child) Process Target):
-
PTException:
represents the exception raised by the associated host process, its API is as follows:API Propery : Return Type isWrappedException -> bool message -> str code -> int reason -> Union[BaseException, None] -
PTWrappedException:
derived from class PTException, instances of this class represent a compact version of the exception raised by the associated host process, whenever the length of the byte stream of the raised exception exceeds the maximum valued specified by maxSuppliedDataSize_ passed to the constructor. Its (inherited) API is as follows:API Propery/Function : Return Type message -> str code -> int reason -> str reasonType -> type IsReasonType(cls_ : type) -> bool
Representing a key characteristic of child processes, however, a failed child process is never considered an LC failure unless the application submitts it as such, as shown in the code snippet below:
# file : main2.py
from time import sleep
from xcofdk import fwapi
from xcofdk.fwapi import rtecfg
from xcofdk.fwapi import xlogif
from xcofdk.fwapi import XProcess
def _ProcCB():
# do some work
sleep(2.0)
raise AttributeError('Some demo exception.')
def _StartChildProcess():
_proc = XProcess(_ProcCB, aliasName_='MyProc')
_proc.Start()
# wait for all running child processes to terminate
fwapi.JoinProcesses()
_pfx = f'[PID:{_proc.processPID}]'
if not _proc.isDone:
if _proc.processException is None:
_msg = f'{_pfx} Child process {_proc.aliasName} failed '
_msg += f'with exit code {_proc.processExitCode}.'
xlogif.LogErrorEC(_msg, 12345)
else:
_pxcp = _proc.processException
_rname = type(_pxcp.reason).__name__
_msg = f'{_pfx} Child process {_proc.aliasName} failed with an {_rname} exception.'
xlogif.LogExceptionEC(_msg, _pxcp, 67890)
return 1
xlogif.LogInfo(f'{_pfx} Child process {_proc.aliasName} terminated as expected.')
return 0
def Main() -> int:
# optional: disable subsystem xmsg
rtecfg.RtePolicyDisableSubSystemMessaging()
# start the framework
fwapi.StartXcoFW()
# create and start a child process
_StartChildProcess()
# wait for framework's termination
fwapi.JoinXcoFW()
return 0
if __name__ == "__main__":
exit(Main())When started, the ouput of the program main2.py should look like below:
This section provides detailed information on both how to use the redirection API and how to capture the redirected output.
As already explained in section 4.3.2 RTE Configuration API, the group of API functions related to redirection is designed to enable applications to:
In general, however, the console output of the submitted log entries is enabled by default as long as the framework is not started yet. This is especially to ensure a proper feedback to the application in case of possible user errors when using otherwise available API of the framework prior to start, e.g. RTE configuration or when creating task instances in advance (see section 4.2.5 Task Instantiation).
The API function RtePolicyDisableLogRDConsoleSink() can be used to disable the output of submitted logs to the console, i.e. to the standard output stream stdout. This configuration is especially regardless of a possible configuration for redirection to a file and/or TCP connection.
For example, with below modified version of the Main() function shown in section 4.6.1.4 Child Process Exceptions above:
# file : main2.py
#...
def Main() -> int:
# optional: disable subsystem xmsg
rtecfg.RtePolicyDisableSubSystemMessaging()
# disable console output
rtecfg.RtePolicyDisableLogRDConsoleSink()
# start the framework
fwapi.StartXcoFW()
# create and start a child process
_StartChildProcess()
# wait for framework's termination
fwapi.JoinXcoFW()
# if an LC failure, then generate an ordinary console output on behalf of the program
if not fwapi.IsLcFailureFree():
_lcf = fwapi.GetLcFailure()
_msg = f'Program encountered an LC failure:\n error code : '
_msg += f'{_lcf.errorCode}\n error message : {_lcf.errorMessage}'
print(_msg)
return 0
if __name__ == "__main__":
exit(Main())there will be no console output by the framework anymore when running main2.py:
The API function RtePolicyEnableLogRDFileSink() can be used to enable the output of submitted logs to a file. There are also optional parameters which can be passed to when configuring:
-
filePath_, defaulting to None:
pathname (absolute or relative to the current working directory) of the file to be used for log output to:- if None, then the path to the current working directory will be assumed as specified,
- if the path specifies an existing directory, then an auto-generated file name (in that directory) will be used, e.g.:
'xcofdk_log__14_23_38_418.txt', - the pathname as is otherwise.
-
bFileModeAppend_, defaulting to False:
if True then the specified file will be opened with file open mode 'a' (for appending if exists),
it will be opened with 'w' (for writing) otherwise. -
fileEncoding_, defaulting to 'utf-8':
the encoding to be used for the specified file:- if None, then system or platform default will be used,
- the specified encoding otherwise.
The example code below demonstrates the general pattern of RTE configuration for redirection of logging output:
# file : main3.py
from xcofdk import fwapi
from xcofdk.fwapi import rtecfg
from xcofdk.fwapi import xlogif
from xcofdk.fwapi import EExecutionCmdID
from xcofdk.fwapi import IRCTask
from xcofdk.fwapi import AsyncTask
from xcofdk.fwcom.fwdefs import ELineEnding
_DEFAULT_MAX_ITER = 100
def RunCB(myTsk_: IRCTask, maxIter_ : int =_DEFAULT_MAX_ITER) -> EExecutionCmdID:
_iterNo = myTsk_.currentRunPhaseIterationNo
if _iterNo > maxIter_:
xlogif.LogInfo(f'[{myTsk_.aliasName}:{_iterNo}] Done.')
return EExecutionCmdID.STOP
if (_iterNo%10) == 0:
_rt = ((maxIter_-_iterNo)*myTsk_.runPhaseFrequencyMS)//1000
xlogif.LogInfo(f'[{myTsk_.aliasName}:{_iterNo}] ca. time to stop: {_rt} [s]')
return EExecutionCmdID.CONTINUE
def Main(maxIter_ : int =_DEFAULT_MAX_ITER) -> int:
# optional: disable subsystems xmsg and xmp
rtecfg.RtePolicyDisableSubSystemMessaging()
rtecfg.RtePolicyDisableSubSystemMultiProcessing()
# optional: disable console sink
#rtecfg.RtePolicyDisableLogRDConsoleSink()
# enable redirection to:
# - file sink
_fp = '/home/xguest/logs/my_xcofdk_log.txt'
rtecfg.RtePolicyEnableLogRDFileSink(filePath_=_fp, bFileModeAppend_=True)
#
# - and/or TCP sink
rtecfg.RtePolicyEnableLogRDTcpSink('192.168.56.100', 65432, lineEnding_=ELineEnding.CR)
# start the framework
fwapi.StartXcoFW('--fw-log-level warning')
# create and start a cyclic async. task
_tsk = AsyncTask(RunCB, aliasName_='RDDemo', bRefToCurTaskRequired_=True, runCallbackFrequency_=100)
_tsk.Start(maxIter_=maxIter_)
# wait for framework's termination
fwapi.JoinXcoFW()
return 0
if __name__ == "__main__":
_maxIter = _DEFAULT_MAX_ITER if len(sys.argv)<2 else int(sys.argv[1])
if _maxIter < 30: _maxIter = _DEFAULT_MAX_ITER
exit(Main(_maxIter))In a separate command Shell open the specified log file for live tracking of (appended) changes to its content:
/home/xguest/logs$> touch my_xcofdk_log.txt
/home/xguest/logs$> tail -f my_xcofdk_log.txtand start running main3.py (for a duration of ca. 5 seconds):
/home/xguest/test$> python3.12 -m main3 50
--------------------------------------------------------------------------------
-----
----- XCOFDK v3.1
...Then the specified log file is written or updated accordingly:
NOTE:
- The RTE configuration API for redirection to TCP sinks, as presented in this section, is designed and
provided for development purposes only (in a development/local/private environment).
- Especially, the underlying implementation does not incorporate any kind of precautions in terms of
secure connections.
- Hence, the API function RtePolicyEnableLogRDTcpSink() shall not be used in a production code
or whenever seccure network requirements are of concern.
- A new, separate API function for redirection via secure connections is part of framework's PM-backlog
for future featucres and will be announced accordingly as soon as available.
The API function RtePolicyEnableLogRDTcpSink() can be used to enable the output of submitted logs to a TCP connection with below (optional) parameters to be passed to:
-
ipv4Addr_:
the IPv4 address (specified in dotted decimal notation) of the host machine the application is running on, -
port_:
the port number of the host machine to be used for the connection, -
lineEnding_:
the line ending to be used for sending string objects via the specified TCP connection, it defaults to ELineEnding.NOLE, i.e. no line ending.
The example code main3.py shown above demonstrates the expected pattern of use of this API function. In general, when started with redirection to a TCP sink enabled:
- the framework starts listening to incoming connection requests,
- first successful one (if any) wins with subsequent connection requests are refused then,
- note, however, that this listening mode means an extra CPU load as long as no connection request is received,
something of special importance if running Python interpreter's GIL is enabled, - therefore, the redirection to a TCP sink should only be enabled if a client application, which will initiate the connectiion establishment, is supposed to be used, too,
- once the connection is established, subsequent logs are redirected to with queued logs still available (if any) are sent out first.
Tera Term is a free terminal emulator program for Microsoft Windows. Among other purposes, it is widely used for capturing the output of remote connections. So, in case of a TCP connection, it perfectly serves as the connection initiator expected by the framework to stop its listeening mode mentioned in previous section.
The bullet points below illustrate a typical capture session using Tera Term:
-
(optional) log file specification:
to store (or append) received connection output to a local file (go to menu item File > Log...), -
connection setup:
go to File > New connection... to open the setup dialog and enter the same connection parameters as done for enabling the TCP sink in the application, i.e. RtePolicyEnableLogRDTcpSink(). Make sure an unspecified connection type is selected, i.e. the radio button Other:
If the application whose logs are going to be captured is running already, confirm the setup dialog,
-
start the application:
if not done yet, for example main3.py (this time for a duration of ca. 8 seconds):/home/xguest/test$> python3.12 -m main3 80 -------------------------------------------------------------------------------- ----- ----- XCOFDK v3.1 ... -
confirm setup dialog:
if not done yet, confirm the new connection dialog now, -
Tera Term starts displaying incoming logs:
almost promptly, received logs should now be displayed as shown below for main3.py:
Tools for network analysis are useful for:
- troubleshooting an specified TCP connection which is not established as expected, or
- capturing transferred logs of an existing TCP connection for redirection out of yet another machine or device, respectively.
Commonly known as a powerful tool for network analysis, the bullet points below demonstrate the typical analysis steps to be performed for such scenarios using Wireshark:
-
network interface:
identify the network interface the specified IPv4 address belongs to, e.g. using below commands:-
on Linux platforms:
/home/xguest/test$> ifconfig [-a] ens44: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 inet 192.168.56.100 netmask 255.255.255.0 broadcast 192.168.56.255 ... /home/xguest/test$>
-
on Windows platform:
C:\Users\xguest$> ipconfig [/all] Ethernet adapter Ethernet: IPv4 Address. . . . . . . . . . . : 192.168.1.123(Preferred) ... Ethernet adapter VirtualBox Host-Only Network: IPv4 Address. . . . . . . . . . . : 192.168.56.1(Preferred) ... C:\Users\xguest$>
-
-
start a new capture:
in Wireshark start a new capture for the identified network interface, -
define capture filter:
define an appropriate capture filter which includes the packet sequence for connection establishment and/or transferred packets for submitted logs, e.g.:- (ip.src==192.168.56.100 && tcp.port==65432) || (ip.dst==192.168.56.100 && tcp.flags.syn==True)
- with tcp.flags.syn is used to identify that the connection is initiated as expected,
-
start the application:
start an application with redirection to TCP sink is enabled, e.g. main3.py, along with the client application supposed to serve as connection initiator, e.g. Tera Term as presentd in section 4.7.1.3.1 Capture Logs via Tera Term, -
analyze captured TCP packets:
provided the specification of the connection to be used was correct, related TCP packets for connection establishment should be present, as shown in the screenshot below:- 56145 -> 65432 [SYN]... for connection request,
- 65432 -> 56145 [SYN, ACK]... for connection acknowledge
[Top] [Home] [Previous] [Next] – [1. Introduction] [2. Quick Start] [3. Architecture] [5. Error Handling] [6. Basic Examples] [7. Glossary]
©fasabm 2024-2025 XCOFDK. All Rights Reserved.