We will shift our focus back to the Lager-wise Real-time Operational data store, which, as previously mentioned, is a more complex solution that may not deliver the desired level of real-time latency. Those unfamiliar with lager may wonder why it is so complicated, just as some may question the complexity of Lager-wise ODS. However, despite its disadvantages, I believe this type of solution will soon become more prevalent, particularly in the business intelligence world, where we aim to avoid extensive integration tasks with legacy and source layer application owners. Thanks to the Metadata-driven approach and ETL patterns, even seemingly “silly” approaches can be made feasible and efficient.
Lager-wise ODS brewing again
To successfully brew lager, patience and humility are essential. It is an organic process that cannot be meticulously supervised at every stage, which leads to feelings of redundancy and uncertainty. Early filtering is necessary to achieve a clean result. Accepting and understanding these contradictory feelings is crucial to success. Attempts to simplify the process have often resulted in failure, underscoring its complexity and the importance of patience in brewing lager.
Gathering hops – first, we have to collect signals – events
All this bottom-level brewing is about gathering signals and driving processes based on them. There are no workflows starting tasks, no subscriptions, and no pushes. There is only CDC signaling that something has changed in the source and giving us a piece of information about what has changed. These events (signals, hops) are the basis of real-time Lager-wise processing. And because we work on the bottom level of information – on raw data – these hops come in elementary granularity, enveloped by low-level transaction frames of the source database.
The following picture shows the process from the information origination in the source toward the low-level data and back to the Operational data store.
To begin, we must collect hops, which can be accomplished through various means such as CDC or trigger-based integration. Despite the abundance of information available on these methods, the end result is always the same: we have hops. These are elementary changes separated from the plant stem and aggregated, at best, only by transactions. It is essential to note that we are not dealing with a mirror copy of the source system where we can materialize the hops into tables and use them at will. We are creating an operational data store that requires some level of transformation to consolidate raw data into a state that can be used by real-time services with strict SLAs. Services cannot be expected to access shattered data and consolidate it on demand. While we may be able to tolerate data that is 20 seconds old, we require it to be returned in milliseconds.
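To make the notion of a hop more concrete, the sketch below shows what a raw captured change record might look like. This is only an illustration; the table name HOP_RAW and its columns are assumptions, not the structure of any particular CDC product.

-- Hypothetical landing structure for raw CDC hops (all names are illustrative)
create table HOP_RAW (
  HOP_ID        number         not null, -- increasing sequence assigned by the CDC reader
  SOURCE_TABLE  varchar2(128)  not null, -- e.g. 'SOURCE_A'
  OPERATION     char(1)        not null, -- 'I' insert, 'U' update, 'D' delete
  TX_ID         varchar2(64),            -- low-level transaction frame of the source database
  CHANGE_TS     timestamp      not null, -- commit timestamp in the source
  OLD_IMAGE     clob,                    -- column values before the change (e.g. serialized as JSON)
  NEW_IMAGE     clob                     -- column values after the change
);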
So, what is the solution?
We have to get logical information from all these low-level changes received from the CDC process. The question is more about how to do that. Let’s call the process fermenting. And fermenting needs time, especially in the designing process. It is an organic process so we have to forget about some ideas from synthetic thinking:
Forget about:
- Reverse process construction – there is no analytical apparatus able to automatically revert the business logic of the source transformation in every detail. It could be done manually for particular cases, but it would be very expensive even in small solutions, and twice as expensive once the business logic of the source systems starts changing. These things should be solved numerically.
- Considering the exact order of changes – there is neither time nor energy for that.
- Aggregating signals by source logical event – accept that each hop can cause a separate event despite ultimately leading to a single transformation, as the hops are now separated from the stem.
- Eventual consistency – it is difficult to keep track of source changes, so a transformation may start before all events have arrived, leading to an incomplete change being applied. Then, when the remaining change data arrives, the transformation is triggered again and considers the remaining part of the changes. This is especially true for extra-transaction processes.
The solution is slightly “dirty” but organic, and we need to embrace this reality.
Processing hops – the L0 layer and what we need it for
If you have ever attempted to reconstruct information based on partial changes, then you understand the importance of maintaining persistent state information about those objects. Changes do not necessarily occur across all parts simultaneously, so it is necessary to merge changing information with the last known state of unchanged information. In the world of topics, this is referred to as “compaction,” where the last state of each object is retained.
Your initial task is to update the persistent mirrored L0 layer, which has almost the same structure as the source, based on change records. This part may seem easy, and many CDC solutions address only this task, with salespeople touting the simplicity of the implementation. However, it is not quite the piece of cake it appears to be, especially after a few pints of lager.
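Before getting to the tricky parts, here is a minimal sketch of the basic L0 update itself, using the SOURCE_A table that serves as the running example later in this article. The staging table HOP_A_STAGE and its OPERATION code are assumptions made for illustration; ordering, transaction handling, and batches containing several hops per key are deliberately ignored here.

-- Minimal sketch: apply a batch of SOURCE_A hops to the L0 mirror with soft delete
merge into L0_SOURCE_A t
using HOP_A_STAGE h                     -- hypothetical staging table filled from the CDC stream
   on (t.ID = h.ID)
when matched then update set
      t.PARENT_ID    = h.PARENT_ID,
      t.NM           = h.NM,
      t.DS           = h.DS,
      t.SOMETHING    = h.SOMETHING,
      t.X            = h.X,
      t.Y            = h.Y,
      t.Z            = h.Z,
      t.STATUS       = h.STATUS,
      t.DELETED_FLAG = case when h.OPERATION = 'D' then 'Y' else 'N' end  -- soft delete, never physical
when not matched then insert
      (ID, PARENT_ID, NM, DS, SOMETHING, X, Y, Z, STATUS, DELETED_FLAG)
    values
      (h.ID, h.PARENT_ID, h.NM, h.DS, h.SOMETHING, h.X, h.Y, h.Z, h.STATUS,
       case when h.OPERATION = 'D' then 'Y' else 'N' end);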
There are several important issues that need to be addressed in the process going forward:
- The implementation of “soft delete”, which involves keeping deleted records in the game marked as deleted, is necessary for the fermentation process to work. The minimum amount of history you need to keep will be discussed further in the Identification phase, but trust me when I say that it’s necessary. While it may be tempting to forgo this step, it is crucial for the success of the fermentation process.
- While the order of changes is not assured in later phases, it is essential to ensure that the L0 layer synchronization and generation of “identification events” are transaction safe and ordered in this phase. Older changes should not override newer ones when updating the L0 layer, and no identification event should occur before the L0 table is updated based on that hop. This is the only place in the fermentation process where the order of changes matters.
- Identification is a sub-process of the event-driven process where tables in higher layers need to be updated based on transformed data from lower layers. Fine-grain identification is necessary as we cannot update all the tables whenever something changes. We need to identify all records of target tables that could be hypothetically influenced by the source change. This process will be described in detail below.
- Early filtering of L0 is necessary to filter out unnecessary changes that the CDC process may push. This can happen when source system processes do “idle updates”, or when only a subset of columns from a broad table is taken and changes appear in columns that are not included (a sketch of such a filter follows after this list).
Example:
For instance, we once used a Users table as a source to convert an internal user key into a more readable code. We only needed two columns: the key and the code. However, we overlooked a third column containing the timestamp of the user’s last login, which meant that every time a user logged in, the record changed and our CDC process picked up on it. Although this was a small table and didn’t cause any issues at first, the user information was used in many records across other tables. This caused a cascading effect, with every login leading to the identification and attempted updates of all related records. To avoid this, we learned to apply early filtering as soon as possible in the fermentation process.
- Using adapted hops for identification is a challenging task. We may know how the source information has changed, but identifying the records in the target table that could be affected by that change is a complex process that involves computing keys. It can involve changes to particular expressions, as well as changes to filtering, join conditions, deduplication, aggregation, and more, all of which can have recursive effects. In the analytical world, it is almost impossible to predict what will happen at the target after a certain action is taken at the source. For example, inserting a record in the source may cause a record to be deleted in the target, or changes to columns used in join conditions may result in some records being deleted and others being inserted or updated. To tackle this problem, I recommend starting with the transformation mapping and approaching the task methodically, using numerical methods where possible. Although it may be challenging, it is essential to be humble and carefully consider all possible options, including the use of “early pruning”, which is a metadata variant of early filtering performed at the metadata level.
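As promised in the early filtering point above, here is a minimal sketch of such a filter for the Users example. The staging structure HOP_USERS_STAGE with OLD_/NEW_ images, the column names, and the target queue HOP_USERS are all hypothetical.

-- Keep only hops where a significant column has really changed;
-- the last-login timestamp is ignored on purpose, so logins no longer flood the queue
insert into HOP_USERS
select h.*
  from HOP_USERS_STAGE h
 where h.OPERATION in ('I','D')                              -- inserts and deletes always pass
    or decode(h.OLD_USER_KEY,  h.NEW_USER_KEY,  0, 1) = 1    -- decode treats two NULLs as equal
    or decode(h.OLD_USER_CODE, h.NEW_USER_CODE, 0, 1) = 1;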
Identification
Currently, we are in the early stages of the fermentation process. After creating the bottom layer (L0) using fresh hops, we need to modify them for identification. This involves computing all target keys of records that could potentially be impacted by the source change. Each adapted hop can be used multiple times since the source table can appear in several transformations. The process can be thought of as a multi-consumer queue of hops.
However, when attempting to use a standard queue solution for this process, we encountered issues when the number of queues exceeded a certain critical point. While it worked fine during the Proof of Concept phase, it proved inadequate for real traffic and hit the unspoken limits of the solution. As a recommendation, it is best to focus on the necessary functionality and create a custom, lightweight queue solution instead.
To facilitate the identification process, there are several steps that can be taken:
- First, ensure that the L0 record is updated and that the hop adaptation is done within the same transaction, but be cautious not to slow down the L0 updating process too much.
- Multiplication and adaptation involve multiplying the hops into separate sub-queues, one for each transformation, which makes it easier to manage the queue as a multi-consumer one. This also allows for the use of mapping information to select significant columns for certain mappings and to perform early filtering to reduce unchanged records. Additionally, the hops can be cut into two symmetrical parts, the old and the new state, for the identification process, as both states need to be processed in the same way.
- The use of metadata is necessary for the multiplication and early filtering processes, as it requires formalized metadata to generate two matrices:
  - the source table x transformation matrix
  - the source column x transformation matrix
  However, not all metadata systems have the necessary granularity for the latter matrix.
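A minimal sketch of how these two matrices could be stored and queried follows. The table and column names (MD_TABLE_USAGE, MD_COLUMN_USAGE) are assumptions for illustration, not a reference to any particular metadata repository.

-- source table x transformation matrix (one row per usage, so SOURCE_C appears twice for T1)
create table MD_TABLE_USAGE (
  TRANSFORMATION varchar2(30),   -- e.g. 'T1'
  SOURCE_TABLE   varchar2(128),  -- e.g. 'SOURCE_C'
  TABLE_ALIAS    varchar2(30)    -- e.g. 'c1', 'c2'
);

-- source column x transformation matrix (only the columns significant for the mapping)
create table MD_COLUMN_USAGE (
  TRANSFORMATION varchar2(30),
  SOURCE_TABLE   varchar2(128),
  TABLE_ALIAS    varchar2(30),
  SOURCE_COLUMN  varchar2(128)
);

-- Which sub-queues have to be fed when a hop arrives from SOURCE_C?
select TRANSFORMATION, TABLE_ALIAS
  from MD_TABLE_USAGE
 where SOURCE_TABLE = 'SOURCE_C';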
Consider the transformation labeled as T1. This transformation requires hops from three source tables, namely SOURCE_A, SOURCE_B, and SOURCE_C. However, SOURCE_C is used twice in this transformation, leading to four distinct sub-queues that are specifically intended for T1. Therefore, we need to create four separate identification statements for T1, and we will approach the problem numerically. To begin, let us focus on the first source table, SOURCE_A, which has the following columns:
- ID
- PARENT_ID
- NM
- DS
- SOMETHING
- X
- Y
- Z
- STATUS
And, of course, the technical column
- DELETED_FLAG
Step 1 – Substitute the source with the hop
To identify both the previous and the new state of records, we need to use a query without the “deleted flag” limitations and consider both valid and deleted records of the sources. Soft delete is crucial for working with records that have been deleted from their sources, as it allows us to perform the delete operation on the target. We will use the HOP_T1_a queue (hops of SOURCE_A for transformation T1) instead of the SOURCE_A table for the identification process. In the actual process, the queue has its own real-time dynamics that need to be handled accordingly. As mentioned in the previous step, only differences in ID, NM, DS, SOMETHING, X, and DELETED_FLAG are considered significant enough to put records into the HOP_T1_a queue. It is important to test the metadata granularity to ensure that the necessary information can be generated automatically, as doing it manually would be extremely time-consuming. In the identification result set we keep only the primary key of the target table.
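To make the sub-queue more tangible, it could be sketched as below. The exact structure is an assumption made for illustration; each source hop contributes up to two rows, one for the old and one for the new state.

-- Hypothetical structure of the per-transformation sub-queue HOP_T1_a
create table HOP_T1_a (
  HOP_ID       number,        -- reference to the originating hop
  STATE        char(1),       -- 'O' = old state of the record, 'N' = new state
  ID           number,        -- significant columns per the mapping metadata
  NM           varchar2(200),
  DS           varchar2(2000),
  SOMETHING    varchar2(200),
  X            number,
  DELETED_FLAG char(1),
  PARENT_ID    number         -- carried along because it is needed for the identification join
);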
The next section may be difficult to comprehend, so I will provide an SQL query example to illustrate specific steps. The example will not involve complicated processes such as cascading queries, de-duplication, or aggregation. It will only cover the fundamental principles to help avoid confusion. In case the example is still unclear, perhaps a few pints of lager will aid in understanding.
Shall we examine the original query that describes the transformation?
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join SOURCE_C c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
As we previously discussed, we will utilize the “soft delete” approach in our solution. Therefore, the actual query describing the transformation, which includes the logic for handling soft deleted data, would appear as follows:
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID and b.DELETED_FLAG != 'Y'
left join SOURCE_C c1 on c1.OBJ_ID = a.ID and c1.DELETED_FLAG != 'Y'
left join SOURCE_C c2 on c2.OBJ_ID = b.ID and c2.DELETED_FLAG != 'Y'
where b.STATUS != 'DUMMY'
and a.DELETED_FLAG != 'Y'
from SOURCE_A a will be replaced by from HOP_T1_a a
select
a.ID as TARGET_ID --pk of the target table
from HOP_T1_a a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join SOURCE_C c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
Step 2 – Reduce unnecessary parts of the query (Early pruning)
Keeping a WHERE condition in the identification query can be risky, as it may affect the identified result set. For instance, an anti-join could influence the outcome. The safest approach is to remove the WHERE condition altogether. Alternatively, you can evaluate the condition and look for indications of anti-conditions, such as NOT IN. It is advisable to provide some kind of hint or pattern directive to help with the final decision. In this instance, the generator has determined that the WHERE condition is not problematic, so it has been retained.
Early pruning involves removing unnecessary parts of the query. This is a complex process that I won’t delve into here. Either you can imagine it or you wouldn’t understand it regardless. Perhaps in a subsequent article, I can explore it in more depth.
The join c1 will be removed because it is a left join that does not impact the result set: its attributes are not used in any key, WHERE condition, or remaining join condition.
Similarly, c2 will also be removed for the same reason.
However, b will not be removed because it is an inner join that also acts as a filter. This way we avoid the complications that deduplication, aggregation, analytical functions, or cascading queries would bring.
select
a.ID as TARGET_ID --pk of the target table
from HOP_T1_a a
join SOURCE_B b on b.ID = a.PARENT_ID
where b.STATUS != 'DUMMY'
As shown in the picture above, we will do the same for SOURCE_B and for SOURCE_C twice (c1 and c2).
I won’t describe it here for each source, just for c2. It is the last one and, along with c1, it is slightly different because of the left join and its relative isolation (which makes things easier here; don’t be misled by that, though, as outer joins can be much more complicated, impacting keys, being used as anti-joins, being necessary for further joins, etc.).
So let’s say, the SOURCE_C table has the following columns:
- OBJ_ID
- STATUS
- X
- DELETED_FLAG
As described in the previous multiplication step, only differences in OBJ_ID, X, and DELETED_FLAG are considered significant enough to fill records into this particular queue, HOP_T1_c2.
left join SOURCE_C c2 will be replaced by left join HOP_T1_c2 c2
select
a.ID as TARGET_ID --pk of the target table
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
There are additional considerations in this scenario. Apart from removing the c1 join, since it is a left join without causal relationships, we can also convert the LEFT JOIN on the queue into an inner JOIN. This is because we only need to work with the records affected by the queue, so the join can now serve as a filter.
select
a.ID as TARGET_ID --pk of the target table
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
Step 3 – Mix it together
Now it is time to mix all these identifiers for the transformation and reduce multiplicities. The resulting set of identifiers can then be used to run the fine-grain transformation.
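A minimal sketch of the mixing step is shown below, assuming the four identification statements for T1 are generated as described above and that their result is materialized into an ID queue table ID_T1 (the name used in Step 5). Only the HOP_T1_a and HOP_T1_c2 variants were shown in full earlier; the HOP_T1_b and HOP_T1_c1 variants are derived the same way and are assumptions here.

-- Mix the identifiers from all four sub-queues of T1 and reduce multiplicities
insert into ID_T1 (TARGET_ID)
select distinct TARGET_ID
  from (
        select a.ID as TARGET_ID                 -- driven by HOP_T1_a (Steps 1 and 2 above)
          from HOP_T1_a a
          join SOURCE_B b on b.ID = a.PARENT_ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                              -- driven by HOP_T1_b (derived analogously, not shown above)
          from SOURCE_A a
          join HOP_T1_b b on b.ID = a.PARENT_ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                              -- driven by HOP_T1_c1 (derived analogously, not shown above)
          from SOURCE_A a
          join SOURCE_B b on b.ID = a.PARENT_ID
          join HOP_T1_c1 c1 on c1.OBJ_ID = a.ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                              -- driven by HOP_T1_c2 (Step 2 above)
          from SOURCE_A a
          join SOURCE_B b on b.ID = a.PARENT_ID
          join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
         where b.STATUS != 'DUMMY'
       );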
Step 4 – Final touch
We are very close to the end of the process. With the identifiers we have, we can determine which records should be updated. The appearance of these identifiers in the ID queue indicates that these records are eligible for modification. However, it does not specify what type of modification is required, whether it be insertion, updating, or deletion. Fortunately, we are accustomed to using MERGE statements, which enable us to handle this situation without having to worry about whether we should insert or update the records. So, it’s a straightforward process.
What is not easy is the DELETE operation.
The operation that should be performed on a record depends solely on the transformation, and anything can change. However, I have been holding back, since the beginning of this article, the real reason for implementing soft delete. The identification process includes both old and new states, as well as valid and deleted records.
But what happens if the transformation does not return identified records? In this case, all records in the target table identified by the ID queue but not returned by the transformation query must be deleted.
The picture above displays the following sets of interest (with no regard for anything beyond our ID queue):
- INSERT: Records identified in the transformation but not in the target table should be inserted.
- UPDATE: Records identified in both the transformation and the target table should be updated. We only update them if they differ (the last stage of early filtering). However, there may be “soft deleted” records that need to be updated to the “undeleted” state (operation REVIVE).
- DELETE: Records identified in the target table but not in the transformation should be deleted (soft deleted). This occurs when something has happened to the source, and the records are no longer returned by the transformation.
- IGNORE?: Records identified in neither the transformation nor the target table may be either ignored or inserted as soft deleted. Both options are possible, but the existence of these records means that they would have been deleted previously. However, the reason for their disappearance before the initial load is usually insignificant, and the time of the initial load typically has no business meaning. Therefore, I recommend inserting records of this set as “soft deleted” records, as it will pay off during further integration.
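For illustration only, the four sets can be derived by outer-joining the ID queue with both the transformation and the target table. Here T1 stands for the transformation query from Step 5 (assumed to be available as a view or a materialized work table), and TARGET_T1 is an assumed name for the target table.

-- Sketch: classify every identified record into one of the sets above
select iq.TARGET_ID,
       case
         when t.TARGET_ID is not null and tgt.TARGET_ID is null     then 'INSERT'
         when t.TARGET_ID is not null and tgt.DELETED_FLAG = 'Y'    then 'REVIVE'
         when t.TARGET_ID is not null                               then 'UPDATE'  -- only when values differ
         when t.TARGET_ID is null and tgt.TARGET_ID is not null     then 'DELETE'  -- i.e. soft delete
         else                                                            'IGNORE?' -- or insert as soft deleted
       end as OPERATION
  from ID_T1 iq
  left join T1        t   on t.TARGET_ID   = iq.TARGET_ID
  left join TARGET_T1 tgt on tgt.TARGET_ID = iq.TARGET_ID;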
Delete processing
We utilize the ID queue and perform a left outer join with the transformation. By doing so, we obtain all the identified records (i.e., also all the identifiers of delete candidates) along with the DELETED_FLAG, which indicates their presence or absence in the transformation.
Step 5 – Identified target transformation
The final example pattern will give us a taste of the end result. It may be a bit rough and require refinement, but it will provide a big picture of the larger brewing process. The core of this process involves a join between our “ID queue” ID_T1 and the transformation T1.
with T1 as(
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID and b.DELETED_FLAG != 'Y'
left join SOURCE_C c1 on c1.OBJ_ID = a.ID and c1.DELETED_FLAG != 'Y'
left join SOURCE_C c2 on c2.OBJ_ID = b.ID and c2.DELETED_FLAG != 'Y'
where b.STATUS != 'DUMMY'
and a.DELETED_FLAG != 'Y'
)
select
nvl(ID_T1.TARGET_ID,T1.TARGET_ID) as TARGET_ID --that way we get the ID even when T1 returns only a subset
,T1.TARGET_NAME
,T1.TARGET_DESC
,T1.TARGET_VALUE_A
,T1.TARGET_VALUE_B
,T1.TARGET_VALUE_C
,T1.TARGET_VALUE_D
,case when T1.TARGET_ID is null then 'Y' else 'N' end as DELETED_FLAG
from ID_T1 --surprise, we start with the identification because it is leading in that case
left join T1
on T1.TARGET_ID = ID_T1.TARGET_ID -- and of course left join, we need these deleted records
Short explanation:
We use the ID queue and left outer join the transformation. That way we get all identified records (including all identifiers of delete candidates) and the DELETED_FLAG indicating their (non)presence in the transformation.
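To close the loop, the result above can feed a MERGE against the target table. The following is only a rough sketch under the assumptions already used (TARGET_T1 as the target table name, and the Step 5 result set materialized into a work table T1_IDENTIFIED); the “update only when different” check and the preservation of old column values for soft deleted rows are left out for brevity.

merge into TARGET_T1 tgt
using T1_IDENTIFIED src                       -- hypothetical work table holding the Step 5 result
   on (tgt.TARGET_ID = src.TARGET_ID)
when matched then update set
      tgt.TARGET_NAME    = src.TARGET_NAME,
      tgt.TARGET_DESC    = src.TARGET_DESC,
      tgt.TARGET_VALUE_A = src.TARGET_VALUE_A,
      tgt.TARGET_VALUE_B = src.TARGET_VALUE_B,
      tgt.TARGET_VALUE_C = src.TARGET_VALUE_C,
      tgt.TARGET_VALUE_D = src.TARGET_VALUE_D,
      tgt.DELETED_FLAG   = src.DELETED_FLAG   -- covers UPDATE, REVIVE and (soft) DELETE in one go
when not matched then insert
      (TARGET_ID, TARGET_NAME, TARGET_DESC, TARGET_VALUE_A, TARGET_VALUE_B,
       TARGET_VALUE_C, TARGET_VALUE_D, DELETED_FLAG)
    values
      (src.TARGET_ID, src.TARGET_NAME, src.TARGET_DESC, src.TARGET_VALUE_A,
       src.TARGET_VALUE_B, src.TARGET_VALUE_C, src.TARGET_VALUE_D, src.DELETED_FLAG);
      -- inserting rows with DELETED_FLAG = 'Y' implements the recommendation for the IGNORE? set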
Now we have up-to-date data in the target tables.
Now we can taste the Lager.
Oops, too late!
Others in the series
- Operational Data Store: A Perfect Blend of Data and Brewing Science – Chapter 1
- What do beer brewing and Operational Data Store data integration have in common?