Bulk Update of Data in CockroachDB

Working with customers always provides a large number of topics to discuss; however, in recent weeks the same scenario has surfaced several times: "I have a very large table and I want to update a large portion of its records. I wrote an UPDATE SQL statement, but it never finishes."


So how does this situation come about?  There are three main scenarios that have repeatedly led to the need to bulk update data within CockroachDB.

  • Data from a legacy datastore has been imported into CockroachDB without any sanitization.  Ideally, some form of ETL workflow would have been used so that only clean data lands in CockroachDB.

  • An application inserts data over a period of time, and that data also hasn't been properly sanitized.

  • A business use case arises that entails updating data in place.  

In all of these cases, once the data is in CockroachDB, something needs to be done to change it in place.


Let's take an example table like the one below.


  table_name |                                                        create_statement
-------------+---------------------------------------------------------------------------------------------------------------------------------
  rides      | CREATE TABLE public.rides (
             |     id UUID NOT NULL,
             |     city VARCHAR NOT NULL,
             |     vehicle_city VARCHAR NULL,
             |     rider_id UUID NULL,
             |     vehicle_id UUID NULL,
             |     start_address VARCHAR NULL,
             |     end_address VARCHAR NULL,
             |     start_time TIMESTAMP NULL,
             |     end_time TIMESTAMP NULL,
             |     revenue DECIMAL(10,2) NULL,
             |     discounted BOOL NULL,
             |     CONSTRAINT rides_pkey PRIMARY KEY (city ASC, id ASC),
             |     CONSTRAINT rides_city_rider_id_fkey FOREIGN KEY (city, rider_id) REFERENCES public.users(city, id),
             |     CONSTRAINT rides_vehicle_city_vehicle_id_fkey FOREIGN KEY (vehicle_city, vehicle_id) REFERENCES public.vehicles(city, id),
             |     INDEX rides_auto_index_fk_city_ref_users (city ASC, rider_id ASC),
             |     INDEX rides_auto_index_fk_vehicle_city_ref_vehicles (vehicle_city ASC, vehicle_id ASC),
             |     CONSTRAINT check_vehicle_city_city CHECK (vehicle_city = city) NOT VALID
             | )


We have a composite primary key on (city, id) and a number of data fields.  In this situation we have 450M rows in the table, and we know that there are approximately 150M rows where we want to update the revenue value.


An initial thought would be to do a one-liner, like this one:


SQL> update rides set revenue=revenue*.9 where true;


This will update every row in this table.  But two thirds of those records don't need to be updated.  To be a little more specific, we can change it to this:


SQL> update rides set revenue=revenue*.9 where discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23;


This query only updates the rows whose start_time falls in December, after the 23rd day of the month, and that haven't previously been discounted.  In our example, that is one third of the 450M rows instead of all of them, which results in much less work being done.
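If you want to sanity-check that estimate before writing any update logic, a quick count of the matching rows is useful.  Below is a minimal sketch in Python; it assumes the same DB_URI environment variable and the discounted column used throughout this post, and it reads a few seconds in the past so the count itself doesn't contend with live traffic.


import os

import psycopg2

conn = psycopg2.connect(os.environ.get('DB_URI'))
with conn:
    with conn.cursor() as cur:
        # Read 5 seconds in the past so the count does not contend with live writes.
        cur.execute("SET TRANSACTION AS OF SYSTEM TIME '-5s'")
        cur.execute("SELECT count(*) FROM rides WHERE discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23")
        print(cur.fetchone()[0])
conn.close()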


But with a large table such as this, trying to update many millions of rows at once will cause contention and a large number of write intents, leading to very high CPU utilization.  This is because CockroachDB operates at the SERIALIZABLE isolation level.  Serializable isolation prevents any form of inconsistent results: the whole statement either succeeds or fails, and if another write operation tries to modify the same data during our transaction, one of the transactions must be retried or aborted.  An expensive query such as the UPDATE above will take a non-trivial amount of time to complete, and that long wall-clock time increases the potential for contention with other queries running during that span.  When contention does occur, CockroachDB will attempt to retry the statement automatically, but if there is a lot of other activity on this table we may never complete a full sweep through it to update the appropriate records.
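For context, when CockroachDB cannot retry a contended statement on our behalf, the client sees a retryable error with SQLSTATE 40001.  A minimal, hypothetical wrapper for retrying a statement from the application might look like the sketch below; the function name, backoff values, and retry limit are purely illustrative and are not part of the example code later in this post.


import time

from psycopg2 import errors


def run_with_retry(conn, stmt, params=None, max_retries=5):
    """Run one statement in its own transaction, retrying on SQLSTATE 40001."""
    for attempt in range(max_retries):
        try:
            with conn:                      # commit on success, roll back on error
                with conn.cursor() as cur:
                    cur.execute(stmt, params)
                    return cur.statusmessage
        except errors.SerializationFailure:
            # Another transaction touched the same rows; back off and try again.
            time.sleep(0.1 * (2 ** attempt))
    raise RuntimeError("statement kept hitting retry errors")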


If we treat this effort as a bulk operation, our best chance of updating millions of rows is to make each UPDATE statement take less time.  A way to do that is to slice up the rows in the key-space that we want to update and process them in smaller increments.  As the old saying goes, "How do you eat an elephant?  One bite at a time."  The UPDATE statements above try to eat the whole elephant in one gulp.  Let's apply some additional logic to take multiple bites out of this elephant.  Below is some Python code which we'll go through in detail.


#!/usr/bin/env python3

import os
import time

import psycopg2


def main():
    conn = psycopg2.connect(os.environ.get('DB_URI'))
    # lastid tracks the last primary key we have already processed, so each
    # pass over the table can skip rows we have previously considered.
    lastid = None

    while True:
        # Read a window of candidate primary keys, slightly in the past so the
        # read does not contend with live writes.
        with conn:
            with conn.cursor() as cur:
                cur.execute("SET TRANSACTION AS OF SYSTEM TIME '-5s'")
                if lastid:
                    cur.execute("SELECT id FROM rides WHERE id > %s AND discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23 ORDER BY id LIMIT 10000", (lastid,))
                else:
                    cur.execute("SELECT id FROM rides WHERE discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23 ORDER BY id LIMIT 10000")
                pkvals = [row[0] for row in cur]

        if not pkvals:
            break

        # Update the candidate rows 2,000 at a time, each batch in its own
        # transaction.
        while pkvals:
            batch = pkvals[:2000]
            pkvals = pkvals[2000:]
            with conn:
                with conn.cursor() as cur:
                    cur.execute("UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY(%s::UUID[]) RETURNING id", (batch,))
                    print(cur.statusmessage)
                    if not pkvals:
                        # Remember the last id we touched; the next SELECT
                        # window starts just after it.
                        lastid = max(row[0] for row in cur.fetchall())

        time.sleep(5)

    conn.close()


if __name__ == '__main__':
    main()


We'll skip over the imports and begin by looking at the main function.


    conn = psycopg2.connect(os.environ.get('DB_URI'))

    lastid = None


The first line sets up the connection to our database.  The second defines lastid as None.  We need to keep track of the last record in the table that we have inspected, and on our first pass through there are no previously considered ids.  Tracking this value means each subsequent pass through the table, looking for records to update, doesn't have to re-inspect millions of rows that we have already considered and potentially updated.


    while True:
        with conn:
            with conn.cursor() as cur:
                cur.execute("SET TRANSACTION AS OF SYSTEM TIME '-5s'")
                if lastid:
                    cur.execute("SELECT id FROM rides WHERE id > %s AND discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23 ORDER BY id LIMIT 10000", (lastid,))
                else:
                    cur.execute("SELECT id FROM rides WHERE discounted != true AND extract('month', start_time) = 12 AND extract('day', start_time) > 23 ORDER BY id LIMIT 10000")
                pkvals = [row[0] for row in cur]

        if not pkvals:
            break


This chunk of code starts a loop that executes a query to extract primary keys from the rides table.  The query starts from the value in our lastid variable and limits the output to 10k rows.  It is important to note that the query is ordered by our primary key column.  We do this ordering to make sure we walk sequentially through the key-space.  Since CockroachDB stores rows in an underlying key-value store where the primary key forms the key, this lets us efficiently and sequentially walk through the key-space for our table.


There are two SQL statements in this code block, and both are preceded by cur.execute("SET TRANSACTION AS OF SYSTEM TIME '-5s'").  This tells CockroachDB to execute the subsequent SELECT as of five seconds in the past, which lets us avoid contention with write operations occurring at the same time while we inspect rows to be updated.  The first SELECT is executed when a lastid exists (that is, when it is not our first pass through the loop); the second is executed when lastid is None.  After executing either, we build a list of the returned ids called pkvals.  This list now contains up to 10k ids for rows that match our criteria: the start_time falls in December, the day of the month is greater than 23, and the row wasn't previously discounted in some manner.  Finally, if pkvals is empty, we have finished walking through the key-space for this table, there is nothing further to update, and we break out of the loop.


        while pkvals:
            batch = pkvals[:2000]
            pkvals = pkvals[2000:]
            with conn:
                with conn.cursor() as cur:
                    cur.execute("UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY(%s::UUID[]) RETURNING id", (batch,))
                    print(cur.statusmessage)
                    if not pkvals:
                        lastid = max(row[0] for row in cur.fetchall())

        time.sleep(5)


Once we do have some ids in the pkvals list, we walk through the list in order to update those records.  We define batch to be the first 2,000 entries (indexes 0 through 1999) of pkvals, and then redefine pkvals to be everything from entry 2,000 onward, removing the rows we are about to update from the candidate list.


We then execute an UPDATE that marks each record as discounted and sets the revenue field to 90% of its previous value.  We do this for the rows identified by the ids in the batch variable.  We also use a RETURNING clause so the statement hands back the ids it touched; once we've exhausted all of the ids in pkvals, we record the largest id we updated and use it as the starting point for the SELECT on our next pass through the data.


Putting this all together, we have now taken a multi-million row table and sliced it up into 10k-row chunks for consideration.  Once we identify rows to be updated, we slice each 10k chunk further and update 2k rows at a time.  Our multi-million row elephant is now being eaten in small, manageable bites.  Each UPDATE works on 2k rows at a time instead of millions, which is much, much quicker, massively shrinking the wall-clock time a query runs and thus reducing contention.


If in our workload we find that the table is so busy that updating 2k rows at once still runs into an unacceptable number of retries, we can shrink the update batches to 1k rows, or 500, or 100, as sketched below.
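One way to make that adjustment easy is to factor the slicing out into a small helper so the batch size is a single argument rather than two hard-coded slice bounds.  This is a sketch, not part of the code above, and chunks() is a hypothetical helper name.


def chunks(ids, size):
    """Yield successive slices of `ids`, each at most `size` entries long."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]

# In main(), the inner loop could then become, for example:
#     for batch in chunks(pkvals, 500):
#         ... run the UPDATE for this batch ...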


Multi-threading?


Our example code above runs as a single thread.  Operating on a 450M-row table, it will take a considerable amount of time to work through all of these loops.  One way to speed that up is to employ multi-threading.  A simplistic approach would be to create 16 different threads, each working on ids that begin with a unique character (the text form of a UUID is hexadecimal, so the leading character is one of 0-9 or a-f).  The first thread would operate only on ids beginning with '0', the second on ids beginning with '1', and the sixteenth on ids beginning with 'f'.  The SELECT query that gathers up 10k rows to be updated would have an additional predicate added to its WHERE clause restricting it to ids that begin with whichever character is assigned to that thread, as in the sketch below.
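Here is a rough sketch of that idea, assuming the single-threaded loop above has been refactored into a function that accepts a leading character and adds a predicate such as AND left(id::STRING, 1) = %s to its SELECT statements.  The process_prefix() body is a placeholder, and each thread should open its own database connection.


from concurrent.futures import ThreadPoolExecutor

HEX_PREFIXES = '0123456789abcdef'   # UUID text form is hex, so 16 possible leading characters


def process_prefix(prefix):
    # Placeholder: run the paging/update loop from main(), restricted to ids
    # whose string form starts with `prefix`, using a dedicated connection.
    pass


def run_all():
    with ThreadPoolExecutor(max_workers=len(HEX_PREFIXES)) as pool:
        futures = [pool.submit(process_prefix, p) for p in HEX_PREFIXES]
        for f in futures:
            f.result()   # re-raise any exception from a worker thread


if __name__ == '__main__':
    run_all()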

