Metadata-Driven Workflow Management: Stripping Away the Myths – Chapter 2

When implementing ETL or ELT processes using PL/SQL modules, scheduling the execution of atomic modules with internal dependencies and specific rules is often a crucial task. However, enterprise-wide schedulers may not meet the requirements for fine-grained dependencies and rules. As a result, local ETL schedulers are often used to trigger ETL batch processes. Unfortunately, many of the built-in schedulers in standard ETL tools are too inflexible and cannot scale effectively. To address this, I have developed a solution called SIMPLETASK, which is straightforward to use despite its powerful capabilities. The following is a basic version of the solution, which is intended to help users understand the principles and easily incorporate them into simpler ETL or migration processes.

SIMPLETASK

Drivers and metadata elements

The scheduling model is based on the standard network graph of tasks with resources. It can therefore share many methods with related techniques, such as finding the critical path.
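As an illustration of such a shared method, the sketch below runs the forward pass of the critical path method on the seven tasks P1 to P7 used in the example later in this article. The durations in seconds are taken from its DBMS_LOCK.sleep calls. This is a generic Python sketch and not part of SIMPLETASK. It ignores resources and thread limits, and it needs Python 3.9+ for graphlib.

```python
from graphlib import TopologicalSorter

# Durations (s) and predecessors borrowed from the example later in the article.
DURATIONS = {"P1": 120, "P2": 140, "P3": 250, "P4": 160,
             "P5": 100, "P6": 300, "P7": 309}
PREDECESSORS = {"P4": {"P1", "P2"}, "P5": {"P3"},
                "P6": {"P4"}, "P7": {"P5", "P6"}}


def critical_path(durations, predecessors):
    """Forward pass of the critical path method (no resource or thread limits)."""
    earliest_finish = {}
    via = {}  # predecessor that determines each task's earliest start
    graph = {t: predecessors.get(t, set()) for t in durations}
    for task in TopologicalSorter(graph).static_order():
        preds = predecessors.get(task, set())
        start = 0
        if preds:
            via[task] = max(preds, key=lambda p: earliest_finish[p])
            start = earliest_finish[via[task]]
        earliest_finish[task] = start + durations[task]
    # Walk back from the task that finishes last to recover the path.
    task = max(earliest_finish, key=earliest_finish.get)
    makespan = earliest_finish[task]
    path = [task]
    while task in via:
        task = via[task]
        path.append(task)
    return makespan, path[::-1]


print(critical_path(DURATIONS, PREDECESSORS))  # (909, ['P2', 'P4', 'P6', 'P7'])
```

Without resource limits, the round would take at least 909 s along P2, P4, P6 and P7. Any waiting for resources or free threads can only lengthen it.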

  • Heap (Batch) – defines a closed group of processes that run together, driven by their internal dependencies.
  • Task (Process) – defines an atomic module to be run.
  • Predecessor – defines a dependency between two modules: the dependent module must not start before its predecessor has completed.
  • Exclusion – defines a mutual dependency between two modules: neither of them may run while the other is in progress. (Note: in our solution, exclusions are implemented using resources.)
  • Resource – defines a limited resource consumed by running processes.
  • Resource consumption – defines how much of a resource a particular process consumes.
  • Round (Run) – defines one cycle of execution, for example, one processed day of a daily process.
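The elements above can be mirrored in memory to check a heap before it is run. The sketch below is a hypothetical illustration: the class names, the field names and the validate function are not the SIMPLETASK tables or API. It reports three kinds of problems that would keep a heap from completing. A predecessor may be unknown or outside the heap. A task may consume more of a resource than exists. The dependencies may contain a cycle.

```python
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter


# Hypothetical in-memory mirror of the metadata elements (illustrative names).
@dataclass
class Resource:
    id: str
    amount: float  # total capacity, e.g. 100 (%)


@dataclass
class Task:
    id: str
    heap: int
    predecessors: set = field(default_factory=set)
    consumption: dict = field(default_factory=dict)  # resource id -> amount


def validate(tasks, resources):
    """List problems that would keep a heap from ever completing."""
    by_id = {t.id: t for t in tasks}
    problems = []
    for t in tasks:
        for p in sorted(t.predecessors):
            if p not in by_id:
                problems.append(f"{t.id}: unknown predecessor {p}")
            elif by_id[p].heap != t.heap:
                problems.append(f"{t.id}: predecessor {p} is in another heap")
        for res_id, amount in t.consumption.items():
            if res_id not in resources:
                problems.append(f"{t.id}: unknown resource {res_id}")
            elif amount > resources[res_id].amount:
                problems.append(f"{t.id}: needs {amount} of {res_id}, "
                                f"only {resources[res_id].amount} exists")
    try:
        # A heap is driven by its dependencies, so they must form a DAG.
        TopologicalSorter({t.id: t.predecessors for t in tasks}).prepare()
    except CycleError as exc:
        problems.append(f"cycle: {' -> '.join(exc.args[1])}")
    return problems


resources = {"R1": Resource("R1", 100)}
tasks = [Task("P3", -1), Task("P5", -1, {"P3"}, {"R1": 100}),
         Task("P6", -1, {"P4"}, {"R1": 150})]
print(validate(tasks, resources))
# ['P6: unknown predecessor P4', 'P6: needs 150 of R1, only 100 exists']
```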

Resources and exclusions

Resource-based exclusion: This approach involves defining a resource that is consumed by a process while it is running. When another process needs to run, it checks if the resource is available. If the resource is not available, the process is put on hold until the resource is freed up. This approach ensures that two processes do not run at the same time if they require the same resource.
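The hold-until-free rule can be modeled with a small resource pool. This Python sketch is not the SIMPLETASK code. The all-or-nothing reservation across several resources is a design choice made here, so that a task never holds part of what it needs while it waits.

```python
class ResourcePool:
    """Minimal model of resource-based exclusion (illustrative only)."""

    def __init__(self, totals):
        self.free = dict(totals)  # resource id -> currently free amount

    def try_acquire(self, consumption):
        # All-or-nothing: reserve only if every requested resource fits.
        if all(self.free[r] >= amt for r, amt in consumption.items()):
            for r, amt in consumption.items():
                self.free[r] -= amt
            return True
        return False  # the process stays on hold, nothing is reserved

    def release(self, consumption):
        # Called when the consuming process ends.
        for r, amt in consumption.items():
            self.free[r] += amt


pool = ResourcePool({"R1": 100})
print(pool.try_acquire({"R1": 100}))  # True: the first process takes all of R1
print(pool.try_acquire({"R1": 100}))  # False: the second process is put on hold
pool.release({"R1": 100})
print(pool.try_acquire({"R1": 100}))  # True: the resource has been freed up
```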

Using explicit exclusion

The previous example demonstrated dependencies between processes using both predecessor and exclusion types. However, the following example replaces the exclusion dependency with resources. Resources provide two options: full exclusion (100% consumption) or limiting the number of large modules processed simultaneously (50% consumption allows up to 2 such processes to be processed at the same time).
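The replacement of explicit exclusions by resources can be sketched mechanically. Each exclusion pair gets its own resource with an amount of 100, and both tasks consume all of it. This is the same pattern the example later in this article uses for P5 and P6 with resource R1. The generated resource names are hypothetical. The second function shows the arithmetic behind partial consumption.

```python
def exclusions_to_resources(exclusions):
    """Replace each explicit exclusion pair with a dedicated resource.

    Hypothetical naming: resource "X_<a>_<b>" has an amount of 100 and both
    tasks consume 100 of it, so they can never run at the same time.
    """
    resources, consumption = {}, {}
    for a, b in exclusions:
        res = f"X_{a}_{b}"
        resources[res] = 100
        for task in (a, b):
            consumption.setdefault(task, {})[res] = 100
    return resources, consumption


def max_parallel(total, amount):
    """How many processes consuming `amount` fit into a resource of `total`."""
    return int(total // amount)


print(exclusions_to_resources([("P5", "P6")]))
print(max_parallel(100, 100), max_parallel(100, 50))  # 1 2
```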

Using resources

State diagram of a task during runtime

All tasks start in the NONE state, i.e., they have no record in the runtime table for the current round.

When a task has neither PREDECESSOR nor RESOURCE limitations and a thread is available, the thread picks the task for execution. It evaluates the task's Exec and Skip conditions and decides on the action:

  • Exec is false – WAIT
  • Exec is true and Skip is true – SKIP
  • Exec is true and Skip is false – ACTIVE (start it)
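The selection step and the three rules can be sketched in Python. The dictionaries are hypothetical stand-ins for the metadata and runtime tables. Two points are assumptions, not confirmed behaviour of SIMPLETASK: a SKIP predecessor is treated like a DONE one, and the wait time stored for a WAIT task is ignored.

```python
def decide(exec_ok: bool, skip: bool) -> str:
    """Action chosen once a thread has picked a task (the three rules above)."""
    if not exec_ok:
        return "WAIT"
    return "SKIP" if skip else "ACTIVE"


def pick_task(tasks, status, free):
    """Pick the next task for a free thread, or None if nothing can start.

    tasks:  id -> {"priority": int, "preds": set, "needs": {res: amount}}
    status: id -> runtime status; a missing id means NONE (no record yet)
    free:   resource id -> currently free amount
    Assumptions: SKIP satisfies a predecessor like DONE does, and the wait
    time of a WAIT task (its START_DT) is ignored.
    """
    ready = [
        t for t, meta in tasks.items()
        if status.get(t) in (None, "WAIT")
        and all(status.get(p) in ("DONE", "SKIP") for p in meta["preds"])
        and all(free.get(r, 0) >= a for r, a in meta["needs"].items())
    ]
    # PRIORITY: a lower value is more important; ties are broken by id.
    return min(ready, key=lambda t: (tasks[t]["priority"], t), default=None)


tasks = {
    "P4": {"priority": 50, "preds": {"P1", "P2"}, "needs": {}},
    "P5": {"priority": 50, "preds": {"P3"}, "needs": {"R1": 100}},
}
status = {"P1": "DONE", "P2": "ACTIVE", "P3": "DONE"}
print(pick_task(tasks, status, {"R1": 100}))  # P5 (P4 still waits for P2)
print(pick_task(tasks, status, {"R1": 0}))    # None: R1 is in use
print(decide(exec_ok=True, skip=False))       # ACTIVE
```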
State diagram

DOWNLOAD SIMPLE SOLUTION

To run your ETL or data migration tasks with the SIMPLETASK solution, download three scripts: one for the metadata structures, one for the PL/SQL packages and one for the views. The installation steps are as follows:

  1. Create a database user (e.g. BWTA_OWNER).
  2. Grant the user a tablespace quota to store the metadata. Only a small amount of space is required.
  3. Grant the following privileges to the user:
    • CREATE SESSION
    • CREATE TABLE
    • CREATE TRIGGER
    • CREATE VIEW
    • CREATE SEQUENCE
    • CREATE JOB
    • CREATE PROCEDURE
    • EXECUTE ON DBMS_LOCK
  4. Download and run the first script to install the metadata structures: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_metadata.sql
  5. Download and run the second script to install the packages: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_packages.sql
  6. Download and run the third script to install the views: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_views.sql

METADATA MODEL

Metadata model

Metadata structure description. For each table, the columns are listed with their data type, key or optionality, and description. In the third column, a number gives the position of the column in the primary key, M marks a mandatory column and O an optional one.

BWTA_TASK_RES – Resource consumption by processes
  TASK_SEQ        NUMBER          1   Sequence key of task
  RES_SEQ         NUMBER          2   Sequence key of resource
  AMOUNT          NUMBER          O   Consumption of the specified resource by the process

BWTA_TASK_REL – Relation between processes (predecessor specification)
  SEQ1            NUMBER          1   Sequence key of the dependent task
  SEQ2            NUMBER          2   Sequence key of the predecessor task
  SKIP_FLAG       NUMBER(1)       O   Skip flag to disable the dependency

BWTA_TASK – Process (task) to be run
  SEQ             NUMBER          1   Sequence key of task
  HEAP_SEQ        NUMBER          M   Sequence key of heap
  ID              VARCHAR2(100)   M   ID of process
  NOTE            VARCHAR2(500)   O   Description of task
  EXEC_COND       VARCHAR2(2000)  O   Execution condition code; blocks the task based on an external condition
  SKIP_COND       VARCHAR2(2000)  O   Skip condition code; skips the task based on an external condition
  EXEC_FLAG       NUMBER          O   Execution flag to block the task
  SKIP_FLAG       NUMBER          O   Skip flag to skip the task
  EXEC_CODE       VARCHAR2(2000)  O   PL/SQL code to execute
  PRIORITY        NUMBER          M   Priority of execution (a lower value means more important)
  AVG_DURATION    NUMBER          O   Average duration of the process based on statistics
  CNT_DURATION    NUMBER          O   Number of runs included in the statistics

BWTA_RES – Resource definition
  SEQ             NUMBER          1   Sequence key of resource
  ID              VARCHAR2(100)   O   ID of resource
  NOTE            VARCHAR2(500)   O   Description of resource
  AMOUNT          NUMBER          M   Total amount of the resource (e.g. 100 [%])
  PENDING         NUMBER(1)       M   0 = resource is released when the process ends; 1 = resource must be released by negative consumption

BWTA_LOG_THREAD – Thread of execution
  SEQ             NUMBER          1   Sequence key of thread
  ROUND_SEQ       NUMBER          O   Sequence key of round
  STATUS          VARCHAR2(10)    O   Status of thread (ACTIVE, INACTIVE, SLEEP)
  TASK_SEQ        NUMBER          O   Sequence key of the active task (for the ACTIVE status)
  START_DT        DATE            O   Start date of the current activity
  COMMAND         VARCHAR2(100)   O   Command for the thread; STOP stops the thread after its current activity
  ERROR_MSG       VARCHAR2(2000)  O   Error message for the ERROR status

BWTA_LOG_TASK_H – Process execution log (history of changes)
  TASK_SEQ        NUMBER          O   Sequence key of process
  ROUND_SEQ       NUMBER          O   Sequence key of round
  STATUS          VARCHAR2(10)    O   Status of process (ACTIVE, ERROR, DONE, SUSPEND, SKIP, WAIT)
  START_DT        DATE            O   Start date and time of process execution, or the time to wait for
  END_DT          DATE            O   End date and time of process execution
  ERROR_MSG       VARCHAR2(4000)  O   Error message for the ERROR status
  TS              TIMESTAMP(6)    O   Timestamp of the change

BWTA_LOG_TASK – Process execution log
  TASK_SEQ        NUMBER          1   Sequence key of process
  ROUND_SEQ       NUMBER          2   Sequence key of round
  STATUS          VARCHAR2(10)    O   Status of process (ACTIVE, ERROR, DONE, SUSPEND, SKIP, WAIT)
  START_DT        DATE            O   Start date and time of process execution, or the time to wait for
  END_DT          DATE            O   End date and time of process execution
  ERROR_MSG       VARCHAR2(4000)  O   Error message for the ERROR status

BWTA_LOG_ROUND – Round of the batch execution
  SEQ             NUMBER          1   Sequence key of round
  HEAP_SEQ        NUMBER          M   Sequence key of heap
  EFFECTIVE_DATE  DATE            O   Effective date of round
  START_DT        DATE            O   Start date and time of round
  END_DT          DATE            O   End date and time of round; set only for completed rounds

BWTA_LOG_METADATA – Log of metadata changes
  SEQ             NUMBER          1   Primary key of the changed record
  SEQ2            NUMBER          2   Optional extension of the key of the changed record
  DT              TIMESTAMP(6)    3   Timestamp of the change
  OPERATION       CHAR(1)         4   Operation (I, U, D)
  OLD_VAL         XMLTYPE         O   Old value of the record as an XML element
  NEW_VAL         XMLTYPE         O   New value of the record as an XML element
  TAG             VARCHAR2(100)   O
  TABLE_NAME      VARCHAR2(30)    M   Table of the change

BWTA_LOG_ERR – Log of all executed failover actions
  ROUND_SEQ       NUMBER          1   Round in which the action happened
  TASK_SEQ        NUMBER          2   Sequence key of task
  ERR_ID          NUMBER          3   Error ID
  DT              TIMESTAMP(6)    4   Timestamp of the action

BWTA_HEAP – Batch of processes to be run
  SEQ             NUMBER          1   Sequence key of batch; -1 is the default batch
  ID              VARCHAR2(100)   O   ID of batch
  NOTE            VARCHAR2(500)   O   Description of batch
  STAT_ROUND_SEQ  NUMBER          M   Last round for which statistics have been gathered

BWTA_ERR – Maintained errors
  ID              NUMBER          1   ORA error number
  ATMPT_CNT       NUMBER          O   Number of attempts
  DELAY_DAY       NUMBER          O   Delay, specified as a fraction of a day
  PROLONG_KOEF    NUMBER          O   Coefficient of prolongation of each subsequent attempt
  DIVERS_MOD      NUMBER          O   Modulo used to diversify individual processes
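The BWTA_ERR columns suggest how retries of a maintained ORA error could be spaced out. The Python sketch below is an ASSUMPTION, not taken from the SIMPLETASK packages. It assumes that the first retry waits DELAY_DAY (a fraction of a day), that each following wait is multiplied by PROLONG_KOEF, and that at most ATMPT_CNT attempts are made. DIVERS_MOD is not modeled.

```python
from datetime import datetime, timedelta


def retry_schedule(failed_at, delay_day, prolong_koef, atmpt_cnt):
    """Hypothetical retry times derived from one BWTA_ERR row.

    ASSUMPTION: wait DELAY_DAY days before the first retry and multiply the
    wait by PROLONG_KOEF for each next retry, up to ATMPT_CNT attempts.
    """
    times, wait, t = [], timedelta(days=delay_day), failed_at
    for _ in range(atmpt_cnt):
        t += wait
        times.append(t)
        wait *= prolong_koef
    return times


start = datetime(2023, 2, 22, 1, 0)
# DELAY_DAY = 1/288 of a day = 5 minutes, doubled for each next attempt.
for when in retry_schedule(start, delay_day=1 / 288, prolong_koef=2, atmpt_cnt=3):
    print(when.strftime("%H:%M"))  # 01:05, then 01:15, then 01:35
```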

Example

In the following example of filling the metadata, we use the processes from the network graph above. Instead of real modules, the following call simulates the duration of each process:

begin DBMS_LOCK.sleep(<duration in seconds>); end;

BEGIN
     BWTA_METADATA.SETRES('R1','Resource 1 - target partition',100,0,'INIT'); --resource R1 with a total amount of 100
     BWTA_METADATA.SETTASK('P1', 'Task1', 'begin DBMS_LOCK.sleep(120);end;', 50, 'INIT');
     BWTA_METADATA.SETTASK('P2', 'Task2', 'begin DBMS_LOCK.sleep(140);end;', 50, 'INIT');
     BWTA_METADATA.SETTASK('P3', 'Task3', 'begin DBMS_LOCK.sleep(250);end;', 50, 'INIT');
     ----
     BWTA_METADATA.SETTASK('P4', 'Task4', 'begin DBMS_LOCK.sleep(160);end;', 50, 'INIT');
       BWTA_METADATA.SETTASKREL('P4','P1',-1,0,'INIT'); --dependency P4 on P1
       BWTA_METADATA.SETTASKREL('P4','P2',-1,0,'INIT'); --dependency P4 on P2
     BWTA_METADATA.SETTASK('P5', 'Task5', 'begin DBMS_LOCK.sleep(100);end;', 50, 'INIT');
       BWTA_METADATA.SETTASKREL('P5','P3',-1,0,'INIT'); --dependency P5 on P3
       BWTA_METADATA.SETTASKRES('P5',-1,'R1',100,'INIT'); --task P5 requires resource R1 (amount 100)
     ----
     BWTA_METADATA.SETTASK('P6', 'Task6', 'begin DBMS_LOCK.sleep(300);end;', 50, 'INIT');
       BWTA_METADATA.SETTASKREL('P6','P4',-1,0,'INIT'); --dependency P6 on P4
       BWTA_METADATA.SETTASKRES('P6',-1,'R1',100,'INIT'); --task P6 requires resource R1 (amount 100)
     ----
     BWTA_METADATA.SETTASK('P7', 'Task7', 'begin DBMS_LOCK.sleep(309);end;', 50, 'INIT');
       BWTA_METADATA.SETTASKREL('P7','P5',-1,0,'INIT'); --dependency P7 on P5
       BWTA_METADATA.SETTASKREL('P7','P6',-1,0,'INIT'); --dependency P7 on P6
     COMMIT;
END;
/
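Before the round is started, its expected timeline can be approximated by an idealized, event-driven simulation of this metadata. The Python sketch is not SIMPLETASK's scheduler. It assumes there is no polling delay or overhead, that all priorities are equal (50, so ties are broken by task ID), and that R1 is released as soon as the consuming task ends (PENDING = 0). It uses five threads, as in this example run.

```python
import heapq

# Example metadata: id -> (duration in s, predecessors, consumption of R1).
TASKS = {
    "P1": (120, set(), 0), "P2": (140, set(), 0), "P3": (250, set(), 0),
    "P4": (160, {"P1", "P2"}, 0), "P5": (100, {"P3"}, 100),
    "P6": (300, {"P4"}, 100), "P7": (309, {"P5", "P6"}, 0),
}


def simulate(tasks, threads=5, r1_total=100):
    """Idealized model of one round: start every ready task that fits.

    Returns the start time of each task and the end time of the round.
    """
    now, free_r1, done, started = 0, r1_total, set(), {}
    running = []  # min-heap of (end time, task id)
    while len(done) < len(tasks):
        # Equal priorities, so ready tasks are started in id order.
        for t in sorted(tasks):
            duration, preds, need = tasks[t]
            if (t not in started and preds <= done and need <= free_r1
                    and len(running) < threads):
                started[t] = now
                free_r1 -= need
                heapq.heappush(running, (now + duration, t))
        # Jump to the next completion and give back its resource.
        now, t = heapq.heappop(running)
        done.add(t)
        free_r1 += tasks[t][2]
    return started, now


starts, end = simulate(TASKS)
print(starts)  # {'P1': 0, 'P2': 0, 'P3': 0, 'P4': 140, 'P5': 250, 'P6': 350, 'P7': 650}
print(end)     # 959
```

In this model, P4 finishes at 300 s but P6 cannot start until 350 s, because P5 holds all of R1 until then. The snapshots of the run show the same situation: P6 has no blocking predecessor but has not started yet.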

Now we can start the daily process using the following command:

BEGIN 
  bwta_oper.startround(date'2023-02-22');
END;
/

The process is started in 5 parallel threads for the effective date 22-FEB-2023. Its execution can be monitored from the runtime metadata. The following listings are snapshots taken during the execution, from the very start to the end, using the query below. The BLOCKING_TASKS column shows which predecessors each task is still waiting for.

SELECT
   TASK_ID
 , EFFECTIVE_DATE
 , STATUS
 , START_DT
 , END_DT
 , MINUTES
 , AVG_MINUTES
 , PREDECESSOR_TASKS
 , BLOCKING_TASKS
 FROM BWTA_V_LOG_TASK_WIDE
 WHERE HEAP_SEQ=-1;

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 ACTIVE     29.06.13          ,133333333  2,03541667
P2                   22.08.08 ACTIVE     29.06.13                 ,05  2,33333333
P3                   22.08.08 ACTIVE     29.06.13          ,033333333  4,16666667
P4                   22.08.08                                               5,175 P1,P2                P1,P2
P5                   22.08.08                                               1,675 P3                   P3
P6                   22.08.08                                                   5 P4                   P4
P7                   22.08.08                                          5,15833333 P5,P6                P5,P6

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 DONE       29.06.13 29.06.13       2,05  2,03541667
P2                   22.08.08 DONE       29.06.13 29.06.13 2,33333333  2,33333333
P3                   22.08.08 ACTIVE     29.06.13                2,55  4,16666667
P4                   22.08.08 ACTIVE     29.06.13          ,233333333       5,175 P1,P2
P5                   22.08.08                                               1,675 P3                   P3
P6                   22.08.08                                                   5 P4                   P4
P7                   22.08.08                                          5,15833333 P5,P6                P5,P6

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 DONE       29.06.13 29.06.13       2,05  2,03541667
P2                   22.08.08 DONE       29.06.13 29.06.13 2,33333333  2,33333333
P3                   22.08.08 DONE       29.06.13 29.06.13 4,16666667  4,16666667
P4                   22.08.08 ACTIVE     29.06.13          2,01666667       5,175 P1,P2
P5                   22.08.08 ACTIVE     29.06.13          ,166666667       1,675 P3
P6                   22.08.08                                                   5 P4                   P4
P7                   22.08.08                                          5,15833333 P5,P6                P5,P6

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 DONE       29.06.13 29.06.13       2,05  2,03541667
P2                   22.08.08 DONE       29.06.13 29.06.13 2,33333333  2,33333333
P3                   22.08.08 DONE       29.06.13 29.06.13 4,16666667  4,16666667
P4                   22.08.08 DONE       29.06.13 29.06.13 2,66666667       5,175 P1,P2
P5                   22.08.08 ACTIVE     29.06.13          1,56666667       1,675 P3
P6                   22.08.08                                                   5 P4
P7                   22.08.08                                          5,15833333 P5,P6                P5,P6

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 DONE       29.06.13 29.06.13       2,05  2,03541667
P2                   22.08.08 DONE       29.06.13 29.06.13 2,33333333  2,33333333
P3                   22.08.08 DONE       29.06.13 29.06.13 4,16666667  4,16666667
P4                   22.08.08 DONE       29.06.13 29.06.13 2,66666667       5,175 P1,P2
P5                   22.08.08 DONE       29.06.13 29.06.13 1,66666667       1,675 P3
P6                   22.08.08 ACTIVE     29.06.13          ,883333333           5 P4
P7                   22.08.08                                          5,15833333 P5,P6                P6

TASK_ID              EFFECTIV STATUS     START_DT END_DT      MINUTES AVG_MINUTES PREDECESSOR_TASKS    BLOCKING_TASKS
-------------------- -------- ---------- -------- -------- ---------- ----------- -------------------- ----------------
P1                   22.08.08 DONE       29.06.13 29.06.13       2,05  2,03541667
P2                   22.08.08 DONE       29.06.13 29.06.13 2,33333333  2,33333333
P3                   22.08.08 DONE       29.06.13 29.06.13 4,16666667  4,16666667
P4                   22.08.08 DONE       29.06.13 29.06.13 2,66666667       5,175 P1,P2
P5                   22.08.08 DONE       29.06.13 29.06.13 1,66666667       1,675 P3
P6                   22.08.08 DONE       29.06.13 29.06.13          5           5 P4
P7                   22.08.08 ACTIVE     29.06.13          ,133333333  5,15833333 P5,P6

In the fourth snapshot, P6 shows no blocking task but has not started yet. Its predecessor P4 is done, but P6 waits for resource R1, which P5 fully consumes.

Security architecture

The most common tasks of workflow management

Monitoring of current processes

SELECT
  TASK_ID
, EFFECTIVE_DATE
, STATUS
, to_char(START_DT,'DD.MM.YYYY HH24:MI:SS') AS START_DT
, to_char(END_DT,'DD.MM.YYYY HH24:MI:SS') AS END_DT
, MINUTES
, AVG_MINUTES
, PREDECESSOR_TASKS
, BLOCKING_TASKS
FROM BWTA_V_LOG_TASK_WIDE
WHERE HEAP_SEQ=-1
ORDER BY  -- errors first, then suspended, active and waiting tasks; finished tasks last
  CASE STATUS WHEN 'ERROR' THEN 1
              WHEN 'SUSPEND' THEN 2
              WHEN 'ACTIVE' THEN 3
              WHEN 'DONE' THEN 5
              WHEN 'SKIP' THEN 5
              ELSE 4
  END
, TASK_ID;

Error treatment

The first column returns a command that restarts the failed process.

SELECT 'BEGIN BWTA_OPER.restartTask('''||TASK_ID||''');END;' as CMD
, ERROR_MSG
FROM BWTA_V_LOG_TASK_WIDE
WHERE status = 'ERROR'
ORDER BY START_DT;

List of threads of the current round(s)

SELECT b.seq
, b.round_seq
, b.status
, TO_CHAR(d.start_dt, 'DD-MM-YYYY HH24:MI:SS') start_dt
, b.task_seq
, c.id
, b.command
, b.error_msg
FROM BWTA_LOG_THREAD B
LEFT JOIN BWTA_TASK C
ON B.TASK_SEQ = C.SEQ
LEFT JOIN BWTA_LOG_TASK d
ON d.task_seq = c.seq
  AND d.round_seq = b.round_seq
ORDER BY b.seq;

Sample output:

 SEQ  ROUND_SEQ STATUS     START_DT              TASK_SEQ ID           COMMAND           ERROR_MSG
---- ---------- ---------- ------------------- ---------- ------------ ----------------- -----------
  46         20 ACTIVE     29-06-2013 16:49:24         13 P1
  47         20 ACTIVE     29-06-2013 16:49:27         14 P2
  48         20 INACTIVE
  49         20 SLEEP
  50         20 SLEEP

Number of processes by status (done, to be done, error)

Tasks without a record in the runtime log for the latest round are counted as waiting (WAIT_CNT).

WITH LRL AS  -- latest round of each heap
  (
    SELECT --+materialize
      HEAP_SEQ
    , MAX(SEQ) ROUND_SEQ
    FROM BWTA_LOG_ROUND
    GROUP BY HEAP_SEQ
  )
,STAT AS  -- number of tasks per status in that round
  (
    SELECT
      T.HEAP_SEQ
     ,NVL(LT.STATUS,'WAIT') AS STATUS
     ,LRL.ROUND_SEQ
     ,COUNT(1) CNT
    FROM BWTA_TASK T
    JOIN LRL ON LRL.HEAP_SEQ=T.HEAP_SEQ
    LEFT JOIN BWTA_LOG_TASK LT ON LT.TASK_SEQ=T.SEQ AND LT.ROUND_SEQ=LRL.ROUND_SEQ
    GROUP BY T.HEAP_SEQ
     ,NVL(LT.STATUS,'WAIT')
     ,LRL.ROUND_SEQ
   )
,L1 as(
    SELECT
       H.SEQ   AS HEAP_SEQ
      ,H.ID    AS HEAP_ID
      ,H.NOTE  AS HEAP_NOTE
      ,LR.START_DT
      ,LR.END_DT
      ,S.STATUS
      ,S.CNT
    FROM BWTA_HEAP H
    JOIN STAT S ON S.HEAP_SEQ=H.SEQ
    JOIN BWTA_LOG_ROUND LR on LR.SEQ=S.ROUND_SEQ
)
Select * from L1
PIVOT
(SUM(CNT) FOR STATUS IN (
  'DONE' AS Done_cnt
 ,'WAIT' AS Wait_cnt
 ,'ACTIVE' AS Active_cnt
 ,'SUSPEND' AS Suspend_cnt
 ,'ERROR' AS Error_cnt
 ,'SKIP' AS Skip_cnt
));

List of remaining tasks

SELECT
  TASK_ID
, EFFECTIVE_DATE
, STATUS
, to_char(START_DT,'DD.MM.YYYY HH24:MI:SS') AS START_DT
, to_char(END_DT,'DD.MM.YYYY HH24:MI:SS') AS END_DT
, MINUTES
, AVG_MINUTES
, PREDECESSOR_TASKS
, BLOCKING_TASKS
FROM BWTA_V_LOG_TASK_WIDE
WHERE HEAP_SEQ=-1
AND NVL(STATUS,'#') not in ('DONE','SKIP')  -- tasks without a status are still to be done
ORDER BY
  CASE STATUS WHEN 'ERROR' THEN 1
              WHEN 'SUSPEND' THEN 2
              WHEN 'ACTIVE' THEN 3
              ELSE 4
  END
, TASK_ID;

Active operational tasks

Restart of workflow after a change of metadata

Begin
  BWTA_OPER.WakeupThread;
  commit;
End;
/

Stop workflow

Stops starting new processes and lets the currently running tasks finish.

Begin
  BWTA_OPER.stop;
End;
/

Restart workflow after stop

Begin
  BWTA_OPER.ReleaseThreads;
End;
/

Remove inconsistencies between the system and the metadata

Check for orphans:

Begin
  BWTA_OPER.checkOrphans;
End;
/

Add threads to the processing

Specify the number of additional threads:

Begin
  BWTA_OPER.addThreads(<number of threads>);
End;
/

Reduce the number of threads

Deletes the requested number of sleeping threads. If there are not enough sleeping threads, further threads are marked with the STOP command, so they end after finishing their current activity.

Begin
  BWTA_OPER.remThreads(<number of threads>);
End;
/
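The remThreads rule can be sketched on an in-memory list of threads. The list mirrors the five threads of the sample thread listing in this article (SEQ 46 to 50). The dictionaries are a hypothetical stand-in for BWTA_LOG_THREAD rows. The order in which non-sleeping threads receive STOP (lowest SEQ first) is an ASSUMPTION, because the text does not specify it.

```python
def rem_threads(threads, n):
    """Model of the documented remThreads behaviour (not the BWTA_OPER code).

    threads: list of dicts with "seq", "status" and "command" keys. The dicts
    are updated in place; the threads that remain are returned.
    """
    remaining = sorted(threads, key=lambda t: t["seq"])
    # 1) Delete up to n sleeping threads.
    sleeping = [t for t in remaining if t["status"] == "SLEEP"][:n]
    for t in sleeping:
        remaining.remove(t)
    # 2) Mark further threads with STOP so they end after their current
    #    activity. ASSUMPTION: the lowest SEQ values are chosen first.
    missing = n - len(sleeping)
    for t in remaining:
        if missing == 0:
            break
        if t["command"] != "STOP":
            t["command"] = "STOP"
            missing -= 1
    return remaining


threads = [
    {"seq": 46, "status": "ACTIVE", "command": None},
    {"seq": 47, "status": "ACTIVE", "command": None},
    {"seq": 48, "status": "INACTIVE", "command": None},
    {"seq": 49, "status": "SLEEP", "command": None},
    {"seq": 50, "status": "SLEEP", "command": None},
]
for t in rem_threads(threads, 3):
    print(t["seq"], t["status"], t["command"])
# 46 ACTIVE STOP
# 47 ACTIVE None
# 48 INACTIVE None
```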

The success story of the solution

Sophisticated parallelizing solutions are often seen as more suitable for stable ETL processes than for one-time migrations. Despite this, we successfully used the SIMPLETASK solution to migrate historical data from four country-based sites. The migration was challenging because the data differed slightly across sites and over time.

We developed and tested the migration processes on data samples, but given the complexity of the migration, we expected problems at runtime. Thanks to the heavy parallelization and the failover effect of the SIMPLETASK solution, the results were impressive:

  • Two developers were able to continuously fix data inconsistencies that would have been showstoppers in most other situations.
  • The failover effect of the SIMPLETASK solution ensured that not a single minute was lost on the critical path of the migration.

Overall, the success of our one-time migration project highlights the robustness and versatility of the SIMPLETASK solution, which can be used in a variety of scenarios beyond traditional ETL processes.

Conclusion

In conclusion, the SIMPLETASK solution offers a simple yet powerful method for implementing ETL or ELT processes based on PL/SQL modules, with a focus on scheduling the execution of atomic modules according to internal dependencies and other rules. Unlike enterprise-wide schedulers or standard ETL tools, the SIMPLETASK solution provides fine-grained control over dependencies and offers scalable solutions for data migration or ETL processes. The solution’s success has been demonstrated in the context of a challenging data migration project, where the solution’s parallelization capabilities ensured that not a single minute was lost on the critical path of the migration, and developers were able to solve data inconsistencies that would have been showstoppers in most other situations. Ultimately, the SIMPLETASK solution is a valuable addition to the ETL and data migration toolkit, providing a robust and versatile option for organizations seeking to optimize their data processing workflows.

Although I find it hard to imagine, if you decide to remove the solution from your system, the following script drops all database objects created by the SIMPLETASK solution: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_drop.sql