When implementing ETL or ELT processes using PL/SQL modules, scheduling the execution of atomic modules with internal dependencies and specific rules is often a crucial task. However, enterprise-wide schedulers may not meet the requirements for fine-grained dependencies and rules. As a result, local ETL schedulers are often used to trigger ETL batch processes. Unfortunately, many of the built-in schedulers in standard ETL tools are too inflexible and cannot scale effectively. To address this, I have developed a solution called SIMPLETASK, which is straightforward to use despite its powerful capabilities. The following is a basic version of the solution, which is intended to help users understand the principles and easily incorporate them into simpler ETL or migration processes.
SIMPLETASK
The scheduling model is based on the standard network graph (task graph) problem with resources, so many methods from that field, such as finding the critical path, can be reused. The model uses the following terms:
- Heap (Batch) – defines a closed group of processes that run together and are driven by internal dependencies.
- Task (Process) – defines an atomic module to be run.
- Predecessor – defines a dependency between two modules: the second one must not start before the first one has completed.
- Exclusion – defines a dependency between two modules: neither of them may run while the other one is in progress. (Note – in our solution, exclusions are implemented using resources.)
- Resource – defines a limited resource consumed by running processes.
- Resource consumption – defines how much of a resource a particular process consumes.
- Round (Run) – defines one cycle of execution, for example one processed day of a daily process.
Resources and exclusions
Resource-based exclusion: This approach involves defining a resource that is consumed by a process while it is running. When another process needs to run, it checks if the resource is available. If the resource is not available, the process is put on hold until the resource is freed up. This approach ensures that two processes do not run at the same time if they require the same resource.
The previous example demonstrated dependencies between processes using both predecessor and exclusion types. However, the following example replaces the exclusion dependency with resources. Resources provide two options: full exclusion (100% consumption) or limiting the number of large modules processed simultaneously (50% consumption allows up to 2 such processes to be processed at the same time).
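As a rough illustration of the resource check, the query below computes how much of each resource is still free in one round. It is only a sketch built on the tables listed in the METADATA MODEL section (BWTA_RES, BWTA_TASK_RES, BWTA_LOG_TASK). The bind variable :round_seq is an assumption, resources with PENDING = 1 are left out, and the SIMPLETASK packages may perform this check differently.

```sql
-- Sketch only: free capacity per resource in one round, counting ACTIVE tasks.
-- :round_seq is an assumed bind variable holding the round of interest.
SELECT r.id                              AS res_id
     , r.amount                          AS total_amount
     , NVL(SUM(u.amount), 0)             AS consumed
     , r.amount - NVL(SUM(u.amount), 0)  AS free_amount
FROM   bwta_res r
LEFT JOIN (
         -- consumption declared for tasks that are running right now
         SELECT tr.res_seq
              , tr.amount
         FROM   bwta_task_res tr
         JOIN   bwta_log_task lt
                ON lt.task_seq = tr.task_seq
         WHERE  lt.round_seq = :round_seq
         AND    lt.status    = 'ACTIVE'
       ) u
       ON u.res_seq = r.seq
WHERE  r.pending = 0   -- resources released automatically at the end of a process
GROUP  BY r.seq, r.id, r.amount
ORDER  BY r.id;
```

Under this reading, a task that consumes 100 of a resource whose total AMOUNT is 100 can start only when free_amount is 100 (full exclusion), while tasks that consume 50 each can run two at a time.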
State diagram of a task during runtime
All tasks start in the NONE state, i.e. they have no record in the runtime table for the current round.
When a task has neither PREDECESSOR nor RESOURCE limitations and a thread is available, the thread picks the task for execution. It evaluates the task's Exec and Skip conditions and decides on the action:
- Exec is false – WAIT
- Exec is true and Skip is true – SKIP
- Exec is true and Skip is false – ACTIVE (start it)
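The three rules above can be written down as a small decision function. The anonymous block below is a minimal sketch for illustration; next_status is a hypothetical local helper, not a routine from the SIMPLETASK packages, and it assumes both conditions have already been evaluated to non-null booleans.

```sql
SET SERVEROUTPUT ON
DECLARE
  -- Hypothetical helper: maps the evaluated Exec/Skip conditions to the next task state.
  FUNCTION next_status(p_exec IN BOOLEAN, p_skip IN BOOLEAN) RETURN VARCHAR2 IS
  BEGIN
    IF NOT p_exec THEN
      RETURN 'WAIT';    -- Exec is false: the task stays blocked
    ELSIF p_skip THEN
      RETURN 'SKIP';    -- Exec is true and Skip is true
    ELSE
      RETURN 'ACTIVE';  -- Exec is true and Skip is false: start the task
    END IF;
  END next_status;
BEGIN
  DBMS_OUTPUT.PUT_LINE(next_status(FALSE, TRUE));  -- prints WAIT
  DBMS_OUTPUT.PUT_LINE(next_status(TRUE,  TRUE));  -- prints SKIP
  DBMS_OUTPUT.PUT_LINE(next_status(TRUE,  FALSE)); -- prints ACTIVE
END;
/
```

Note that Exec is checked first, so a false Exec condition leads to WAIT regardless of the Skip condition.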
DOWNLOAD SIMPLE SOLUTION
To run your ETL or data migration tasks with the SIMPLETASK solution, download the installation scripts: one for the metadata structures, one for the PL/SQL packages, and one for the views. Here are the steps to install the solution:
- Create a database user (e.g. BWTA_OWNER).
- Allocate database space quota to store the metadata. Note that extensive space is not required.
- Grant the following privileges to the user:
- CREATE SESSION
- CREATE TABLE
- CREATE TRIGGER
- CREATE VIEW
- CREATE SEQUENCE
- CREATE JOB
- CREATE PROCEDURE
- EXECUTE ON DBMS_LOCK.
- Download and run the first script to install the metadata structures: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_metadata.sql
- Download and run the second script to install the packages: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_packages.sql
- Download and run the third script to install the views: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_views.sql
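The user creation and the grants from the steps above could look like the following script, run by a DBA. This is a sketch under assumptions: the password, the USERS tablespace, and the 100M quota are placeholders to be adapted, and only the user name BWTA_OWNER and the list of privileges come from the steps above.

```sql
-- Run as a privileged user (for example SYS or a DBA account).
-- Password, tablespace and quota are placeholders, not recommendations.
CREATE USER bwta_owner IDENTIFIED BY "ChangeMe_1"
  DEFAULT TABLESPACE users
  QUOTA 100M ON users;

-- System privileges listed in the installation steps
GRANT CREATE SESSION
    , CREATE TABLE
    , CREATE TRIGGER
    , CREATE VIEW
    , CREATE SEQUENCE
    , CREATE JOB
    , CREATE PROCEDURE
   TO bwta_owner;

-- Object privilege on the package used for sleeping threads and simulated tasks
GRANT EXECUTE ON SYS.DBMS_LOCK TO bwta_owner;
```

The grant on DBMS_LOCK is an object privilege on a SYS-owned package, so it typically has to be issued by SYS or by a user who holds that privilege with the grant option.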
METADATA MODEL
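Metadata structure description. Each table lists column name, data type, key or optionality (a number marks a key column and its position, M = mandatory, O = optional), and description:
BWTA_TASK_RES | Consumption of resources by processes | ||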
TASK_SEQ | NUMBER | 1 | Sequence key of task |
RES_SEQ | NUMBER | 2 | Sequence key of resource |
AMOUNT | NUMBER | O | Consumption of specified resource by the process |
BWTA_TASK_REL | Relation between processes – predecessor specification | ||
SEQ1 | NUMBER | 1 | Sequence key of dependent task |
SEQ2 | NUMBER | 2 | Sequence key of the predecessor task |
SKIP_FLAG | NUMBER(1) | O | Skip flag to disable dependency |
BWTA_TASK | Process (task) to be run | ||
SEQ | NUMBER | 1 | Sequence key of task |
HEAP_SEQ | NUMBER | M | Sequence key of heap |
ID | VARCHAR2(100) | M | ID of process |
NOTE | VARCHAR2(500) | O | Description of task |
EXEC_COND | VARCHAR2(2000) | O | Condition code of execution to block task based on external condition |
SKIP_COND | VARCHAR2(2000) | O | Condition code of skip to skip task based on external condition |
EXEC_FLAG | NUMBER | O | Flag of execution to block task |
SKIP_FLAG | NUMBER | O | Flag of skip to skip task |
EXEC_CODE | VARCHAR2(2000) | O | Execution PL/SQL code |
PRIORITY | NUMBER | M | Priority of execution (less is more important) |
AVG_DURATION | NUMBER | O | Average duration of process based on statistics |
CNT_DURATION | NUMBER | O | Count of runs involved in statistics |
BWTA_RES | Resource definition | ||
SEQ | NUMBER | 1 | Sequence key of resource |
ID | VARCHAR2(100) | O | ID of resource |
NOTE | VARCHAR2(500) | O | Description of resource |
AMOUNT | NUMBER | M | Total amount of resource (e.g. 100 [%]) |
PENDING | NUMBER(1) | M | 0..resource released after end of process, 1..resource should be released by negative consumption |
BWTA_LOG_THREAD | Thread of execution | ||
SEQ | NUMBER | 1 | Sequence key of thread |
ROUND_SEQ | NUMBER | O | Sequence key of round |
STATUS | VARCHAR2(10) | O | Status of thread (ACTIVE, INACTIVE, SLEEP) |
TASK_SEQ | NUMBER | O | Sequence key of active task for the ACTIVE status |
START_DT | DATE | O | Start date of current activity |
COMMAND | VARCHAR2(100) | O | command for thread – STOP to stop thread after current activity |
ERROR_MSG | VARCHAR2(2000) | O | Error message for the ERROR status |
BWTA_LOG_TASK_H | Process execution log | ||
TASK_SEQ | NUMBER | O | Sequence key of process |
ROUND_SEQ | NUMBER | O | Sequence key of round |
STATUS | VARCHAR2(10) | O | Status of process (ACTIVE, ERROR, DONE, SUSPEND, SKIP, WAIT) |
START_DT | DATE | O | Start date and time of process execution or time to wait for |
END_DT | DATE | O | End date and time of process execution |
ERROR_MSG | VARCHAR2(4000) | O | Error message for the ERROR status |
TS | TIMESTAMP(6) | O | Timestamp of the change |
BWTA_LOG_TASK | Process execution log | ||
TASK_SEQ | NUMBER | 1 | Sequence key of process |
ROUND_SEQ | NUMBER | 2 | Sequence key of round |
STATUS | VARCHAR2(10) | O | Status of process (ACTIVE, ERROR, DONE, SUSPEND, SKIP, WAIT) |
START_DT | DATE | O | Start date and time of process execution or time to wait for |
END_DT | DATE | O | End date and time of process execution |
ERROR_MSG | VARCHAR2(4000) | O | Error message for the ERROR status |
BWTA_LOG_ROUND | Round of the batch execution | ||
SEQ | NUMBER | 1 | Sequence key of round |
HEAP_SEQ | NUMBER | M | Sequence key of heap |
EFFECTIVE_DATE | DATE | O | Effective date of round |
START_DT | DATE | O | Start date and time of round |
END_DT | DATE | O | End date and time of round – indicates completed rounds |
BWTA_LOG_METADATA | |||
SEQ | NUMBER | 1 | Primary key of changed record |
SEQ2 | NUMBER | 2 | Optional extension of key of changed record |
DT | TIMESTAMP(6) | 3 | Timestamp of realized change |
OPERATION | CHAR(1) | 4 | Operation (I,U,D) |
OLD_VAL | XMLTYPE | O | Old value of record XML element |
NEW_VAL | XMLTYPE | O | New value of record XML element |
TAG | VARCHAR2(100) | O | |
TABLE_NAME | VARCHAR2(30) | M | Table of the change |
BWTA_LOG_ERR | Log of all the realized failover actions | ||
ROUND_SEQ | NUMBER | 1 | Round when it happened |
TASK_SEQ | NUMBER | 2 | Sequence key of Task |
ERR_ID | NUMBER | 3 | Error ID |
DT | TIMESTAMP(6) | 4 | Timestamp when it happened |
BWTA_HEAP | Batch of processes to be run | ||
SEQ | NUMBER | 1 | Sequence key of batch, -1 is default one |
ID | VARCHAR2(100) | O | ID of batch |
NOTE | VARCHAR2(500) | O | Description of batch |
STAT_ROUND_SEQ | NUMBER | M | Last round the statistics has been gathered |
BWTA_ERR | Maintained errors | ||
ID | NUMBER | 1 | ORA error number |
ATMPT_CNT | NUMBER | O | Number of attempts |
DELAY_DAY | NUMBER | O | Delay specified as a fragment of day |
PROLONG_KOEF | NUMBER | O | Coefficient by which the delay is prolonged with each subsequent attempt |
DIVERS_MOD | NUMBER | O | Modulo used for diversification of particular processes |
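To see how these tables fit together, the following query lists every dependency of the default heap in readable form by joining BWTA_TASK_REL to BWTA_TASK twice. It is a sketch that relies only on the columns described above (SEQ1 as the dependent task, SEQ2 as the predecessor); the column aliases are chosen here for illustration.

```sql
-- Sketch: readable list of predecessor dependencies in the default heap (-1).
SELECT d.id          AS task_id         -- dependent task
     , p.id          AS predecessor_id  -- task that must complete first
     , rel.skip_flag                    -- skip flag that disables the dependency
FROM   bwta_task_rel rel
JOIN   bwta_task d ON d.seq = rel.seq1
JOIN   bwta_task p ON p.seq = rel.seq2
WHERE  d.heap_seq = -1
ORDER  BY d.id, p.id;
```

A query like this is a convenient sanity check after loading or changing metadata, before a new round is started.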
Example
In the following example of filling metadata, we will use processes from the network graph above.
Begin DBMS_LOCK.sleep([duration]);end;
will be used instead of real modules to simulate the duration of processes.
BEGIN
BWTA_METADATA.SETRES('R1','Resource 1 - target partition',100,0,'INIT');
BWTA_METADATA.SETTASK('P1', 'Task1', 'begin DBMS_LOCK.sleep(120);end;', 50, 'INIT');
BWTA_METADATA.SETTASK('P2', 'Task2', 'begin DBMS_LOCK.sleep(140);end;', 50, 'INIT');
BWTA_METADATA.SETTASK('P3', 'Task3', 'begin DBMS_LOCK.sleep(250);end;', 50, 'INIT');
----
BWTA_METADATA.SETTASK('P4', 'Task4', 'begin DBMS_LOCK.sleep(160);end;', 50, 'INIT');
BWTA_METADATA.SETTASKREL('P4','P1',-1,0,'INIT'); --dependency P4 on P1
BWTA_METADATA.SETTASKREL('P4','P2',-1,0,'INIT'); --dependency P4 on P2
BWTA_METADATA.SETTASK('P5', 'Task5', 'begin DBMS_LOCK.sleep(100);end;', 50, 'INIT');
BWTA_METADATA.SETTASKREL('P5','P3',-1,0,'INIT'); --dependency P5 on P3
BWTA_METADATA.SetTaskRes('P5',-1,'R1',100,'INIT'); --Task P5 requires resource R1 (amount 100)
----
BWTA_METADATA.SETTASK('P6', 'Task6','begin DBMS_LOCK.sleep(300);end;', 50, 'INIT');
BWTA_METADATA.SETTASKREL('P6','P4',-1,0,'INIT'); --dependency P6 on P4
BWTA_METADATA.SetTaskRes('P6',-1,'R1',100,'INIT'); --Task P6 requires resource R1 (amount 100)
----
BWTA_METADATA.SetTask('P7', 'Task7', 'begin DBMS_LOCK.sleep(309);end;', 50, 'INIT');
BWTA_METADATA.SETTASKREL('P7','P5',-1,0,'INIT'); --dependency P7 on P5
BWTA_METADATA.SetTaskRel('P7','P6',-1,0,'INIT'); --dependency P7 on P6
Commit;
end;
/
Now we can start the daily process using the following command:
BEGIN
bwta_oper.startround(date'2023-02-22');
END;
/
The process starts in 5 parallel threads for the effective date 22-FEB-2023. We can check the execution at any time from the runtime metadata. The following list contains snapshots taken during the execution, from the very start to the end. The BLOCKING_TASKS column shows which tasks a particular task is still waiting for.
Security architecture
The most common tasks of workflow management
Monitoring of current processes
SELECT
TASK_ID
, EFFECTIVE_DATE
, STATUS
, to_char(START_DT,'DD.MM.YYYY HH24:MI:SS') AS START_DT
, to_char(END_DT,'DD.MM.YYYY HH24:MI:SS') AS END_DT
, MINUTES
, AVG_MINUTES
, PREDECESSOR_TASKS
, BLOCKING_TASKS
FROM BWTA_V_LOG_TASK_WIDE
WHERE HEAP_SEQ=-1
ORDER BY
CASE STATUS WHEN 'ERROR' THEN 1
WHEN 'SUSPEND' THEN 2
WHEN 'ACTIVE' THEN 3
WHEN 'DONE' THEN 5
WHEN 'SKIP' THEN 5
ELSE 4
END
, TASK_ID
Error treatment: the first column returns a command that restarts the failed process
SELECT 'BEGIN BWTA_OPER.restartTask('''||TASK_ID||''');END;' as CMD
, ERROR_MSG
FROM BWTA_V_LOG_TASK_WIDE
WHERE status = 'ERROR'
ORDER BY START_DT
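Instead of copying the generated commands one by one, all failed tasks can be restarted in a loop. The block below is a sketch that assumes BWTA_OPER.restartTask accepts the task ID as its only required argument, as the generated command above suggests; check the package specification before relying on it.

```sql
-- Sketch: restart every task currently in ERROR status, oldest failure first.
-- Assumes BWTA_OPER.restartTask(<task id>) as used in the generated command above.
BEGIN
  FOR r IN (
    SELECT task_id
    FROM   bwta_v_log_task_wide
    WHERE  status = 'ERROR'
    ORDER  BY start_dt
  ) LOOP
    BWTA_OPER.restartTask(r.task_id);
  END LOOP;
END;
/
```

Whether an explicit COMMIT is needed afterwards depends on how the package handles transactions. It is usually wiser to read ERROR_MSG first and restart only the tasks whose cause has been fixed.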
List of threads of the current round(s)
SELECT b.seq
, b.round_seq
, b.status
, TO_CHAR(d.start_dt, 'DD-MM-YYYY HH24:MI:SS') start_dt
, b.task_seq
, c.id
, b.command
, b.error_msg
FROM BWTA_LOG_THREAD B
LEFT JOIN BWTA_TASK C
ON B.TASK_SEQ = C.SEQ
LEFT JOIN BWTA_LOG_TASK d
ON d.task_seq = c.seq
AND d.round_seq = b.round_seq
ORDER BY b.seq
Task status during the round (done, to be done, error)
SELECT
TASK_ID
, EFFECTIVE_DATE
, STATUS
, START_DT
, END_DT
, MINUTES
, AVG_MINUTES
, PREDECESSOR_TASKS
, BLOCKING_TASKS
FROM BWTA_V_LOG_TASK_WIDE
WHERE HEAP_SEQ=-1
Snapshot 1:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | ACTIVE | 29.06.13 | | ,133333333 | 2,03541667 | | |
P2 | 22.08.08 | ACTIVE | 29.06.13 | | ,05 | 2,33333333 | | |
P3 | 22.08.08 | ACTIVE | 29.06.13 | | ,033333333 | 4,16666667 | | |
P4 | 22.08.08 | | | | | 5,175 | P1,P2 | P1,P2 |
P5 | 22.08.08 | | | | | 1,675 | P3 | P3 |
P6 | 22.08.08 | | | | | 5 | P4 | P4 |
P7 | 22.08.08 | | | | | 5,15833333 | P5,P6 | P5,P6 |
Snapshot 2:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,05 | 2,03541667 | | |
P2 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,33333333 | 2,33333333 | | |
P3 | 22.08.08 | ACTIVE | 29.06.13 | | 2,55 | 4,16666667 | | |
P4 | 22.08.08 | ACTIVE | 29.06.13 | | ,233333333 | 5,175 | P1,P2 | |
P5 | 22.08.08 | | | | | 1,675 | P3 | P3 |
P6 | 22.08.08 | | | | | 5 | P4 | P4 |
P7 | 22.08.08 | | | | | 5,15833333 | P5,P6 | P5,P6 |
Snapshot 3:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,05 | 2,03541667 | | |
P2 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,33333333 | 2,33333333 | | |
P3 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 4,16666667 | 4,16666667 | | |
P4 | 22.08.08 | ACTIVE | 29.06.13 | | 2,01666667 | 5,175 | P1,P2 | |
P5 | 22.08.08 | ACTIVE | 29.06.13 | | ,166666667 | 1,675 | P3 | |
P6 | 22.08.08 | | | | | 5 | P4 | P4 |
P7 | 22.08.08 | | | | | 5,15833333 | P5,P6 | P5,P6 |
Snapshot 4:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,05 | 2,03541667 | | |
P2 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,33333333 | 2,33333333 | | |
P3 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 4,16666667 | 4,16666667 | | |
P4 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,66666667 | 5,175 | P1,P2 | |
P5 | 22.08.08 | ACTIVE | 29.06.13 | | 1,56666667 | 1,675 | P3 | |
P6 | 22.08.08 | | | | | 5 | P4 | |
P7 | 22.08.08 | | | | | 5,15833333 | P5,P6 | P5,P6 |
Snapshot 5:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,05 | 2,03541667 | | |
P2 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,33333333 | 2,33333333 | | |
P3 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 4,16666667 | 4,16666667 | | |
P4 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,66666667 | 5,175 | P1,P2 | |
P5 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 1,66666667 | 1,675 | P3 | |
P6 | 22.08.08 | ACTIVE | 29.06.13 | | ,883333333 | 5 | P4 | |
P7 | 22.08.08 | | | | | 5,15833333 | P5,P6 | P6 |
Snapshot 6:
TASK_ID | EFFECTIV | STATUS | START_DT | END_DT | MINUTES | AVG_MINUTES | PREDECESSOR_TASKS | BLOCKING_TASKS |
P1 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,05 | 2,03541667 | | |
P2 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,33333333 | 2,33333333 | | |
P3 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 4,16666667 | 4,16666667 | | |
P4 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 2,66666667 | 5,175 | P1,P2 | |
P5 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 1,66666667 | 1,675 | P3 | |
P6 | 22.08.08 | DONE | 29.06.13 | 29.06.13 | 5 | 5 | P4 | |
P7 | 22.08.08 | ACTIVE | 29.06.13 | | ,133333333 | 5,15833333 | P5,P6 | |
List of remaining tasks
SELECT
TASK_ID
, EFFECTIVE_DATE
, STATUS
, to_char(START_DT,'DD.MM.YYYY HH24:MI:SS') AS START_DT
, to_char(END_DT,'DD.MM.YYYY HH24:MI:SS') AS END_DT
, MINUTES
, AVG_MINUTES
, PREDECESSOR_TASKS
, BLOCKING_TASKS
FROM BWTA_V_LOG_TASK_WIDE
where HEAP_SEQ=-1
AND NVL(STATUS,'#') not in ('DONE','SKIP')
ORDER BY
CASE STATUS WHEN 'ERROR' THEN 1
WHEN 'SUSPEND' THEN 2
WHEN 'ACTIVE' THEN 3
ELSE 4
END
, TASK_ID
Active operational tasks
Restarting the workflow after a stop
After a change of metadata:
Begin
BWTA_OPER.WakeupThread;
commit;
End;
/
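Stop workflow
Stops launching new processes and lets the currently running tasks finish. The query below gives a status overview: it counts the processes by status (done, waiting, active, suspended, error, skipped) for the latest round of each heap.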
WITH LRL AS
(
SELECT --+materialize
HEAP_SEQ
, MAX(SEQ) ROUND_SEQ
FROM BWTA_LOG_ROUND
GROUP BY HEAP_SEQ
)
,STAT AS
(
SELECT
T.HEAP_SEQ
,NVL(LT.STATUS,'WAIT') AS STATUS
,LRL.ROUND_SEQ
,COUNT(1) CNT
FROM BWTA_TASK T
JOIN LRL ON LRL.HEAP_SEQ=T.HEAP_SEQ
LEFT JOIN BWTA_LOG_TASK LT ON LT.TASK_SEQ=T.SEQ AND LT.ROUND_SEQ=LRL.ROUND_SEQ
GROUP BY T.HEAP_SEQ
,NVL(LT.STATUS,'WAIT')
,LRL.ROUND_SEQ
)
,L1 as(
SELECT
H.SEQ AS HEAP_SEQ
,H.ID AS HEAP_ID
,H.NOTE AS HEAP_NOTE
,LR.START_DT
,LR.END_DT
,S.STATUS
,S.CNT
FROM BWTA_HEAP H
JOIN STAT S ON S.HEAP_SEQ=H.SEQ
JOIN BWTA_LOG_ROUND LR on LR.SEQ=S.ROUND_SEQ
)
Select * from L1
PIVOT
(SUM(CNT) FOR STATUS IN (
'DONE' AS Done_cnt
,'WAIT' AS Wait_cnt
,'ACTIVE' AS Active_cnt
,'SUSPEND' AS Suspend_cnt
,'ERROR' AS Error_cnt
,'SKIP' AS Skip_cnt
))
Sample output of the thread list query (List of threads of the current round(s)):
SEQ | ROUND_SEQ | STATUS | START_DT | TASK_SEQ | ID | COMMAND | ERROR_MSG |
46 | 20 | ACTIVE | 29-06-2013 16:49:24 | 13 | P1 | | |
47 | 20 | ACTIVE | 29-06-2013 16:49:27 | 14 | P2 | | |
48 | 20 | INACTIVE | | | | | |
49 | 20 | SLEEP | | | | | |
50 | 20 | SLEEP | | | | | |
Begin
BWTA_OPER.stop;
End;
/
Restart workflow after stop
Begin
BWTA_OPER.ReleaseThreads;
End;
/
Remove inconsistency between system and metadata
Check orphans.
Begin
BWTA_OPER.checkOrphans;
End;
/
Add threads into the processing
The number of additional threads should be specified.
Begin
BWTA_OPER.addThreads(<number of threads>);
End;
/
Reduce the number of threads
Deletes the requested number of sleeping threads. If there are not enough sleeping threads, the remaining ones are marked with the STOP command and stop after finishing their current activity.
Begin
BWTA_OPER.remThreads(<number of threads>);
End;
/
A success story of the solution
Sophisticated parallelizing solutions are usually considered more suitable for stable ETL processes than for one-time migrations. Even so, we successfully used the SIMPLETASK solution to migrate historical data from four country-based sites. The migration was challenging because the data differed slightly across sites and over time.
We developed and tested the migration processes on data samples, but we expected runtime problems given the complexity of the migration. With the heavy parallelization provided by the SIMPLETASK solution, the failover effect resulted in impressive outcomes:
- Two developers were able to permanently solve data inconsistencies that would have been a showstopper in most other situations.
- The failover effect of the SIMPLETASK solution ensured that not a single minute was lost on the critical path of the migration.
Overall, the success of our one-time migration project highlights the robustness and versatility of the SIMPLETASK solution, which can be used in a variety of scenarios beyond traditional ETL processes.
Conclusion
In conclusion, the SIMPLETASK solution offers a simple yet powerful method for implementing ETL or ELT processes based on PL/SQL modules, with a focus on scheduling the execution of atomic modules according to internal dependencies and other rules. Unlike enterprise-wide schedulers or standard ETL tools, the SIMPLETASK solution provides fine-grained control over dependencies and offers scalable solutions for data migration or ETL processes. The solution’s success has been demonstrated in the context of a challenging data migration project, where the solution’s parallelization capabilities ensured that not a single minute was lost on the critical path of the migration, and developers were able to solve data inconsistencies that would have been showstoppers in most other situations. Ultimately, the SIMPLETASK solution is a valuable addition to the ETL and data migration toolkit, providing a robust and versatile option for organizations seeking to optimize their data processing workflows.
Although I find it hard to imagine, if you decide to remove the solution from your system, the following script drops all database objects created by the SIMPLETASK solution: https://github.com/bobjankovsky/metaswamp/blob/main/oracle/simpletask/bwta_drop.sql