After taking a break for a few moments, let’s return to the topic of the Real-Time Operational Data Store. Brewing beer, whether it’s lager, ale, or stout, requires tasting to ensure it’s been brewed correctly. Similarly, we must consider the possibility of consistency loss in our system and prepare for it.
The first inconsistency arises during the initial state of the system before the first load. Consistency breaches and downtimes of particular services may also occur. We should not get too comfortable with the good taste of the first batch because the second one may turn out to be sour. Therefore, it’s important to be prepared for inconsistencies by conducting consistency tests, repairs, and hard repairs to our brewing process to avoid ending up with sour results.
The reconciliation process also reveals an interesting trade-off between brewing methods: although lager brewing is more complex than brewing ale or stout, its reconciliation is actually easier.
Why do we need a reconciliation? Checking the taste is crucial to brewing good beer, and taking corrective measures is necessary to achieve the desired taste consistently. Reconciliation serves several purposes:
- Quality testing – Real-time ETL works incrementally, which can lead to information loss due to lost signals, micro-batches, or extraordinary events. To ensure everything is in order, we need to test the solution, confirm data consistency, and detect and solve production issues. By using metadata of transformation, we can generate scripts that help us check for differences between data sets, show detailed differences, and identify the problem at the row level.
- Quality monitoring – It’s essential to periodically run a subset of the tasks mentioned above to alert us about inconsistent data states. This process should be automated using the same metadata.
- Quality restoring – Reconciliation can be either passive (informing and alerting) or active (repairing differences). Active reconciliation can include the task of initial load (hard reconciliation) and can be achieved non-invasively, reconciling data without any downtime and achieving a smooth reconciliation process.
All these ways can (and should) be generated from the same metadata of transformation and should be implemented in your solution to get a reliably good taste. The first question we have to answer is – Where do we get source data for the reconciliation?
Load source table for reconciliation
This method is the most basic approach and is particularly useful for CDC-based ETL patterns, i.e. for lager-wise ODS brewing. We integrate the ODS based on CDC events at the physical table level in the source, so the reconciliation source has the same structure as the real-time source, and we can use the same transformation metadata to generate the reconciliation process. This is essential for ensuring data quality consistency. Although lager-wise ODS is more complex than ale or stout ODS, this method makes its reconciliation process more straightforward.
Load and transform source tables for the reconciliation
This approach is more complex. We perform real-time integration based on messaging initiated either from the UI or by the source of records, but we conduct the reconciliation process through a bulk transformation that is completely separate. While this approach involves duplication of the transformation logic, it provides better test efficiency by detecting errors in the original transformation. This approach enables a double-check from the logical design phase, which requires additional effort during the design and redesign phases but ultimately ensures a more thorough and reliable reconciliation process.
Use “Fake shake” of the source
Here’s a clever way to obtain reconciliation data. We use the same infrastructure to transfer data to our target, just like in real-time processing, but we apply this to all records in the source. This can be achieved by creating a fake event chain for all records or by filling the queue on the source of records side with surrogate data. The fake event chain can be used for internal transformations within layers of ODS. However, it’s important to disable early filtering to ensure the accuracy of the reconciliation process.
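The fake event chain can be sketched in a few lines. This is a minimal, illustrative Python sketch; the in-memory queue, the event shape, and the `synthetic` flag are assumptions, not a prescribed format:

```python
from collections import deque
from datetime import datetime, timezone

def fake_shake(source_rows, queue):
    """Enqueue a synthetic change event for every source record so the
    regular real-time pipeline reprocesses the whole table."""
    for key, row in source_rows.items():
        queue.append({
            "key": key,
            "op": "U",                 # surrogate update carrying current state
            "payload": row,
            "ts": datetime.now(timezone.utc),
            "synthetic": True,         # lets downstream disable early filtering
        })
    return len(queue)

queue = deque()
source = {1: {"name": "lager"}, 2: {"name": "stout"}}
fake_shake(source, queue)  # one surrogate event per source record
```

The `synthetic` marker is one possible way to let downstream transformations know that early filtering must be switched off for these events.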
Data from compacted message queue topics
The majority of modern messaging systems (e.g. Kafka) provide the option of compacting topics. This means that the last state of each keyed value of the topic is retained, and can be leveraged as a source for the reconciliation process.
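The compaction semantics can be illustrated with a small sketch. This is not Kafka itself, just the keep-the-last-value-per-key rule it applies, with `None` standing in for a tombstone record:

```python
def compact(events):
    """Log compaction semantics: retain only the last value per key.
    A None value acts as a tombstone and removes the key entirely."""
    state = {}
    for key, value in events:
        if value is None:
            state.pop(key, None)   # tombstone: the key was deleted at the source
        else:
            state[key] = value
    return state

log = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
compact(log)  # {"a": 3}: the latest value per key, "b" removed by its tombstone
```

The resulting per-key state is exactly what a compacted topic retains, which is why it can serve as a reconciliation source.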
In addition to external reconciliation between source systems and the Real-time Operational Data Store, there may be internal glitches and inconsistencies that arise during the transformation process between layers. This can occur when the identification process fails to capture some changes due to definition errors or rare events. Internal reconciliation may be necessary to supplement the external one, especially when using methods that do not allow real-time processing chaining or during initial loads.
Primitive reconciliation patterns
Let’s revisit the earlier days of Data warehousing ETL patterns, where we extract data from the source and merge it into the target with or without deleting orphan records in the target. This approach is straightforward and efficient, provided there are no internal or external factors preventing us from using it. These patterns are particularly useful for bulk initial loads when we can tolerate downtime before system startup and need to minimize the time of initial load.
Overwrite pattern
This pattern is the most basic approach to data loading, even compared to other simple methods. It truncates the target table and then inserts a fresh set of data from the source. This is the fastest way to load an empty system during the initial load phase. However, it has significant drawbacks for reconciliation loads that are not initial: we lose valuable information such as the change timestamp, so we cannot easily identify what, if anything, changed during the reconciliation, which makes it difficult to evaluate the effectiveness of the process.
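A minimal sketch of the truncate-and-insert idea, using Python dicts in place of tables (illustrative only):

```python
def overwrite(target, source):
    """Overwrite pattern: truncate the target, then insert the full source
    data set. Fast for initial loads, but all change history is lost."""
    target.clear()           # TRUNCATE TABLE target
    target.update(source)    # INSERT INTO target SELECT * FROM source
    return target

target = {1: "old", 9: "orphan"}
overwrite(target, {1: "new", 2: "added"})  # target is now {1: "new", 2: "added"}
```

Note that after the call there is no trace of which rows actually changed, which is exactly the drawback described above.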
Full refresh pattern
The Full refresh pattern has a similar functional aspect to the previous pattern. It involves the following steps:
- Inserting records missing in the target
- Updating records in the target that are different from the source
- Soft deleting records in the target that are not in the source
Like the previous pattern, this approach requires a full source dataset for the deleting part.
However, unlike the previous pattern, the Full refresh pattern minimizes the volume of changes and supports audit attributes such as inserted/updated timestamps and a deleted flag for soft delete. This approach also enables real-time replication to be chained after the bulk reconciliation process.
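The three steps can be sketched over key-to-row maps; the audit attribute names (`inserted`, `updated`, `deleted`) are illustrative assumptions:

```python
from datetime import datetime, timezone

def full_refresh(source, target):
    """Full refresh over key -> value maps: insert missing rows, update
    different rows, soft delete rows absent from the full source set."""
    now = datetime.now(timezone.utc)
    for key, value in source.items():
        row = target.get(key)
        if row is None:                                # missing in the target
            target[key] = {"value": value, "inserted": now,
                           "updated": now, "deleted": False}
        elif row["value"] != value or row["deleted"]:  # different (or revived)
            row.update(value=value, updated=now, deleted=False)
    for key, row in target.items():                    # orphans: soft delete only
        if key not in source and not row["deleted"]:
            row.update(updated=now, deleted=True)
    return target

target = {}
full_refresh({"a": 1, "b": 2}, target)   # inserts a and b
full_refresh({"a": 1}, target)           # soft deletes b, leaves a untouched
```

Because unchanged rows are not touched at all, the volume of changes stays minimal and the audit timestamps remain meaningful.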
Diff merge pattern
The Diff merge pattern shares many similarities with the Full refresh pattern, except for the deletion part. This makes it useful even when the full source data set is not available. As with the Full refresh pattern, it allows for real-time processing to be chained after the bulk operation and supports all audit attributes.
In some cases, it can be combined with the Full refresh pattern, with the choice of pattern depending on the type of source extract, when data extraction is planned in a cyclic pattern (for example, using a sequence like F – i – i – i – i – i – F – i – i – i – i – i – F…).
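A sketch of the Diff merge pattern and of the cyclic F – i schedule, with Python dicts standing in for tables. The hard delete in `full_refresh` here is a deliberate simplification; a real ODS would soft delete instead:

```python
def diff_merge(source, target):
    """Diff merge: insert missing and update different rows; no deletes,
    so it works even when the full source data set is unavailable."""
    for key, value in source.items():
        if target.get(key) != value:
            target[key] = value
    return target

def full_refresh(source, target):
    """Full refresh: diff merge plus removal of orphans (simplified to a
    hard delete here; a real ODS would soft delete instead)."""
    diff_merge(source, target)
    for key in [k for k in target if k not in source]:
        del target[key]
    return target

# Cyclic schedule F - i - i ...: the pattern is chosen by extract type
target = {}
for kind, extract in [("F", {"a": 1, "b": 2}), ("i", {"b": 3}), ("i", {"c": 4})]:
    (full_refresh if kind == "F" else diff_merge)(extract, target)
# target is now {"a": 1, "b": 3, "c": 4}
```

The next full extract in the cycle then cleans up any orphans that the incremental steps could not detect.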
External and internal factors of the reconciliation process
The reconciliation process is influenced by various external and internal factors that must be taken into consideration to ensure its success. External factors include the type of data source, the content and structure of the data, and its performance limitations. For instance, a reconciliation process for a large-scale data source may require different techniques and tools than one for a smaller data source. Additionally, the content and structure of the data may vary depending on the type of source system, such as a relational database, a flat file, or a NoSQL database.
On the other hand, internal factors include the business continuity of the operational data store, the service level agreement (SLA) with the stakeholders, and the ETL logic used for the reconciliation process. The operational data store must be available and accessible during the reconciliation process to ensure the accuracy and timeliness of the data. The SLA with the stakeholders must also be taken into account, as they may require specific levels of accuracy or timeliness.
Finally, the ETL logic used for the reconciliation process must be carefully designed and tested to ensure that it can handle the external and internal factors that affect the reconciliation process. It should be able to identify and resolve discrepancies between the source and the target data and handle any errors or exceptions that may occur during the process. By considering these external and internal factors, the reconciliation process can be optimized for the specific requirements of the operational data store and the stakeholders involved.
There are several external factors that can affect the reconciliation process, which depend on the data source and its characteristics, including:
- Data source and technology: This refers to the type of data source and the technology used to access it. Different data sources may require different methods for extracting and loading data.
- Full vs. Partial vs. Incremental source dataset: The type of source dataset can significantly impact the load methods that are available.
- Full loads contain all the data in the source.
- Incremental loads only contain the changed set of data.
- Partial loads, on the other hand, contain only data for a specific time period.
- Load impact sensitivity: Even if full data is available for reconciliation, there may be performance limitations that need to be considered, such as read operations in the source, locks, read consistency, and old snapshots. Additionally, the availability of source data may be limited to a small time window outside of business hours.
- Data aging: Data in the source may age at a different pace than data in the ODS, which must be taken into consideration during the reconciliation process.
It’s important to note that each data source is unique and may require a tailored approach to the reconciliation process. For example, some sources may offer a compacted topic that stores the last state of each keyed value of the topic, which can be used as a source for reconciliation. Additionally, incremental loads may only provide new values for changed data and may not include information for deleted records, which can affect the reconciliation process.
The internal factors include:
- SLA: Our SLA is the most critical internal factor affecting our customer experience and requirements. Since real-time operational systems typically operate 24/7, we cannot always plan regular downtime for maintenance. Therefore, we need to ensure that our reconciliation process is reliable enough to minimize the need for downtime. Alternatively, we can use partitioned or smooth methods that let us reconcile data without disrupting consumer availability or real-time loads.
- Data aging is another internal factor that can affect our reconciliation process. Although operational data stores are designed to store current data, we sometimes end up storing historical data, which can affect our reconciliation process.
- Dependencies are crucial to consider when reconciling data in any layer of the ODS. For example, when we reconcile our mirror of source table X, we must ensure that we also reconcile all the impacted tables that depend on table X.
In summary, the internal factors that affect the reconciliation process include the SLA, data aging, and dependencies, while the external factors include the data source and technology, the type of source dataset (full, partial, or incremental), load impact sensitivity, and data aging in the source. All these factors can significantly impact the set of possible load methods and the efficiency of the reconciliation process.
Partitioned reconciliation patterns
The next set of reconciliation patterns makes use of the capability of database engines to exchange partitions. In this method, the source data is first inserted into a stage table with the same partition format as the target table, and then the content is exchanged. This technique is typically faster than primitive methods and places fewer demands on the load window when data is unavailable for the consumer.
Database engines usually allow the exchange of partitions instead of tables, which means that we need to create a partition mirror (i.e., the table that will be exchanged at the end) as a single table if the target is partitioned, or as a partitioned table if the target table is not partitioned.
PEP – Primitive Exchange Partition pattern
The Primitive Exchange Partition (PEP) pattern is a partitioned alternative to the Overwrite pattern. Despite limitations such as the lack of support for audit attributes, soft delete, and real-time processing, it is more powerful, because it combines the advantage of a quick load into an empty structure with consumers' uninterrupted access to the previous data during the load.
In the PEP pattern, we first create an empty structural copy of the target partition, fill it with data from the source using the Overwrite pattern, synchronize the indexes, and quickly exchange it with the target partition. However, global indexes can cause difficulties, so the best solution is either to avoid global indexes on tables that use PEP or to use one of the solutions described below.
Unlike the Overwrite pattern, PEP allows us to include a testing step to determine if reconciliation is necessary. This way, we could create a workaround that enables real-time processing after the reconciliation process.
REP – Rich Exchange Partition pattern
The Rich Exchange Partition pattern is a comprehensive solution that offers improved performance while maintaining the same functionality as the Full refresh pattern. It provides a major advantage of faster processing for large differences or new structures without affecting the state of previous data during loading. This pattern is particularly useful in scenarios where we require all the features of a Full refresh pattern without risking the integrity of the target data during the import of raw input data.
The difference is that we replace the logical merge operation with a join of source and target that produces the resulting dataset, which is then exchanged with the original target partition. This fully replaces the functionality of Full refresh realized by merges. The performance gain is not always significant, especially when only a minority of records has changed, but the approach brings other advantages, such as reversibility of the step and versioning of target states.
Technical issues and solutions
There are two technical challenges associated with partitioned patterns:
- Partition exchange can only be performed in the PARTITION-TABLE mode.
- Partition exchange invalidates global indexes, which is not acceptable for 24×7 real-time environments.
To address the first issue, we can define the PARTITION MIRROR as a single table for partitioned targets, or as a partitioned table with one partition for non-partitioned targets.
The issue of invalidating global indexes during partition exchange can also be addressed. While I personally prefer to avoid using global indexes on partitioned tables, I understand that they may be necessary in certain cases. Here’s a possible solution to address this issue during the reconciliation process:
- Create a local index with the same structure as the global one, but with two differences: it will be a local index and the first column will be defined as “desc” to avoid technical limitations of having “exactly” the same structure. This will help ensure that the local index will not be invalidated during partition exchange.
- Drop the global index.
- Synchronize the indexes for the PARTITION MIRROR.
- Exchange the partition.
- Recreate the global index.
- Drop the redundant local index.
By following these steps, we can ensure that the global index is not invalidated during the partition exchange process while also avoiding technical limitations that may arise.
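The sequence can be generated from metadata. Here is a hedged sketch that emits the six steps as Oracle-style DDL strings; the table, partition, and index names are hypothetical, and step 3 is left as a placeholder comment:

```python
def index_safe_exchange(table, partition, mirror, index, columns):
    """Emit the six-step sequence that keeps an index available during a
    partition exchange. Oracle-style syntax; all names are hypothetical
    and step 3 is only a placeholder comment."""
    shadow_cols = ", ".join([columns[0] + " desc"] + list(columns[1:]))
    return [
        f"create index {index}_l on {table} ({shadow_cols}) local",  # 1. shadow local index
        f"drop index {index}",                                       # 2. drop the global index
        f"-- 3. synchronize indexes on {mirror}",
        f"alter table {table} exchange partition {partition} with table {mirror}",  # 4.
        f"create index {index} on {table} ({', '.join(columns)})",   # 5. recreate global index
        f"drop index {index}_l",                                     # 6. drop the shadow index
    ]

steps = index_safe_exchange("ods.customer", "p_current", "ods.customer_mx",
                            "ix_customer_id", ["customer_id", "valid_from"])
```

The "desc" on the first column of the shadow index is only there to sidestep the restriction on creating a second index with exactly the same column list.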
Advanced reconciliation patterns
Advanced patterns build on top of the standard patterns and add extra functionality to meet specific requirements. For example, one advanced pattern could add the ability to handle real-time data changes while still maintaining data accuracy. Another pattern might add the ability to handle data reconciliation across multiple data sources.
The advantage of using advanced patterns is that they offer more flexibility and customization than standard patterns, allowing for more tailored and optimized solutions. However, they also require a deeper understanding of the underlying data and system architecture, as well as more complex implementation and maintenance.
Overall, advanced patterns can be a powerful tool in data reconciliation, allowing for more sophisticated solutions that meet specific business needs.
Isolated Full Refresh pattern
This pattern is similar to the standard Full refresh pattern, but it can be used even in situations where we have no full source data set.
The pattern is based on the assumption that if we do not have the full extent of data, we at least know why we do not have it – that there is some rule or measurable piece of information we can use.
There are different types of limitations that can affect reconciliation processes:
- Business day limitation: if data is organized in daily partitions, either physically or logically, the comparison between source and target data can only be performed for a subset of the data that matches specific parameters of the process. For instance, you may only compare transactions or events that occurred during a certain time period, as defined by the reconciliation job.
- Dynamic timestamp check: when data is updated incrementally, based on a “last changed” timestamp, you can use the oldest relevant timestamp (T0) from the source data as a reference point. All insert or update actions will be based on the incoming data, and only those records that have a timestamp newer than T0 will be deleted or marked as deleted. This allows you to keep the most up-to-date information in the target system without losing any previous changes.
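A sketch of the dynamic timestamp check under simple assumptions (tables are dicts, and target rows carry a `ts` last change timestamp):

```python
def isolated_full_refresh(source, target, t0):
    """Isolated Full refresh with a dynamic timestamp check. Inserts and
    updates come straight from the (possibly partial) extract; only rows
    newer than T0, the oldest relevant source timestamp, are eligible
    for soft delete, since older rows may simply lie outside the extract."""
    for key, (value, ts) in source.items():
        target[key] = {"value": value, "ts": ts, "deleted": False}
    for key, row in target.items():
        if key not in source and row["ts"] >= t0 and not row["deleted"]:
            row["deleted"] = True      # newer than T0 yet missing in the extract
    return target

target = {"old": {"value": 1, "ts": 5, "deleted": False},
          "gone": {"value": 2, "ts": 20, "deleted": False}}
isolated_full_refresh({"new": (3, 30)}, target, t0=10)
# "old" predates T0 and survives; "gone" is newer than T0 and is soft deleted
```

The T0 boundary is what isolates the comparison: anything older than the extract's scope is assumed valid and left untouched.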
Log-wise pattern
The Log-wise pattern is commonly used in simulated real-time or message input scenarios, but it can also be applied using the archived log of source data changes, if available. The input is a sequence of changes carrying the operation type and a timestamp, and it may include DELETE operations. The main challenges are that the data may contain duplicates and that the timestamp sequence must be respected.
To implement this pattern, the following steps transfer the data to the target:
- Remove duplicates by retaining only the latest record of each instance. The only exception is the insert timestamp: take it from the duplicate insert events so the originally inserted timestamp is retained.
- Process [IU] operations through DIFF MERGE.
- Mark target records as deleted using [D] operations.
By following these steps, the Log-wise pattern can be effectively used to transfer data from source to target while minimizing the impact of duplicates and maintaining the correct timestamp sequence.
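The deduplication step can be sketched like this, assuming events are `(key, op, ts)` tuples; keeping the original insert timestamp is the one piece of state carried through compaction:

```python
def compact_log(events):
    """Log-wise preprocessing: keep only the latest event per key while
    carrying the timestamp of the original insert through deduplication.
    Events are (key, op, ts) tuples with op in {"I", "U", "D"}."""
    latest, first_insert = {}, {}
    for key, op, ts in sorted(events, key=lambda e: e[2]):  # timestamp order
        if op == "I" and key not in first_insert:
            first_insert[key] = ts          # remember the original insert time
        latest[key] = (op, ts)
    return [(key, op, ts, first_insert.get(key))
            for key, (op, ts) in latest.items()]

log = [("a", "I", 1), ("a", "U", 3), ("b", "I", 2), ("b", "D", 4)]
compact_log(log)  # [("a", "U", 3, 1), ("b", "D", 4, 2)]
```

The surviving [IU] records would then go through Diff merge, and the [D] records would mark target rows as deleted.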
Smooth reconciliation pattern
Reconciliation is a crucial task for real-time operational data stores. Since they are available 24×7, downtime cannot be afforded for reconciliation processes, and smooth reconciliation has therefore become an essential part of the process. However, it is important to note that smooth reconciliation does not resolve all data differences; it only addresses differences that occurred before the start of loading source data for the current reconciliation, which is determined by the timestamp T0. In simpler terms, smooth reconciliation runs without any downtime, but it only reconciles the data up to a certain point in time.
To ensure Smooth reconciliation, certain prerequisites must be met, including:
- Last Change Timestamp: Every table involved in the reconciliation process should have a last change timestamp at the record level. This timestamp should reflect the changes in the source system, but even if it reflects changes in the target system, it can still be useful. This timestamp is crucial in preventing the bulk reconciliation process from overwriting newer real-time information.
- Rigid Soft Delete: In the Rigid Soft Delete method, even significantly deleted records are imported from the source instead of being ignored. These records are then inserted with a “deleted flag” set, instead of being physically deleted. This method ensures that all data changes are captured during the reconciliation process.
- Transaction Isolation: For Smooth reconciliation, READ COMMITTED transaction isolation works well. Other isolation levels may also be feasible, but further investigation is required to determine their suitability.
To perform Smooth reconciliation of data, you need to follow these steps:
- Save the T0 timestamp – this is the timestamp before you start reading data from the source. It’s important to ensure that you’re reading the correct state of source data, including all information submitted before the T0 timestamp. In some architectures, you may need to decrease the T0 timestamp for a small security delta, but a bigger delta will reduce the efficiency of the reconciliation for more recent differences.
  - Create a plain MIRROR TABLE in the RT ODS L0 layer and make a carbon copy of all source data into it. The mirror table should have no indexes, so a fast direct Overwrite pattern can be used to fill it.
- Calculate a differential DELTA DATASET that registers all differences between the mirror table and the target table. For each record in the dataset, record the type of difference: I for data in the MIRROR but missing in the TARGET, U for data present in both but different, and D for data missing in the MIRROR (only for full imports or with additional subsetting logic) and not yet marked as deleted in the target. Also, filter out records where the target's Last Change Timestamp is greater than T0. It's important to do this step set-based, since working with huge amounts of data row by row or in bulks would slow down the solution.
- Use the DELTA DATASET to repair the TARGET TABLE. This step is tricky but enables Smooth reconciliation to perform batch reconciliation and real-time integration simultaneously. The following factors contribute to success:
- The DELTA DATASET should contain only a small number of differences since the expensive operation of comparing huge data has been done set-based in the previous step.
- Perform the operation in small transactions to avoid possible locks that could slow down the real-time process.
- Include a safe clause in each DML operation that allows you to change only records where the Last Change Timestamp is smaller than T0. This eliminates the risk of changing real-time integrated information with older batch data. It’s better to miss some recent differences than to make the data even more inconsistent. Even though you filtered out differences in the previous step, it’s still important to do the check again during the DML operation.
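The set-based delta computation of step 2, including the T0 guard, might look like this sketch; rows are dicts with `value`, `ts`, and `deleted`, and the small-transaction repair of step 3 is out of scope here:

```python
def delta_dataset(mirror, target, t0, full_import=True):
    """Set-based DELTA DATASET for Smooth reconciliation.
    Target rows are dicts with "value", "ts" (Last Change Timestamp)
    and "deleted"; records touched by real time after T0 are left alone."""
    delta = []
    for key, value in mirror.items():
        row = target.get(key)
        if row is None:
            delta.append(("I", key, value))         # in MIRROR, not in TARGET
        elif row["ts"] < t0 and (row["value"] != value or row["deleted"]):
            delta.append(("U", key, value))         # in both, but different
    if full_import:                                 # D only for full imports
        for key, row in target.items():
            if key not in mirror and row["ts"] < t0 and not row["deleted"]:
                delta.append(("D", key, None))
    return delta

t0 = 100
mirror = {"a": 1, "b": 2}
target = {"b": {"value": 9, "ts": 50, "deleted": False},    # stale: repair
          "c": {"value": 3, "ts": 150, "deleted": False}}   # changed after T0: keep
delta = delta_dataset(mirror, target, t0)  # [("I", "a", 1), ("U", "b", 2)]
```

The `row["ts"] < t0` condition is the same safe clause that must be repeated in each DML operation of the repair step.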
Chaining reconciliation process
There are two methods for chaining dependent reconciliations:
- Real-time chaining is the preferred approach for non-initial and especially for Smooth reconciliation. This method relies on signals of changes in our tables and allows real-time processing to finish the work on dependent tables.
- Workflow chaining is the best solution for initial loads, where we pre-load each table layer by layer through reconciliation workflow dependencies. This can get complicated around loop-backs, where a table is filled from data of another table at the same level, not to mention self-loops. These anomalies must be carefully handled in the reconciliation workflows.
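Workflow chaining is essentially a topological sort of the table dependency graph. A sketch using Python's standard `graphlib`, where loop-backs and self-loops surface as cycle errors (the table names are hypothetical):

```python
from graphlib import TopologicalSorter, CycleError

def reconciliation_order(dependencies):
    """Order tables for workflow chaining: every table is reconciled only
    after all tables it is filled from. Loop-backs and self-loops raise
    a CycleError and must be split manually."""
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as err:
        raise ValueError(f"loop-back detected, handle manually: {err.args[1]}")

# hypothetical dependencies: L1 tables are filled from L0 mirrors,
# and l1_account also depends on l1_customer at the same level
deps = {"l1_customer": {"l0_customer"},
        "l1_account": {"l0_account", "l1_customer"},
        "l0_customer": set(),
        "l0_account": set()}
order = reconciliation_order(deps)  # l0 tables come first, l1_account last
```

Same-level dependencies like l1_customer to l1_account are handled naturally by the sort; only true cycles need manual splitting.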
The term “reconciliation” is often used to refer to the process of identifying differences between source and target data and then taking action to correct those differences. However, this process involves more than just comparing data and making changes. It is essential to keep track of statistics and logs of the differences solved during reconciliation because these differences may result from failures in the data processing pipeline.
To properly log the changes, we can use timestamps to track the changes made during reconciliation. We can also store snapshots of the delta table after each reconciliation to keep a record of the changes made. Additionally, we should have an informative reconciliation process to learn about any issues with our data transformation process. This process can help us identify problems with generating event signals or deploying new versions of the transformation.
To gather statistics on the reconciliation process, we can track the number of differences in the structure (MISSING, REDUNDANT, DIFFERENT), the number of differences per column of data, and the details of the differing records. This information can help us improve our reconciliation process and ensure that our data pipeline is functioning correctly.
Statistical evaluation of reconciliation results can bring very interesting insight into the quality assurance process.
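One possible shape for such statistics, assuming rows are dicts keyed by business key; the MISSING / REDUNDANT / DIFFERENT labels follow the categories mentioned above:

```python
from collections import Counter

def reconciliation_stats(source, target):
    """Reconciliation statistics: counts of structural differences
    (MISSING / REDUNDANT / DIFFERENT) plus a per-column breakdown
    showing where the differing records actually differ."""
    stats, per_column = Counter(), Counter()
    for key, row in source.items():
        other = target.get(key)
        if other is None:
            stats["MISSING"] += 1                 # in source, not in target
        elif other != row:
            stats["DIFFERENT"] += 1               # in both, content differs
            for col in row:
                if row[col] != other.get(col):
                    per_column[col] += 1
    stats["REDUNDANT"] = sum(1 for key in target if key not in source)
    return stats, per_column

src = {1: {"name": "ale", "abv": 5}, 2: {"name": "stout", "abv": 7}}
tgt = {1: {"name": "ale", "abv": 4}, 3: {"name": "lager", "abv": 5}}
stats, cols = reconciliation_stats(src, tgt)
# stats: MISSING=1, DIFFERENT=1, REDUNDANT=1; cols: {"abv": 1}
```

Tracking these counters over time is what turns the reconciliation log into a quality assurance signal for the pipeline.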
Reconciliation is a critical process for maintaining the consistency and accuracy of data in a real-time operational data store (ODS). In order to perform effective reconciliation, it is important to have certain prerequisites such as a last change timestamp, rigid soft delete, and transaction isolation. Smooth reconciliation, which is designed to work in real-time without requiring downtime, is an essential part of any ODS. By implementing smooth reconciliation in ODS, organizations can ensure that their data is consistent, accurate, and up-to-date. With the continuous growth of data and the need for real-time analysis, smooth reconciliation has become an essential tool for organizations to maintain their competitive edge in today’s fast-paced business environment.
Others in the series
What do beer brewing and Operational Data Store data integration have in common?