Operational Data Store: A Perfect Blend of Data and Brewing Science – Chapter 6

We can now look at the optimization and scalability aspects of our process. When I began working on real-time processing, I believed that only row-based operational changes could be addressed, and that bulk operations such as EOD or EOM processing would require another solution. Because the real-time issues were urgent, we postponed that solution. In hindsight, we realized that there is no mechanism that lets us switch seamlessly from real-time processing to bulk processing and back again. Beyond the synchronization and process management issues, some bulk operations are not well planned in the source system. We cannot tell users that the data is out of date because the source is running a bulk operation that we cannot handle in real time. Our real-time solution must therefore also be equipped to handle bulk operations.

LAGER-WISE ODS AND ELASTICITY

During the Lager brewing process, we use transformations similar to those used in bulk operations. In most cases, however, they run on only a few rows, so they must be optimized differently than bulk operations. We also run similar queries with different leading data sets, which changes the plans they need. At the same time, we must handle source bulk operations, which require the optimizer to behave as it would in bulk processing.

When teaching optimization of SQL queries, I categorize tasks as row-based, bulk, or hybrid. The task of the Lager-wise Real-time ODS is a hybrid one, and it is the most complicated because of the volatility of the requirements.

Several factors must be considered, including:

  • Volatility: Every execution depends on the volume of queued data (hops, IDs), and there may be various requirements for execution plans for each of them.
  • Quantity: There are thousands of queries running every second, and the solution should be stable and scalable. Any incorrect plan could disrupt the process. Therefore, the solution should be well-monitored, and discrepancies must be tracked and solved immediately.
  • Complexity: Every transformation query is molded and used with different predicates, requiring different plans. In the row-based execution, we must support many more indexes than in classical OLTP solutions to support them. We must also ensure that the predicate is always pushed to the detail levels of the execution, which can be tricky in cases of analytical functions or aggregations.
  • Concurrency: We cannot lock any queue during execution because processing must keep flowing. We must expect new items to appear in every queue while the previous batch is still being executed.

    Volatility and bulks

    As previously mentioned, bulk data processing is an inevitable part of our data processing workflow. These bulk operations can fall under the following categories:

    1. Periodical business bulk operations in source systems: Such operations are often triggered by time period closures, such as End of Day, End of Month, or End of Year. Although these actions are typically well-planned, they may require additional communication between the source system and Real-time ODS process management to temporarily stop and restart real-time processing. Therefore, it is preferable to make Real-time ETL processing capable of handling bulks organically without requiring external process management actions.
    2. Periodical technical bulk operations in source systems: These actions usually involve backup and clean-up tasks that result in data changes. Although they may not have a significant impact on data integration, it is still necessary to consider their occurrence.
    3. Unintended small bulks (bubbles) caused by data processing complexity: Due to the complexity of data processing, small regular data changes can cause significant changes in dependent tables, particularly when there are modifications to lists of values or master data.
    4. Reconciliations and repairs of source systems after their failures or downtime: While these events cannot be predicted and are not considered part of standard processing, they do occur. Therefore, we need to build our Real-time Operational data store to handle such situations.
    5. Reconciliation and repairs after our system failure or downtime: This category is similar to the previous one, as failures, breaches, and other issues can occur in real-life scenarios.

    Possible solutions

    TYPE A: – Very efficient row-based processing

    When we talk about row-based processing, it doesn’t necessarily mean that we process one row at a time. Even if we optimize our system by reducing hard parses and caching everything, it is still more efficient to work in micro-batches, rather than processing one row at a time. Row-based processing refers to working with small amounts of data, either row by row or in micro-batches, using indexes and pushing predicates extensively.

    The advantage of row-based processing is that it has almost linear characteristics between load and time, theoretically starting at zero-zero. Therefore, it is a good idea to rely on this type of processing and let the ETL processes work efficiently.

    Row-based efficiency

    The issue with row-based processing is not that it becomes inefficient with large loads. It is equally efficient across small and large numbers of records, and that is exactly the problem: large bulks need to be processed much faster per row than individual rows are. The following image illustrates the problem. The dissolving phase of row-based processing is proportional to the size of the bulk, which makes it too slow for larger bulks. While we may be able to tolerate higher latency during periods such as the close of business hours, we want our system to work properly and efficiently during normal operating hours.

    Bulk processing with row-by-row processing

    Both row-based and bulk-based processing have almost linear characteristics related to the number of processed rows at a time. Row-based processing is much more efficient for small workloads because it processes each row individually or in micro-batches, making use of indexes and pushing predicates extensively. On the other hand, bulk-based processing is much more efficient with higher workloads and is less sensitive to the number of rows at a time. Bulk-based processing involves processing large sets of data as a whole, resulting in fewer operations being performed on the database. The point where the curves cross (the number of rows per time) is known as the small-large boundary. At this point, row-based processing becomes less efficient, and bulk-based processing becomes more efficient.

    The graph below compares the efficiency of row-based processing and bulk processing. As shown, bulk processing is significantly less efficient than row-based processing for small numbers of processed records. The difference is so large that it is easy to identify and alert on wrong plans, missing indexes, and predicates that were not pushed down. Consequently, fine-grained row-based processing is necessary 90-99% of the time. However, bulk processing, which uses full table scans and hash joins, is much more efficient for large amounts of data. The challenge is therefore to provide both row-based and set-based processing for each statement, chosen by the volume of incoming data.

    Small-large boundary

    TYPE B: – Small – Large approach

    The small-large approach is a processing strategy that dynamically switches between row-based and bulk-based processing based on the workload. Row-based processing is used while the workload is small, where it is the more efficient of the two. As the workload grows past the small-large boundary, the point where the two curves intersect, bulk-based processing takes over because it handles large bulks of data more quickly. In this way the system uses the more efficient of the two methods on either side of the boundary.

    Small-large dynamic switching

    The graph above provides a clear illustration of why we utilize the SMALL-LARGE switch in our processing approach.

    The synergy of the identification process

    One major advantage of the Hops and IDs processing is the ability to know the number of rows in each queue. However, the challenge lies in determining how to make queries work based on the amount of incoming data. Although the optimizer of the DB engine can do this work based on statistics, it is not a practical solution to recalculate statistics before each run of an atomic statement, as the size of queues can be volatile. Additionally, dynamic sampling causes overhead in the database core. The optimizer remembers the last resolution and does not do a hard parse before each atomic transaction. Thus, running the same statement several times in a row automatically sticks to the first plan based on the first queue size.

    To overcome these challenges, we recommend running each statement in two modes, SMALL and LARGE. This can be achieved in several ways:

    • Use two distinct statements and switch between them based on the size of the leading queue.
    • Alternatively, set statistics on the queue at the moment of switching from SMALL to LARGE and vice versa.
    • Use two queues, one for SMALL and one for LARGE, possibly combined with different generated code for each. Although this solution is more complicated, it helps to dissolve bulks at the same time as new changes are processed.

    How to determine the SMALL-LARGE boundary?

    There are varying opinions on the optimal size for the number of rows to be processed at a time. Some sources suggest it’s 42, but in my experience, the default value could be set to 40,000-100,000 records on an Oracle server. However, it’s always good to allow designers to change the value for specific statements if needed, using pattern directives or hints.
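
    The switch between two statement variants, with a default boundary and a per-statement override, can be sketched as follows. This is a minimal illustration only: the function name `choose_mode`, the statement names, and the `overrides` mapping (standing in for a pattern directive or hint) are assumptions made for the example, and the default of 100,000 is simply the upper end of the range mentioned above.

```python
from typing import Mapping, Optional

# Assumed default boundary; the text suggests 40,000-100,000 rows as a starting point.
DEFAULT_SMALL_LARGE_LIMIT = 100_000


def choose_mode(queue_size: int,
                statement: str,
                overrides: Optional[Mapping[str, int]] = None) -> str:
    """Return "SMALL" or "LARGE" for one execution of a statement.

    `overrides` is a hypothetical stand-in for per-statement pattern
    directives: it maps a statement name to its own boundary.
    """
    limit = (overrides or {}).get(statement, DEFAULT_SMALL_LARGE_LIMIT)
    return "SMALL" if queue_size < limit else "LARGE"


# Hypothetical override: this statement switches to LARGE much earlier.
overrides = {"load_customer": 20_000}

print(choose_mode(500, "load_order", overrides))        # SMALL
print(choose_mode(50_000, "load_order", overrides))     # SMALL
print(choose_mode(50_000, "load_customer", overrides))  # LARGE
```

    The returned mode would then select which of the two generated statements (or which queue statistics) to use for that run.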

    The graph below illustrates different approaches to working with the learned queue size and the setup of statistics, which can result in both feasible and not feasible scenarios.

    Fiddling with optimizer

    Explanation:

    There are four different approaches for determining the processing method based on the amount of incoming data.

    • The “row by row” approach is the laziest option and processes everything row by row. While it is not the most efficient method, it requires no switching between small and large modes, and the most frequent small phase operates efficiently when it does not dissolve bulks of previous operations.
    • The “realistic statistics” approach relies on the optimizer to determine the best plan based on statistics. However, it increases the number of hard parses in the most frequent zone of small operations, causing significant performance issues as the system grows.
    • The “white lie” approach is the best option. It treats the SMALL-LARGE decision as binary, with two plans and two execution paths, by using two different statements for small and large executions. This adds a small overhead during code generation but opens more possibilities for hard-core tuning. The advantage is more control over the execution process; the disadvantage is the impact of a wrongly set SMALL-LARGE limit.
    • Finally, the “partial white lie” approach is a compromise between the white lie and realistic statistics approaches. It keeps the advantages of the trusted optimizer while reducing the number of hard parses in the most frequent small zone. The advantage of this method is that it is less sensitive to a SMALL-LARGE limit that is set too low; the disadvantage is that the optimizer may not work perfectly in the volatile zone.

    Advanced white lie

    The concept of a “white lie” is based on the assumption that the optimizer may sometimes fail at boundaries. To mitigate this risk, we can bypass the optimizer’s decision-making process and force a binary option between ROW or BULK processing. However, the question arises as to why we choose a limit of 100,000 records – is this a common limit for each transformation? Is it too low or too high? (The “Partial white lie” method can address this issue by setting a lower limit and letting the optimizer work the rest.)

    There are two options to consider:

    1. Allow for the possibility of setting an overriding value for each transformation as a pattern directive or hint, and manually tune the limit when necessary.
    2. Let the system self-tune the limit based on statistics from previous runs.

    The first option is a good standard approach that costs almost nothing and can be helpful in any case. The second option is a bit more complex, but it has the potential to optimize performance around the SMALL-LARGE boundary.

    First

    To start with, we need to establish a dynamic range or threshold around the SMALL-LARGE boundary. This range should be small enough so that it doesn’t affect the efficiency of the SMALL zone, but large enough to allow for the gathering of a significant number of executions within this range. As an example, we can set the range to be around 15% of the SMALL-LARGE limit.
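
    A volatile range of this kind might look like the sketch below. Two things here are assumptions, not part of the method as described: the 15% is applied on both sides of the limit, and runs that land inside the range pick their mode at random so that both modes get measured there. The names `volatile_range` and `pick_mode` are illustrative.

```python
import random
from typing import Tuple


def volatile_range(limit: int, width: float = 0.15) -> Tuple[int, int]:
    """Return (low, high) bounds of the volatile area around the limit.

    Assumption: `width` is a fraction of the limit applied on both sides.
    """
    delta = round(limit * width)
    return limit - delta, limit + delta


def pick_mode(queue_size: int, limit: int, width: float = 0.15,
              rng: random.Random = random.Random()) -> str:
    """Choose the mode; inside the volatile area the choice is randomized.

    Randomizing is an assumption of this sketch, made so that timings for
    both modes are collected for comparable queue sizes.
    """
    low, high = volatile_range(limit, width)
    if queue_size < low:
        return "SMALL"
    if queue_size > high:
        return "LARGE"
    return rng.choice(("SMALL", "LARGE"))


print(volatile_range(100_000))  # (85000, 115000)
```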

    Second

    Once sufficient statistics have been gathered, the SMALL-LARGE boundary should be re-evaluated and moved up or down within the volatile area. The sample must be large enough to be significant and must cover fluctuations in available system resources.

    Third

    To determine the best adjustment, a linear regression is calculated for both sets of data, that is, the runs executed in SMALL mode and the runs executed in LARGE mode. The limit between the SMALL and LARGE zones can then be adjusted higher or lower, and the volatile area moves with it.

    Fourth

    Once the linear regression is computed, the intersection point can be determined, which will dictate the new SMALL-LARGE limit. This will also cause the volatile area to shift accordingly. It is important to note that previous statistics should not be deleted as they are still valid even after the limit is adjusted. The only reason to delete statistics would be if there is a significant change in the transformation logic or patterns used.
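
    The regression and intersection steps can be sketched as below. This is a minimal illustration with made-up timings, assuming each sample is a pair of (rows in the queue, elapsed seconds) and that the two sample sets are the SMALL-mode and LARGE-mode runs collected in the volatile area. The function names are hypothetical.

```python
from typing import Sequence, Tuple

Sample = Tuple[float, float]  # (rows in queue, elapsed seconds)


def fit_line(xs: Sequence[float], ys: Sequence[float]) -> Tuple[float, float]:
    """Ordinary least squares fit of y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def new_limit(small_runs: Sequence[Sample], large_runs: Sequence[Sample]) -> float:
    """Queue size at which the two fitted lines intersect."""
    s_slope, s_icpt = fit_line(*zip(*small_runs))
    l_slope, l_icpt = fit_line(*zip(*large_runs))
    if s_slope == l_slope:
        raise ValueError("fitted lines are parallel; no intersection")
    return (l_icpt - s_icpt) / (s_slope - l_slope)


# Made-up measurements: SMALL grows steeply, LARGE has a fixed cost but a flat slope.
small_runs = [(90_000, 9.0), (100_000, 10.0), (110_000, 11.0)]
large_runs = [(90_000, 9.8), (100_000, 10.0), (110_000, 10.2)]

print(round(new_limit(small_runs, large_runs)))  # 100000
```

    In practice the samples would be noisy, so the result should be checked against the volatile range before the limit is moved.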

    This approach to tuning is based on our intimate knowledge of the specific operations and patterns used in our system, rather than relying solely on the capabilities of the optimizer. While the optimizer has more advanced resources, we as the creators of the system have a deeper understanding of the nuances and intricacies of our operations. This is why a more pessimistic approach to tuning can often be successful, as it allows us to proactively address potential issues and fine-tune the system to better suit our needs.

    They are plenty

    Many systems have performance limitations that are lower than their functional and documented limits, which has caused issues in our solutions. We often choose to use technology features to avoid reinventing the wheel, and while they may work well during proof of concept and even simulated performance tests on a small subset of data, they fail when scaling up to handle the full extent of our solution. This is especially problematic when adapting technologies designed for intensive usage on small amounts of data to handle large amounts of data with numerous transformation tasks.

    Several factors must be considered when addressing these issues:

    • The number of transformation modules and mappings (T),
    • the number of event processing-enabled source tables involved in transformations (S),
    • the number of permanent listening processes (L),
    • and the number of threads allowed to listen and run transformations (R).

    Plain spread

    The plain spread method is the simplest and most parallel variant of a solution, although it has limitations. It involves assigning one listener and one thread to each process, whether related to processing hops or transformations themselves. This approach grants equal priority to each process and ensures that even the smallest transformation task is assigned a permanently listening thread. The plain spread method is straightforward and eliminates the need to make decisions about the importance of specific transformations or tables.

    R = L
    L = S + T

    Thread groups

    The thread groups method involves organizing transformations and source processes into groups of threads. The size and arrangement of these groups can be based on the priority and Service Level Agreement (SLA) of particular transformations. All signal processing is then aggregated to a smaller number of groups. This approach is more complex than the plain spread method, but it allows for better performance scaling, the ability to diversify preferences, and the suppression of unimportant parts of the solution.

    R = L
    L < S + T
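
    As a worked example of the two formulas, the sketch below counts listeners (L) and threads (R) for a hypothetical setup with four source tables and six transformations. The table, transformation, and group names are invented for the illustration.

```python
from typing import Dict, List


def plain_spread(sources: List[str], transformations: List[str]) -> Dict[str, int]:
    """One listener and one thread per process: L = S + T, R = L."""
    listeners = len(sources) + len(transformations)
    return {"L": listeners, "R": listeners}


def thread_groups(groups: Dict[str, List[str]]) -> Dict[str, int]:
    """One listener and one thread per group: R = L, L < S + T."""
    listeners = len(groups)
    return {"L": listeners, "R": listeners}


sources = [f"src_table_{i}" for i in range(4)]          # S = 4
transformations = [f"transform_{i}" for i in range(6)]  # T = 6

# Hypothetical grouping by SLA: two urgent transformations get their own group.
groups = {
    "gold_sla": transformations[:2],
    "standard": transformations[2:],
    "hops": sources,
}

print(plain_spread(sources, transformations))  # {'L': 10, 'R': 10}
print(thread_groups(groups))                   # {'L': 3, 'R': 3}
```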

    Dispatchers 

    In some cases, it may be beneficial to separate the listening and processing stages of a solution. This approach can bring benefits such as improved performance and scalability. However, there is also an overhead associated with starting new processes if they are not organized in pools.

    Separating listening and processing involves setting up a dedicated listener to receive incoming requests or data. This listener then passes the data on to a pool of processing threads that are responsible for carrying out the necessary transformations. By separating these two stages, it is possible to increase efficiency and improve the ability to handle high volumes of requests or data.

    However, setting up separate listening and processing stages can also be complex, and requires careful consideration of factors such as thread management and resource allocation. It is important to ensure that processes are efficiently organized and managed, to avoid unnecessary overheads and delays. By doing so, it is possible to achieve a high level of performance and scalability while also maintaining efficient resource utilization.
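
    The separation of listening from processing can be sketched with one listener thread and a pool of workers. This is a simplified in-process illustration using Python's standard library, not the actual implementation. `run_dispatcher`, `signals`, and `transform` are hypothetical names.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def run_dispatcher(signals: Iterable[Any],
                   transform: Callable[[Any], Any],
                   workers: int = 4) -> List[Any]:
    """One listener hands incoming signals to a pool of processing threads."""
    inbox: "queue.Queue[Any]" = queue.Queue()
    results: List[Any] = []
    results_lock = threading.Lock()
    stop = object()  # sentinel that tells the listener to finish

    def process(signal: Any) -> None:
        outcome = transform(signal)
        with results_lock:
            results.append(outcome)

    def listen(pool: ThreadPoolExecutor) -> None:
        # The listener only dispatches; it never runs a transformation itself.
        while True:
            signal = inbox.get()
            if signal is stop:
                break
            pool.submit(process, signal)

    # Reusing pooled threads avoids the cost of starting a process per signal.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        listener = threading.Thread(target=listen, args=(pool,))
        listener.start()
        for signal in signals:
            inbox.put(signal)
        inbox.put(stop)
        listener.join()
    # Leaving the `with` block waits for all submitted work to finish.
    return results


print(sorted(run_dispatcher(range(5), lambda x: x * 2)))  # [0, 2, 4, 6, 8]
```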

    They are multifaceted

    Another difference between Lager-wise solutions is that a single transformation may appear in multiple facets. This means that the same transformation query is used in different ways, with different drivers (leading tables) and plans. As a result, more extensive indexing is required, as every transformation in each facet must be supported for row-level access by relevant indexes.

    To achieve this, indexing at the start should be bi-directional. This means that when one column is indexed to realize a join in one direction, the column on the other side must also be indexed. In many cases, this involves indexing primary keys and foreign keys, but not all queries follow standard references.

    To illustrate this, consider the following picture which shows access paths for the facets of a single transformation (the same one used in the previous article as an example). This approach allows for more efficient querying and can help to improve overall system performance.

    They are concurrent

    In previous examples, I have demonstrated transformations working on static data-sets. However, when processing data from a queue in micro-batches, new data appears in the queue and we cannot block it. We must know what part of the queue has been processed to remove it from the queue.

    There are several options available:

    1. Standard queue – We can use proven queue solutions, but they often support many other roles, making them a heavier solution than what we need, and performance limits can easily be reached.
    2. Bookmark – Bookmarking the batch start is a good solution, but in a world of concurrent writing transactions in read-committed isolation, we cannot rely on sequence or timestamp. Other isolation levels are not viable options for this task.
    3. Flagging – We can set flags on records and decide which records will be part of the micro-batch. Unfortunately, setting flags is an expensive operation for our purposes.
    4. Statement isolation – We can use statement isolation – read-use-delete in one statement (possibly using a memory structure to assist). However, this requires a homogeneous environment.
    5. In-memory indexing – This is a less expensive method of flagging. It is relatively flexible and allows us to limit the maximum bulk size.

    It is essential to ensure transaction safety to maintain system consistency.
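
    The idea behind the in-memory indexing option can be illustrated with a toy queue: the batch is fixed by remembering which IDs it contains, and only those IDs are removed afterwards, so rows that arrive during processing stay queued. This is an in-process analogue under simplifying assumptions; the class `HopQueue` and its methods are hypothetical, and the database-side transaction safety mentioned above is not modeled.

```python
import threading
from typing import Callable, Dict, List


class HopQueue:
    """Toy queue of rows keyed by id; writers may add rows at any time."""

    def __init__(self) -> None:
        self._rows: Dict[int, str] = {}
        self._lock = threading.Lock()
        self._next_id = 0

    def put(self, payload: str) -> int:
        with self._lock:
            self._next_id += 1
            self._rows[self._next_id] = payload
            return self._next_id

    def snapshot_ids(self, max_batch: int) -> List[int]:
        """In-memory index of the batch: at most max_batch ids present now."""
        with self._lock:
            return list(self._rows)[:max_batch]

    def read(self, ids: List[int]) -> List[str]:
        with self._lock:
            return [self._rows[i] for i in ids]

    def delete(self, ids: List[int]) -> None:
        with self._lock:
            for i in ids:
                self._rows.pop(i, None)

    def __len__(self) -> int:
        with self._lock:
            return len(self._rows)


def process_micro_batch(q: HopQueue,
                        handle: Callable[[List[str]], None],
                        max_batch: int) -> int:
    ids = q.snapshot_ids(max_batch)  # fixes the batch and caps its size
    handle(q.read(ids))
    q.delete(ids)                    # removes only what was processed
    return len(ids)


q = HopQueue()
for payload in "abcde":
    q.put(payload)

seen: List[str] = []


def handle(rows: List[str]) -> None:
    seen.extend(rows)
    q.put("late")  # simulates a row arriving while the batch is running


print(process_micro_batch(q, handle, max_batch=3))  # 3
print(seen, len(q))                                 # ['a', 'b', 'c'] 3
```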

    Limitations of bulks

    The maximal bulk size is limited for several reasons, including bulk processing in the database. In previous graphs, we observed that bulk processes remain stable for large numbers of records but exhibit slow growth characteristics for significantly larger quantities.

    1. During bulk processing, the HASH JOIN method is used extensively. Although it is highly efficient, it has limits when dealing with extremely large numbers of records, such as multiples of 10 million.
    2. The maximal size of in-memory bulk indexes should be limited because of the potential overhead when processing large bulks.
    3. Long-running and excessively large bulks may increase rollback and repeat times during fail-over, impacting process continuity.
    4. Chained processes benefit from smaller bulk sizes, which allow earlier start times and reduce overall latency.

    The ideal bulk limit size varies based on the specific process requirements and can be set to several million by default, with further refinements made through pattern directives if needed.

    Although not perfect, the Lager has a smoother and more elastic taste. We invite you to try it at your local pub.
