Always Available Distributed Inventory Solution Using DataStax

This is intended as a follow-up to a blog post from last year by Matt Stump on building a scalable inventory system with DataStax Enterprise (DSE). This enhancement covers temporary holds on inventory, real-time inventory transactions, and risk management and best practices for inventory management systems built on DSE.

Use Case

Retail operations are increasingly integrating online and brick-and-mortar operations, including inventory management. With options such as ‘buy online and pick up in store’ becoming popular, a scalable, distributed, always available, real-time inventory system is needed. Prior architectures use a queuing mechanism to handle potential concurrency and reduce the need for Paxos-based lightweight transactions (LWT). Here we use a simple write for each transaction, together with the concept of a ‘low water mark’ to judiciously control the use of lightweight transactions. We also present an option that avoids lightweight transactions altogether. Finally, we introduce the concept of ‘inventory event compaction’ and describe some methods for performing it.

Schema

CREATE TABLE inventory.inventory_by_sku_location (
sku text,
location text,
event_type text,
event_timeuuid timeuuid,
event_delta int,
description text STATIC,
last_compacted timestamp STATIC,
compactor text STATIC,
max_permitted_deltas int STATIC,
lot text STATIC,
low_water int STATIC,
on_hand int STATIC,
PRIMARY KEY (( sku, location ), event_type, event_timeuuid)
) WITH CLUSTERING ORDER BY ( event_type ASC, event_timeuuid DESC );

CREATE TABLE inventory.inventory_by_sku (
sku text,
location text,
description text,
lot text STATIC,
on_hand int,
PRIMARY KEY (( sku ), location)
) WITH CLUSTERING ORDER BY ( location ASC);   
 
CREATE TABLE inventory.inventory_by_lot (
lot text,
sku text,
location text,
lot_description text,
on_hand int,
PRIMARY KEY (( lot ), sku, location)
) WITH CLUSTERING ORDER BY ( sku ASC, location ASC);

The schema is partitioned by a combination of sku and location. This is to prevent the potential of hot spotting a partition when a single sku becomes very popular. This way the transactions are spread across multiple store or inventory locations. This becomes the transactional table for inventory and functions a bit like a ledger.

The schema uses static columns for data that is to be kept at the sku/location level. This can include supplier information, re-order levels, last delivery quantities and dates, etc. Keeping this non-volatile data in the same table does not negatively impact the performance of the inventory transaction system. It allows most inventory data to be queried from a single table, and it provides a place to store information useful in tuning compaction and business processes.

An on_hand integer, a low_water mark integer, a max_permitted_deltas integer, a compactor name, and a last_compacted timestamp are kept as static columns in the inventory table as well. These values are used to decide when lightweight transactions (LWT) are used, which inventory compactor is used, and how often inventory compaction happens.

In this example a timeuuid is used to uniquely identify the delta inventory transactions for the sku/location. This could just as easily be a uuid or another unique identifier provided by another system, such as an order system.

It would also be possible to perform these functions using collections to manage inventory deltas; however collections tend to widen the partition rather than grow the partition in depth, presenting some CQL limitations.

We are also defining additional tables that allow us to query inventory levels by sku and by lot (a collection of skus). These tables have the potential to produce large partitions, so we keep only limited data in them and use them to quickly get inventory totals. Below we discuss options for keeping the inventory levels in these tables up-to-date.

Adding Some Data

The following batch will add a few sku items and store locations for us to use.

BEGIN BATCH
INSERT INTO inventory.inventory_by_sku_location 
(sku, location,low_water,on_hand,description,lot) 
VALUES ('100123-424','13',3,27,'blue widget','13-678868');
INSERT INTO inventory.inventory_by_lot 
(sku, location,on_hand,lot_description,lot) 
VALUES ('100123-424','13',27,'widget','13-678868');
INSERT INTO inventory.inventory_by_sku
(sku, location,on_hand,description,lot) 
VALUES ('100123-424','13',27,'blue widget','13-678868');
INSERT INTO inventory.inventory_by_sku_location 
(sku, location,low_water,on_hand,description,lot) 
VALUES ('100123-423','13',3,18,'red widget','13-678868');
INSERT INTO inventory.inventory_by_lot 
(sku, location,on_hand,lot_description,lot) 
VALUES ('100123-423','13',18,'widget','13-678868');
INSERT INTO inventory.inventory_by_sku
(sku, location,on_hand,description,lot) 
VALUES ('100123-423','13',18,'red widget','13-678868');
INSERT INTO inventory.inventory_by_sku_location 
(sku, location,low_water,on_hand,description,lot) 
VALUES ('100123-422','13',3,12,'green widget','13-678868');
INSERT INTO inventory.inventory_by_lot 
(sku, location,on_hand,lot_description,lot) 
VALUES ('100123-422','13',12,'widget','13-678868');
INSERT INTO inventory.inventory_by_sku 
(sku, location,on_hand,description,lot) 
VALUES ('100123-422','13',12,'green widget','13-678868');
APPLY BATCH;

Consuming Inventory

Consuming inventory with the system is performed with a simple write. When the write is acknowledged, the inventory change has been made effective. The statement below may be used to consume inventory of a sku at a location.

INSERT INTO inventory.inventory_by_sku_location 
          (sku,location,event_timeuuid,event_type,event_delta) 
   VALUES ('100123-424','13',now(),'SOLD',-1);

Temporary holds may be placed in the partition, labeled with the ‘HOLD’ event type, and given a TTL. That way the hold automatically expires if it is not converted. The statement below generates a simple hold on inventory for 15 minutes (a TTL of 900 seconds). Of course, if you wanted to track this hold and convert it to a sale, you would supply your own timeuuid rather than generating one with now().

INSERT INTO inventory.inventory_by_sku_location 
          (sku,location,event_timeuuid,event_type,event_delta) 
   VALUES ('100123-424','13',now(),'HOLD',-1) USING TTL 900;
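
As a rough illustration, the remaining lifetime of open holds can be inspected with the CQL TTL() function. This is only a sketch against the schema above. It reads the TTL of event_delta, which is the one regular column written by the hold insert, and the seconds_left alias is just an illustrative name.

```cql
-- List open holds for one sku/location with the seconds left before each expires.
SELECT event_timeuuid, event_delta, TTL(event_delta) AS seconds_left
FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13' AND event_type = 'HOLD';
```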

To convert a hold to a sale, a batch like the following may be used. It replaces the HOLD row with a SOLD row that carries the same timeuuid, and it generates a tombstone for the deleted hold.

BEGIN BATCH
DELETE FROM inventory.inventory_by_sku_location 
WHERE sku = '100123-424' 
AND location = '13' 
AND event_type = 'HOLD' 
AND event_timeuuid = d0107e45-987b-11e6-acd4-93fbc8003c08;
INSERT INTO inventory.inventory_by_sku_location 
          (sku,location,event_timeuuid,event_type,event_delta) 
   VALUES ('100123-424','13',d0107e45-987b-11e6-acd4-93fbc8003c08,'SOLD',-1);
APPLY BATCH;

 

Checking Inventory Levels

This inventory methodology allows for several different inventory check types. The simplest is the on_hand integer stored at the partition level as a static column. However, we know that this number will not always reflect the most recent transactions, so some additional information is needed to get an accurate number.

Additionally, hold items present a complication because these items are technically a part of inventory but reserved. Potentially, other categories of inventory like hold items may be conceived of and managed using this methodology.

To get a current total of items available for sale right now, the following queries could be used:

By sku, location

This uses the aggregate function available in DSE 5.0 and greater to sum up the delta transactions and is still a single partition read for the available inventory total for a particular sku/location. This is a real-time operation, and a fairly efficient read to gather current on-hand totals from the ledger.

select on_hand, sum(event_delta) as used from inventory.inventory_by_sku_location
WHERE sku='100123-424' AND location = '13' ;

In this model, we are primarily tracking inventory by sku/location to prevent large and hot partitions in the ledger. Getting the various types of inventory totals by sku/location is therefore quite simple and can be accomplished using single-partition reads.
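
Because consumption deltas are stored as negative numbers, the application would add the summed deltas to on_hand to get the quantity available. Where held and sold quantities need to be reported separately, one possible approach is to restrict the same single-partition read by event_type, as sketched below. The held and sold aliases are illustrative names only.

```cql
-- Units currently reserved by unexpired holds, as a total of negative deltas.
SELECT sum(event_delta) AS held
FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13' AND event_type = 'HOLD';

-- Units sold since the last inventory compaction, as a total of negative deltas.
SELECT sum(event_delta) AS sold
FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13' AND event_type = 'SOLD';
```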

By lot, or lot/sku

Querying by lot or by sku is also accomplished using an aggregate function on a single-partition read. However, these totals may be slightly delayed with this methodology. The delay depends on the inventory compaction methodology used and on how the compaction queues are managed.

select sum(on_hand) AS on_hand from inventory.inventory_by_lot WHERE lot='13-678868';

By sku

select sku, description, sum(on_hand) AS on_hand from inventory.inventory_by_sku WHERE sku='100123-423';

Consuming Inventory below the ‘low water mark’

Since our normal method of consuming inventory does not take current availability into account, some logic must be implemented at the application or service level to enforce availability constraints against the current inventory, or to handle any backorder process. We can gather all the information we need to decide how to process the inventory transaction when we select the current inventory levels for a sku/location. A variety of options are available, which might include:

Simple inventory use with no validation

A simple insert of use will deduct inventory from a sku/location immediately. If there are plenty on hand and the sales frequency is not too high, this may be the most appropriate and efficient method.

Simple Inventory use with Validation

A delta insert into the sku/location may be used, followed up by an inventory query for the sku/location that ensures that the transaction did not result in a negative inventory level. If the inventory on-hand is negative, then the transaction is removed by deleting the transaction row and the inventory transaction has failed validation. This is an inexpensive method and may be appropriate for most situations. Very high demand and low availability items may require additional logic at the application level to properly use low levels of inventory.
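
The insert, check, and rollback sequence might look roughly like the following. This is a sketch rather than a definitive implementation: the timeuuid is an illustrative value that the application would generate itself so that it can delete the row later, and the comparison in step 3 happens in application code. It also assumes the write and the read use a consistency level, such as LOCAL_QUORUM for both, that lets the read see the preceding write.

```cql
-- 1. Record the sale with an application-supplied timeuuid (illustrative value).
INSERT INTO inventory.inventory_by_sku_location
  (sku, location, event_timeuuid, event_type, event_delta)
VALUES ('100123-424', '13', d0107e45-987b-11e6-acd4-93fbc8003c08, 'SOLD', -1);

-- 2. Re-read the partition; the application computes on_hand + used.
SELECT on_hand, sum(event_delta) AS used
FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13';

-- 3. Only if on_hand + used is negative: back the sale out again.
DELETE FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13'
  AND event_type = 'SOLD'
  AND event_timeuuid = d0107e45-987b-11e6-acd4-93fbc8003c08;
```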

Inventory use with a lightweight transaction

This option uses an ‘IF’ clause in CQL so that the transaction is applied only if the inventory level is what was expected. Lightweight transactions are expensive and should generally be avoided, which is the purpose of the low_water setting kept as a static column with the sku/location. If fewer items than the low water mark are available, perform a lightweight transaction; otherwise just consume the inventory, trusting that there is enough. This can even be combined with a post-transaction inventory check to ensure that the available level did not go negative.
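
One way such a conditional write could look is sketched below. It is only one interpretation of the approach: it assumes the application has just read on_hand = 2 for this partition, below the low_water of 3, with no uncompacted deltas outstanding, and it decrements the static on_hand directly instead of writing a delta row so that the unit is not counted twice.

```cql
-- Assumed state read earlier: on_hand = 2, low_water = 3, no uncompacted deltas.
-- Decrement on_hand only if no other writer has changed it in the meantime.
UPDATE inventory.inventory_by_sku_location
SET on_hand = 1
WHERE sku = '100123-422' AND location = '13'
IF on_hand = 2;
```

The result includes an [applied] column. When it is false, the application would re-read the partition and either retry or reject the sale.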

Triggering and Managing Inventory Compactions

The easiest way to trigger inventory compaction is on an inventory event. Rules may be applied, such as: if on_hand plus the sum of the (negative) inventory transaction deltas is less than low_water, trigger compaction. Or, if it has been more than an hour since the last compaction, trigger inventory compaction. Another option is to use the number of delta transactions as the trigger point for inventory compaction.
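
The inputs for all three rules can be fetched in one single-partition read, roughly as follows. The rules themselves would be evaluated in application code. Treating max_permitted_deltas as the threshold for the delta count is an assumption about how that column is meant to be used, and the delta_count and delta_sum aliases are illustrative.

```cql
-- Static settings plus the count and sum of outstanding deltas for one sku/location.
SELECT on_hand, low_water, max_permitted_deltas, last_compacted,
       count(event_delta) AS delta_count,
       sum(event_delta)   AS delta_sum
FROM inventory.inventory_by_sku_location
WHERE sku = '100123-424' AND location = '13';
```

The application might then trigger compaction when on_hand + delta_sum falls below low_water, when last_compacted is more than an hour old, or when delta_count exceeds max_permitted_deltas.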

Compaction processes can also follow rules as they run. For example, there may be a reason not to allow a row to compact away. Perhaps there is a period of time during which the sale can be rolled back.

The purpose of inventory compaction is to coalesce as many of the delta transactions as possible into a single on_hand number for the sku/location. Once triggered, this can be accomplished in a variety of ways, and may be customized to take into account a wide variety of use case specifics. The most important consideration is the potential for concurrent transactions. This may be handled either by using lightweight transactions, or by ensuring that a partition’s inventory deltas are compacted serially by a single agent. We will look at both options here.
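
A compaction commit for one partition might look roughly like the batch below. It is a sketch under stated assumptions: the compactor has read on_hand = 27 and two SOLD deltas of -1 each, the timeuuids and the last_compacted timestamp are illustrative values, and only the rows that were actually read are deleted so that deltas arriving during compaction are left in place.

```cql
BEGIN BATCH
  -- Fold the two compacted deltas (-1 and -1) into the static on_hand: 27 -> 25.
  UPDATE inventory.inventory_by_sku_location
  SET on_hand = 25, last_compacted = '2016-10-22 18:00:00+0000'
  WHERE sku = '100123-424' AND location = '13';
  -- Remove exactly the delta rows that were folded in.
  DELETE FROM inventory.inventory_by_sku_location
  WHERE sku = '100123-424' AND location = '13'
    AND event_type = 'SOLD'
    AND event_timeuuid = d0107e45-987b-11e6-acd4-93fbc8003c08;
  DELETE FROM inventory.inventory_by_sku_location
  WHERE sku = '100123-424' AND location = '13'
    AND event_type = 'SOLD'
    AND event_timeuuid = d0107e46-987b-11e6-acd4-93fbc8003c08;
APPLY BATCH;

-- Afterwards, refresh the lookup tables with the compacted total.
UPDATE inventory.inventory_by_sku SET on_hand = 25
WHERE sku = '100123-424' AND location = '13';
UPDATE inventory.inventory_by_lot SET on_hand = 25
WHERE lot = '13-678868' AND sku = '100123-424' AND location = '13';
```

Because the batch touches a single partition, the on_hand update and the deletes are applied together. Where several agents may compact the same partition concurrently, appending IF on_hand = 27 to the UPDATE would turn the batch into a lightweight transaction.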

The easiest compaction method is to have the application hosts trigger a compaction that is executed on the application host itself in real time. In this method, an inventory event triggers inventory compaction, which is then executed immediately by the application logic. The downside is that multiple application nodes may trigger simultaneous inventory compaction events, especially on actively selling partitions. This requires the use of lightweight transactions (LWT) in the commit batch of the compaction, as well as logic to handle contention and retry attempts. Additionally, it places a somewhat unpredictable load on the application servers, making them more challenging to manage and scale. Since the application servers can access any partition at any time, there is little that can be done in this methodology to prioritize compaction by sku or location, and it is also challenging to dial back undesired inventory delta compaction in real time.

Another option is to add the sku/location to a compaction queue when compaction is triggered by an application host. The queue can either be in a Cassandra table or be managed by another technology such as Kafka. A number of queues may be used, each serviced by a specific compactor. The number will depend on the compaction desired as well as on inventory platform utilization. This allows the compaction function to be separated from the real-time inventory transaction functions, and to be managed more intentionally. It also allows partitions to be compacted serially by a single actor, thus eliminating the need for lightweight transactions in the compaction process. A hash of the sku/location may be used to determine the preferred compactor for that sku/location. By storing a specific compactor in the sku/location partition as a static column, the default compactor hash may be overridden for specific active partitions that you want to handle in a specific or customized way. This also prevents hot partitions from clogging up the standard compaction queues.
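
If the queue is kept in a Cassandra table, it could be modeled along these lines. The table name compaction_queue, its columns, and the compactor names are all hypothetical and are used here only to illustrate the idea of one queue per compactor plus a per-partition override.

```cql
-- Hypothetical queue table: one partition per compactor, ordered by enqueue time.
CREATE TABLE inventory.compaction_queue (
  compactor text,
  queued_at timeuuid,
  sku text,
  location text,
  PRIMARY KEY ((compactor), queued_at)
) WITH CLUSTERING ORDER BY (queued_at ASC);

-- An application host enqueues a sku/location for the compactor chosen by the hash.
INSERT INTO inventory.compaction_queue (compactor, queued_at, sku, location)
VALUES ('compactor-03', now(), '100123-424', '13');

-- Pin a hot partition to a dedicated compactor, overriding the hash default.
UPDATE inventory.inventory_by_sku_location
SET compactor = 'compactor-hot-01'
WHERE sku = '100123-424' AND location = '13';
```

An application host would read the compactor static column first and fall back to the hash-based choice when it is not set.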

Dedicated compactors can be standalone code that connects to the DSE cluster, such as a Java application, or something like Spark. Spark is a very attractive option as it is included with DSE and provides efficient processing as well as data locality advantages. Spark Streaming jobs sharing a context across the datacenter nodes can process inbound queues in near-real time, allowing for a fast inventory compaction option.

Considerations

This design will produce tombstones in DSE on inventory compaction events. These must be managed using appropriate compaction strategies in the table definitions. The number of SSTables per read should be monitored and managed to ensure low read latencies. Using DSE for the inventory delta compaction queues can also generate tombstones, which should be managed in the same way. For smaller queues, DSE’s in-memory-only compaction option is a reasonable way to prevent the inventory delta compaction queues from impacting system performance. For larger queues, as well as to manage back-pressure, Kafka topics are a good option for inventory compaction queues.
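
As an illustration of setting a compaction strategy in a table definition, the ledger table could be altered as shown below. The choice of LeveledCompactionStrategy and the gc_grace_seconds value are assumptions made for this sketch, not recommendations from the original design. Both should be validated against the actual workload, and gc_grace_seconds should stay longer than the interval between repairs.

```cql
-- Assumed settings for illustration only; tune for the real workload.
ALTER TABLE inventory.inventory_by_sku_location
WITH compaction = {'class': 'LeveledCompactionStrategy'}
AND gc_grace_seconds = 86400;
```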
