Implementing Firebolt MERGE Statement

September 15, 2025

The MERGE statement is a powerful SQL command that allows you to simultaneously perform multiple INSERT, UPDATE, and DELETE operations on a single table. It’s a classic PostgreSQL feature that simplifies data synchronization tasks by 

  1. supporting complex conditional business logic, and 
  2. moving the entire data mutation operation into a single ACID transaction, thereby natively protecting against stale reads from frequently updated data sources.

This blog goes deep on what it takes to build a scale-out merge operation that can run over terabytes of data.

MERGE Statement Overview

As an example, suppose you had a large table of students that you wanted to sync once the course registration period completed, making recent enrollment changes publicly visible (e.g. students that transferred in, dropped out, or changed their planned graduation year). You could use MERGE to do so. Here’s a toy example.

-- dated 990 A.D. upon school founding
CREATE TABLE hogwarts_students (
    id INT,
    full_name TEXT,
    dob DATE,
    graduation_date DATE,
    owl_exams ARRAY(TEXT)
);
-- dated 1017 A.D. alongside new privacy and ethical recruitment reforms
CREATE TABLE public_hogwarts_students (
    id INT,
    full_name TEXT,
    graduation_date DATE
);
-- running the sync from the staged source data to the public target table
MERGE INTO public_hogwarts_students AS t USING hogwarts_students AS s
ON t.id = s.id
WHEN NOT MATCHED THEN INSERT (id, full_name, graduation_date) VALUES (s.id, s.full_name, s.graduation_date)
WHEN MATCHED THEN UPDATE SET full_name = s.full_name, graduation_date = s.graduation_date
WHEN NOT MATCHED BY SOURCE THEN DELETE;

The MERGE statement works by comparing rows from a source (which can be a table, view, or subquery) with rows in a target table. Based on whether rows match or not, you can specify different actions:

  • WHEN MATCHED: This clause defines actions to take when a row in the source matches a row in the target. You can specify UPDATE or DELETE operations here.
  • WHEN NOT MATCHED (e.g. WHEN NOT MATCHED BY TARGET): This clause defines actions to take when a row in the source does not have a corresponding match in the target. It is used for INSERT operations, in order to backfill the target table with rows from the source.
  • WHEN NOT MATCHED BY SOURCE: This clause defines actions to take when a row in the target does not have a corresponding match in the source. This is used to UPDATE or DELETE rows from the target table, to clean up rows that no longer exist in the source.

Row “matching” is evaluated by an explicit JOIN condition. Traditionally, this is done over a set of primary key columns. However, any subset of columns from the source and target tables can be used, as can arbitrary transformations of those columns.
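
For instance, here is a sketch (reusing the toy tables above, with a deliberately arbitrary condition) of a match that goes beyond plain key equality:

-- Case-insensitive name match instead of an id match
MERGE INTO public_hogwarts_students AS t USING hogwarts_students AS s
ON LOWER(t.full_name) = LOWER(s.full_name)
WHEN MATCHED THEN UPDATE SET graduation_date = s.graduation_date;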

You can find common use cases and examples of the syntax in our documentation. For the remainder of this blog, however, we proceed with the low-level details of how we implemented MERGE.

What MERGE Requires Beyond Existing DML

At a high level, MERGE needed to wrap together functionality that we already had. We already knew how to modify tablets using the logical representations of INSERT, UPDATE, and DELETE requests (physical tablet management is explained below). We already knew how to transparently modify aggregating indexes, alongside any affected table (these are the consistent materialized views that the planner uses to optimize aggregation queries: documentation). We already knew, even, how to run multiple DML operations under a single transaction, since the UPDATE command reduces to a DELETE and an INSERT op. And, quite significantly, we already had battle-tested distributed execution of massive joins.
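
For contrast, here is a rough sketch of the hand-rolled sequence that the earlier student-sync MERGE replaces. Unlike MERGE, it repeats the join work three times and is not a single atomic statement (PostgreSQL-style UPDATE ... FROM is shown purely for illustration):

-- Backfill students that are new to the public table
INSERT INTO public_hogwarts_students (id, full_name, graduation_date)
SELECT s.id, s.full_name, s.graduation_date
FROM hogwarts_students s
WHERE s.id NOT IN (SELECT id FROM public_hogwarts_students);

-- Overwrite students that exist in both tables
UPDATE public_hogwarts_students AS t
SET full_name = s.full_name, graduation_date = s.graduation_date
FROM hogwarts_students s
WHERE t.id = s.id;

-- Clean up students that no longer exist in the source
DELETE FROM public_hogwarts_students
WHERE id NOT IN (SELECT id FROM hogwarts_students);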

So what was missing? Mainly, adding the parsing/planning layer to translate a user’s MERGE query into our proven, scalable building blocks. While not breaking any of the existing code paths 😂. Let’s dig into that translation.

Ordering User-Provided Clauses

While these terms are personal colloquialisms, they are useful for describing nuances of the MERGE syntax that have to make it into the implementation.

Match category: A set of clauses within which order matters. E.g. the category of MATCHED clauses, or the category of NOT MATCHED clauses, or the category of NOT MATCHED BY SOURCE clauses. 

Match clause: A tuple of match category, optional conditional, and action. Several unrelated examples: 

-- Delete action with the (default) catch-all condition
WHEN MATCHED THEN DELETE 

-- Update action with a data transformation
-- Note the nontrivial condition
WHEN MATCHED AND t.cond > 5 THEN UPDATE SET foo = s.foo + 1

-- A different match category, with a no-op action 
-- Note that conditions don't have to be meaningful 🤔
WHEN NOT MATCHED and FALSE THEN DO NOTHING

As defined by the syntax, ordering within each category provides short-circuiting (e.g. a row matching an earlier MATCHED clause condition will be sent to that earlier action, and will not be considered for any further MATCHED clauses). So within a single category, the order of clauses that the user defined is important. 

But what if we look across different categories? Conceptually, it cannot be, for example, that a target and source row pair both MATCH and do NOT MATCH BY SOURCE. Match categories act on non-overlapping sets of target-source row pairs. Put another way, given a well-defined JOIN condition, every target-source row pair will fall into exactly one of the match categories, or into zero of them (in case the JOIN condition evaluates to false or NULL). Therefore, clauses from different categories will have no effect on each other’s row sets. This means that it is irrelevant in which order the user defines the different categories, and makes no difference whether the user intermixed clauses across categories.

An aside: If we add to these observations the thoughtfully-limited number of category/action pairings that the MERGE syntax allows, and the errors that are thrown when attempting to update the same row multiple times, we get the following lemma: The MERGE syntax guarantees that each row from the target table will be modified (updated/deleted) at most once and each row from the source will be inserted into the target at most once. It is a nice cap on the maximal number of operations.
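
For example, here is a sketch of the kind of statement that trips that error whenever the source happens to carry duplicate join keys:

-- If two source rows shared the same id, both would try to modify the same
-- target row, and the statement errors out rather than applying both.
MERGE INTO public_hogwarts_students AS t USING hogwarts_students AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET full_name = s.full_name;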

Degenerate No-Op Scenario

Let’s discuss the base case before the inductive case: In the most basic scenario, a MERGE statement represents a DML no-op. It must pass parsing and be registered as a successfully run DML, yet it adds or affects exactly zero rows. While this may be discovered at runtime due to data distribution (e.g. the source and target were already in sync and there was nothing new to handle), there are certain degenerate MERGE queries that represent global no-ops and actually require special handling by the query planner.

These are queries in which every clause action is DO NOTHING. Consider this example, where the syntax is used in a valid though futile way. The user is literally not asking for any actions to be taken:

MERGE INTO target t 
USING source AS s 
ON t.a = s.a
-- single DO NOTHING branch
WHEN MATCHED THEN DO NOTHING;

So how do you translate a DML statement with no requests for action? Instead of defining a runtime primitive for a “non-action”, we chose to catch such cases at the earliest convenience and provide a well-defined DML plan. For simplicity’s sake, it is: INSERT INTO target SELECT * FROM target LIMIT 0. And here’s what the planner simplifies a LIMIT 0 down to: a local read from an empty static table.

The query plan generated:

[0] [TableModify]
 \_[1] [Insert] target table: "target"
    \_[2] [Projection] c_0: NULL, c_1: NULL
       \_[3] [StaticTable] Columns: [][]

Neat! Note that distributing DO NOTHING actions across multiple categories or conditions still hits this degenerate scenario.

MERGE INTO target t 
USING source AS s 
ON t.a = s.a
-- Match category I:
WHEN NOT MATCHED BY SOURCE AND t.name IN ('Harry', 'Ron', 'Hermione') THEN DO NOTHING
-- Match category II:
WHEN NOT MATCHED AND s.color = 'red' THEN DO NOTHING
WHEN NOT MATCHED AND s.iteration > 10 THEN DO NOTHING;

As an aside, you can also get “zero effect” queries via always-false conditionals. However, it is not the parser’s job to catch those. So the following plan will still have a DELETE node in it, but note that the planner has collapsed any real scan of the source and target tables into, once again, a local “read” from an empty static table.

The query:

MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED BY SOURCE AND 7 < 5 THEN DELETE;

The plan:

[0] [TableModify]
 \_[1] [Delete] target table: "target", tablet_row_number_column: 0, tablet_txid_column: 1, tablet_id_column: 2, source_node_id_column: 3
    \_[2] [Projection] c_0: 0, c_1: '', c_2: '', c_3: 0
       \_[3] [StaticTable] Columns: [][]

Single Node Query Plan

So what does the query plan look like for a MERGE statement with at least one interesting action? Let’s try an INSERT action, where two local tables are simply generated and filled with 10 and 20 rows, respectively.

[EXAMPLE 1] The script:

CREATE TABLE target (a int, b int);
CREATE TABLE source (a int, b int);

-- Populate 10 entries with keys 1,11,21,etc
INSERT INTO target SELECT i, i%17 FROM generate_series(1,100,10) i;

-- populate 20 entries with keys 1,6,11,16,etc
INSERT INTO source SELECT i, i%17 FROM generate_series(1,100,5) i;

-- Note that the source contains all the keys already in the target, and then some (specifically, 10 other rows)

-- Merge in new entries from the source, with an extra conditional based on the mod of the key space, and a transformation of some of the source values.
-- The condition is picked purely for demonstrative purposes. It happens to capture 6 of the 10 possible rows in the target-source row diff.

MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED AND s.b < 11 THEN
	INSERT VALUES (s.a, 100 * s.b);

The executed plan:

[0] [TableModify]
 \_[1] [Insert] target table: "target"
    \_[2] [Projection] source.a, multiply_checked_0: (100 * source.b)
       \_[3] [Filter] ((CASE WHEN (c_0 IS NULL and (source.b < 11)) THEN 0 ELSE NULL END) = 0)
          \_[4] [Projection] c_0, source.a, source.b
             \_[5] [Join] Mode: Left [(source.a = target.a)]
                \_[6] [StoredTable] Name: "source"
                \_[7] [Projection] target.a, c_0: TRUE
                   \_[8] [StoredTable] Name: "target"

What are we looking at? Going bottom up: 

[6, 8] At the bottom are the scans of the source and target tables.

[7] Before we get to joining rows from the source and target tables, we introduce a constant boolean projection onto the target side, mimicking a mark join. Mark joins are an optimization technique that propagates, alongside every source row, the result of some condition in a temporary “mark” column. Sometimes, this row-wise evaluation, passed along to a JOIN, is actually enough to compute some subquery like EXISTS or IN, without continuing to propagate the full data set itself.

Firebolt doesn’t have native support for mark joins, but we can utilize the technique by manually inserting this ‘magic’ constant column. The point is that we are about to compute the set of source rows that do not have matches in the target table, i.e. we really want a LEFT ANTI JOIN. However, our join infra will return the full set of source rows, extended by either the matching target rows or by all NULL’s. To distinguish a NULL produced because “there was no matching target row” from a NULL coming from “a target row consisting of all NULL’s”, we need an extra marker that will be TRUE alongside all target rows (even a completely NULL one), and NULL anytime there is no matching target row.

This constant (dubbed c_0 in the plan) will be filtered against further up; a standalone SQL sketch of the trick follows this walkthrough.

[5] A left side outer join, which will return all rows in source, matched either with a row from target or with all NULL’s.

[4] A pass-through of all the source columns required for insertion, plus the target mark.

[3] A filter that encapsulates the NOT MATCHED BY TARGET clause and the additional condition that we passed. Note the use of c_0 from before, to isolate only source rows without matches. It may look odd that the CASE statement in the filter is returning 0 and NULL, rather than true and false, but given more merge clauses, it will be classifying each row as belonging to clause index 0, 1, 2, etc, up to the number of clauses. More on that in the CASE WHEN section below.

[2] The computation of the transform we requested on the data to-be-inserted.

[0,1] The top level goal: we’re modifying the table target by inserting some rows into it.
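
To see the mark-join trick from step [7] in isolation, here is a standalone SQL sketch (not the planner’s literal output) over the same toy tables. The constant c_0 lets us tell “no matching target row” apart from “a matching target row whose columns are all NULL”:

-- c_0 is TRUE for every target row that joined, and NULL only when no target row matched
SELECT s.a,
       s.b,
       (t.c_0 IS NULL) AS not_matched_by_target
FROM source s
LEFT JOIN (SELECT a, TRUE AS c_0 FROM target) t ON s.a = t.a;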

Ready to dive in deeper?

Choosing the JOIN Type

Plain WHEN MATCHED clauses require simply an INNER JOIN between the source and target tables. Target rows can be updated or deleted based on conditions on the source row values, and only row-pairs with direct matches are candidates for consideration.

In the prior example, the NOT MATCHED BY TARGET clause requests a LEFT OUTER JOIN because it’s looking to operate over source rows (and the JOIN inputs happen to be ordered as source table scan on the left, target table scan on the right).

More generally, NOT MATCHED BY SOURCE clauses will require the flipped RIGHT OUTER JOIN in order to be able to update/delete target rows.

If both NOT MATCHED categories are requested, the JOIN type is expanded into a FULL OUTER JOIN.

In short, the JOIN type is chosen dynamically to be the strictest one possible, given which subset of merge categories is requested.
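
Spelled out on the toy target/source tables (the clause bodies are arbitrary, only the category mix matters):

-- Only MATCHED clauses: an INNER JOIN suffices
MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED THEN DELETE;

-- Only NOT MATCHED (BY TARGET) clauses: LEFT OUTER JOIN, with the source scan on the left
MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b);

-- Only NOT MATCHED BY SOURCE clauses: RIGHT OUTER JOIN
MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED BY SOURCE THEN DELETE;

-- Both NOT MATCHED categories: FULL OUTER JOIN
MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b)
WHEN NOT MATCHED BY SOURCE THEN DELETE;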

CASE WHEN: Stacking Conditions

Let’s look at the CASE statement from that example’s filter step [3].

The MERGE syntax mandates short-circuiting between different match clauses of a single category, and a CASE statement is exactly the primitive to describe that behavior. This is a standard SQL (and PostgreSQL) operator for expressing a series of if-then-else statements. Once a row matches one of the CASE conditions, it is not evaluated against any later conditions. So in the previous example, we assign the value zero to rows matching the INSERT clause condition, and then filter for exactly those rows that were evaluated to zero. Not too complex. However, consider the case where there are multiple match clauses within a category, with different filters and even potentially different actions.

[EXAMPLE 2] Given this query:

MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED AND s.b % 2 = 0 THEN DO NOTHING
WHEN NOT MATCHED AND s.b < 11 THEN INSERT VALUES (s.a, 100 * s.b)
WHEN NOT MATCHED AND s.b >= 11 THEN INSERT VALUES (s.a, 1 + s.b);

We want to route row pairs to four possible outcomes (it is fine for rows to fall through and land in the fourth, default “ELSE do nothing” outcome). At this point, the filter step is elevated up (to sit below each of the INSERT operators), and the CASE statement is pulled out into its own projection step. Note the assignment of a unique index to each of the possible outcomes, including the first DO NOTHING group:

[Projection] 
source.a, source.b, multiIf_0: 
(CASE 
WHEN (c_0 IS NULL and ((source.b % 2) = 0)) THEN 0 
WHEN (c_0 IS NULL and (source.b < 11)) THEN 1 
WHEN (c_0 IS NULL and (source.b >= 11)) THEN 2 
ELSE NULL 
END)

Although previously mentioned as the recipe for “degenerate” merge queries, DO NOTHING clauses are typically quite useful. They are used to take out a subset of rows from consideration. Since match clauses are evaluated in order, having a DO NOTHING clause lifts a conditional from having to be repeated in every later clause.
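
For instance, here is a sketch on the toy tables of how an early DO NOTHING clause spares every later clause from repeating a guard condition:

-- With an early DO NOTHING guard:
MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED AND t.b IS NULL THEN DO NOTHING
WHEN MATCHED AND s.b < 11 THEN UPDATE SET b = s.b
WHEN MATCHED THEN DELETE;

-- Without it, the guard has to be repeated in every later MATCHED clause:
MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED AND t.b IS NOT NULL AND s.b < 11 THEN UPDATE SET b = s.b
WHEN MATCHED AND t.b IS NOT NULL THEN DELETE;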

This overall projection step shows that the result of the CASE is forwarded alongside the source data (dubbed multiIf_0 in the plan). This allows DML nodes further up in the plan to filter on precisely the subset of rows that they are meant to handle.

Loopback Shuffle

So what does the plan look like for [EXAMPLE 2]? With multiple different row subsets getting inserted (each with its own custom filter and transformation), it’s getting a bit thornier to read. The sections that have not changed from before are the topmost INSERT node and the base JOIN.

Observe that we are now working with a DAG-shaped plan.

[0] [TableModify]
 \_[1] [Insert] target table: "target"
    \_[2] [Union]
       \_[3] [Projection] source.a, multiply_checked_0: (100 * source.b)
       |  \_[4] [Filter] (multiIf_0 = 1)
       |     \_[5] [Shuffle] Loopback with disjoint readers
       |        \_[6] [Projection] source.a, source.b, multiIf_0: 
       |           \    (CASE 
       |           |      WHEN (c_0 IS NULL and ((source.b % 2) = 0)) THEN 0 
       |           |      WHEN (c_0 IS NULL and (source.b < 11)) THEN 1 
       |           |      WHEN (c_0 IS NULL and (source.b >= 11)) THEN 2 
       |           |      ELSE NULL 
       |           |    END)
       |           \_[7] [Join] Mode: Left [(source.a = target.a)]
       |              \_[8] [StoredTable] Name: "source"
       |              \_[9] [Projection] target.a, c_0: TRUE
       |                 \_[10] [StoredTable] Name: "target"
       \_[11] [Projection] source.a, add_checked_0: (1 + source.b)
          \_[12] [Filter] (multiIf_0 = 2)
             \_Recurring Node --> [5]

Going bottom up once again, beyond the base join of source and target tables:

[6] is the projection discussed in the CASE WHEN section above, which is classifying every row-pair in terms of which merge clause it should be handled by, and forwarding that classification in the new column dubbed multiIf_0. All the c_0 IS NULL checks are coming from the fact that all three clauses are of the NOT MATCHED BY TARGET category.

[5] Introduces what Firebolt calls a loopback shuffle, a building block that enables multiple operators to reuse the same input stream. Conceptually, this can be viewed as a form of "fork" operator which allows both [4] and [12] to consume the data produced by [6]. In this case specifically, there are two Filters that are going to split the classified stream of row-pairs into sections, and pass only subsets of the stream up further to the Insert node.

Note that the “shuffle” keyword is usually reserved for distributed join or aggregation processing in which nodes need to send data to each other and hence data is “shuffled” across the network. In contrast, the loopback shuffle does not move any data off the node and simply broadcasts data from one local input stream to multiple local output streams. It keeps track of which blocks have been consumed by which output stream, and applies backpressure onto the input stream so that no consumer gets “too far ahead” or falls “too far behind”. (It would lead to a memory usage explosion if one consumer was reading very slowly, whereas another was speeding ahead and asking to load the whole input stream into memory.)

At Firebolt, we use loopback shuffle for other DAG shaped plans, such as join pruning using a filtered primary index key set that is passed sideways from one side of the join to the other. More on this in a separate blog.

So now you can see why and how branches [3,4] and [11,12] are reusing node [5].

[3,4] Is filtering out the subset of row-pairs that match the second WHEN condition, and then preparing the data for insertion: WHEN NOT MATCHED AND s.b < 11 THEN INSERT VALUES (s.a, 100 * s.b). These are the rows that were assigned classification 1 by the CASE statement [6].

[11,12] Is filtering out the subset of row-pairs that match the third WHEN condition, and then preparing the data for insertion: WHEN NOT MATCHED AND s.b >= 11 THEN INSERT VALUES (s.a, 1 + s.b). These are the rows that were assigned classification 2 by the CASE statement [6].

Note that there is no action branch to handle rows matching (multiIf_0 = 0). These would be the rows caught by the first match clause of the query. As a DO NOTHING action, its purpose was precisely to classify some subset of rows for non-action. And so its execution is satisfied by the CASE statement assigning those row-pairs a unique classification which will not get picked up by any of the downstream filters. There is “nothing more” to do for those row-pairs.

[2] The Union operator joins the two transformed streams into a single input for the Insert step. Handing the Insert operator the full data stream will let it best distribute the data across all newly generated tablets, thereby improving tablet quality.

[0,1] Are the same top-level operators as before.

In a nutshell, for table synchronization and data deduplication tasks, the loopback shuffle is the key performance boost that MERGE offers over running multiple separate DML statements: the base join is done once and then reused by all the data mutation branches.

Now, let’s see what the reuse looks like when there’s even more action types requested. But before that, let’s step to the side for a moment to refresh what it actually means to INSERT, UPDATE, or DELETE from a managed (internally persisted) Firebolt table.

Tablets: What INSERT, UPDATE & DELETE affect

At Firebolt, tables are persisted in units of “tablets”, and this is what all DML operations ultimately operate over. A tablet is a collection of data and metadata files that are (for the most part) immutable. As data is ingested into a table, one or more new tablets are written and stored in cloud storage. At the conclusion of the insert transaction, our metadata service attaches those new tablets to the tablet list that defines the table.

The sole caveat to tablet immutability comes in the form of deletion vectors. Either through UPDATE or DELETE, a user may ask to delete some subset of rows from a pre-existing tablet (say, all rows where column foo is divisible by 5). While we could immediately construct a new tablet that has only the rows to-be-kept-alive, that is often overkill, and we punt on that heavy copy operation until a VACUUM op is run. Instead, a deletion vector is attached to the tablet files (conceptually a bitset of which rows are still alive; implementation-wise it is a RoaringBitmap). This bitset is then referenced prior to any future reads of tablet data, ensuring old rows are ignored.

If multiple mutation statements are applied to a single tablet, the previous deletion vector will be read each time, and a new one will be generated to reflect the net sum of the changes. The fact that there is only one trusted deletion vector at any one time actually explains why it’s important to have at most one Delete operator per table per transaction. Just planting the seed here, but this turns out to be the underlying reason for adding the UNION operators to the DML nodes in the MERGE plan in the first place.

To reiterate: other than this mutable deletion bitset concept, all files in a tablet are immutable. And DML operations operate over tablets. INSERT creates new tablets; DELETE marks subsections of a tablet as deleted, or drops tablets entirely; UPDATE applies a DELETE on all rows that are to be changed, and then inserts new tablets with precisely the affected rows and their new values. 

Multiple Action Types in MERGE

Let’s go all in. What if a user requests the classic “please sync my tables” MERGE statement? Insert the new rows, delete the old rows, and overwrite any duplicate rows. In other words, the source table is more up-to-date and we would like to trust it.

In the general MERGE query, of course, there could be multiple match clauses per category, and every clause could have further conditionals. We present the simplest case of one clause per category and zero extra conditionals, purely to demonstrate how the Insert/Delete nodes roll up.

[EXAMPLE 3] Query:

MERGE INTO target t USING source s ON t.a = s.a
WHEN NOT MATCHED BY TARGET THEN INSERT VALUES (s.a, s.b)
WHEN NOT MATCHED BY SOURCE THEN DELETE
WHEN MATCHED THEN UPDATE SET b = s.b;

The executed plan, with the CASE statement extracted for readability like so:

multiIf := 
(CASE
   WHEN (c_1 IS NOT NULL and c_0 IS NOT NULL) THEN 0  -- when matched
   WHEN c_1 IS NULL THEN 1  -- when not matched by target 
   WHEN c_0 IS NULL THEN 2  -- when not matched by source 
   ELSE NULL
END)
[0] [TableModify]
 \_[1] [Delete] target table: "target"
 |  \_[2] [Projection] target.$tablet_row_number, target.$tablet_txid, target.$tablet_id, target.$source_node_id
 |     \_[3] [Union]
 |        \_[4] [Projection]
 |        |  \_[5] [Filter] (multiIf = 0)
 |        |     \_[6] [Projection]
 |        |        \_[7] [Shuffle] Loopback with disjoint readers
 |        |           \_[8] [Join] Mode: Full [(source.a = target.a)]
 |        |              \_[9] [Projection] source.a, source.b, c_0: TRUE
 |        |                 \_[10] [StoredTable] Name: "source"    
 |        |              \_[11] [Projection] target.a, target.$source_node_id, target.$tablet_id, target.$tablet_txid, target.$tablet_row_number, c_1: TRUE
 |        |                 \_[12] [StoredTable] Name: "target"
 |        \_[13] [Projection]
 |           \_[14] [Filter] (multiIf = 2)
 |              \_[15] [Projection]
 |                 \_Recurring Node --> [7]
 \_[16] [Insert] target table: "target"
    \_[17] [Union]
       \_[18] [Projection] target.a, source.b
       |  \_[19] [Filter] (multiIf = 0)
       |     \_[20] [Projection] source.b, c_0, target.a, c_1
       |        \_Recurring Node --> [7]
       \_[21] [Projection] source.a, source.b
          \_[22] [Filter] (multiIf = 1)
             \_[23] [Projection] source.a, source.b, c_0, c_1
                \_Recurring Node --> [7]

Leaving aside the bottom-most Join and the topmost TableModify operator, which are unchanged from before, I’d like to point out the high-level hierarchy.

At the top, multiple types of DML operations (Delete and Insert), are now inputs to a single TableModify node ([1], [16]). This is what is enabling multiple types of data mutation to be run under the same transaction. The final query will succeed only if all children of the TableModify node succeed.

Both the Delete and Insert nodes happen to have multiple row-pair classifications sent to them, and hence are receiving Unions of substreams ([3], [17]).

Why are they both receiving multiple classifications, given the three original match clause actions? Because the subset multiIf = 0, expressing the WHEN MATCHED clause, is an UPDATE action, which splits into a Delete and an Insert component. The subset multiIf = 1 has only an Insert component. And the subset multiIf = 2 has only a Delete component.

All these filters and projections are reading from a single base loopback shuffle [7], and the base join is now a FULL OUTER (marked) join, rather than a LEFT OUTER (marked) join, because both target and source row values are required in order to execute all later branches. 

Note also that there are now markers c_0 and c_1 applied to the source and target rows, respectively, so that we can tell a genuinely all-NULL source row apart from the absence of a matching source row, and vice versa for the target side.

Phew. There you have the composability of this query plan: 

  • Complex conditionals will make it into the branch filters. 
  • Complex data transformations (upon Insert or Update) will make it into the projections before Insert. 
  • As many clauses as requested will get unioned and piped up to the DML processing nodes.
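
Concretely, the same plan shape accommodates something like the following sketch on the toy tables (conditions and transforms picked arbitrarily):

MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED AND s.b IS NULL THEN DELETE
WHEN MATCHED THEN UPDATE SET b = s.b + 1
WHEN NOT MATCHED AND s.b < 11 THEN INSERT VALUES (s.a, 100 * s.b)
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b)
WHEN NOT MATCHED BY SOURCE THEN DELETE;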

What are the differences introduced by making this plan distributed?

Distributed (Multi Node) Query Plan

We’ve mentioned a type of shuffle already: the loopback shuffle. Now we introduce the Hash and Key-Identity (or Homecoming) shuffles that will actually move data between the nodes of a distributed engine, at two key steps.

Let’s continue using the same full-sync query [EXAMPLE 3].

Difference 1: Hash Shuffle of JOIN inputs

\_[Join] Mode: Full [(source.a = target.a)]
   \_[Shuffle] Hash by [source.a]
   |  \_[Projection] source.a, source.b, c_0: TRUE
   |     \_[StoredTable] Name: "source"
   \_[Shuffle] Hash by [target.a]
      \_[Projection] target.a, target.$source_node_id, target.$tablet_id, target.$tablet_txid, target.$tablet_row_number, c_1: TRUE
         \_[StoredTable] Name: "target"

The source and target tables (being default FACT, rather than DIMENSION tables) are now distributed, and each node can read only a portion of them locally. On a multi-node engine then, given the join condition of target.a = source.a, each table’s data will get shuffled across the engine, partitioned by the hash of each of those columns. More complex join conditions will hash all relevant columns from each base table. 

Note that once the full data partitions land on every node, the join can proceed node-locally. Nothing about the node-local reuse of this base join (via loopback shuffle), nor the filter branches higher up, is affected.

Fundamentally speaking, the scalability of this basemost join is what enabled MERGE to be scalable out-of-the-box. Convenient!

Difference 2: Key-Identity (or Homecoming) Shuffle of Delete inputs

[0] [TableModify]
 \_[1] [Delete] target table: "target"
 |  \_[2] [Shuffle] KeyIdentity by target.$source_node_id
 |     \_[3] [Projection] target.$tablet_row_number, target.$tablet_txid, target.$tablet_id, target.$source_node_id
 |       \_[4] [Union]
 |           \_ ... multiple branches

For this difference, I have to more deeply explain how delete works. For starters, a Key-Identity Shuffle sends rows to a specific node ordinal (rather than hashing row values to decide the receiver node at runtime). But why would you need to shuffle the stream of (tablet, row_number) pairs sent to a Delete node? And it looks like we’re sending each pair back to its $source_node_id?

Here’s why. On a multi-node engine, the entire query pipeline is duplicated on every node. So every node will have its own active Delete operator that can receive row sets, write and serialize RoaringBitmap deletion vectors, etc. But a limitation mentioned earlier is that at the end of any given transaction, a persisted tablet must have exactly one deletion vector specified. Which means you cannot allow multiple nodes to delete rows from the same tablet - in that case they would upload multiple independent (and partial) deletion vectors, and all but one of those would fail to be committed to the tablet’s metadata. It would be as if some deletions had never taken place.

So it’s great to have an active Delete operator per table per node. However, each of them must handle rows from a distinct set of tablets.

The workaround is to assign to every tablet a “parent” node, here called the “source”. That’s the node responsible for initially scanning the tablet and then for uploading a single, complete deletion vector for that tablet at the end of every DML query. (Conveniently, this node will typically already have the previous version of the deletion vector cached.) Applying the Key-Identity Shuffle using this “source” node id lets us guarantee the correctness of the distributed Delete step. I’ve always thought it a bit sweeter to call the mechanism a “Homecoming Shuffle” instead 🐑, for all the wandering lil’ rows to be sent back to where they came from.

Note that this shuffle is introduced only if at least one prior layer of the pipeline ever shuffled data away from its source node. (In our case, it is the distributed join at the bottom of the MERGE plan which added the initial hash shuffles.)

And now a neat aside - to be fully transparent, implementing MERGE did not require building out or even newly instantiating this Key-Identity Shuffle layer. It was an existing building block, already used to support distributed UPDATE and DELETE queries. What MERGE introduced was the first time ever that more than one data stream could contribute to a distributed DELETE of the same table, all within a single query plan. (Consider multiple match clauses, with different conditionals, that all request the DELETE action.) So whereas the shuffle layer was already in place, this is actually the true reason that we started adding UNION operators to all the branches funneling into DML nodes. I mentioned earlier that unioning INSERT streams improves tablet quality. But unioning DELETE streams is truly a basic correctness requirement to ensure that all rows-to-be-deleted make it through the same shuffle infrastructure, and get marked exactly once per tablet.

INSERT ON CONFLICT: Syntactic Sugar for MERGE

The final cherry on top of building out this new syntax was enabling a syntactic “candy wrapper” around the merge plan to support INSERT ON CONFLICT syntax. See the documentation for the full syntax spec and its current limitations. 

The MERGE spec provides full flexibility on the number and order of merge clauses that you choose to interleave. However, many common "upsert" patterns can be expressed more concisely. Specifically, say that you want some tuples to be inserted into a target table without creating duplicates. Once a duplicate is encountered, the only decision is whether to keep the original row or to overwrite it. PostgreSQL defines this as an ON CONFLICT DO <X> clause on top of the classic INSERT statement, with options to DO NOTHING (e.g. INSERT IGNORE) or to DO UPDATE (e.g. INSERT UPDATE).

And the key observation for implementing this translation was that the ON CONFLICT syntax can be rewritten as a MERGE. Observe the following.

Original INSERT query:

INSERT INTO target (id, value, last_updated) VALUES (1, 'retro', GETDATE())
ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value, last_updated = EXCLUDED.last_updated;

Translates to the following MERGE statement:

MERGE INTO target AS T
USING VALUES (1, 'retro', GETDATE()) AS EXCLUDED (id, value, last_updated)
ON T.id = EXCLUDED.id
WHEN NOT MATCHED BY TARGET THEN
    INSERT (id, value, last_updated) 
    VALUES (EXCLUDED.id, EXCLUDED.value, EXCLUDED.last_updated)
WHEN MATCHED THEN
    UPDATE SET
        value = EXCLUDED.value,
        last_updated = EXCLUDED.last_updated;
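
The DO NOTHING flavor translates analogously; a sketch:

INSERT INTO target (id, value, last_updated) VALUES (1, 'retro', GETDATE())
ON CONFLICT (id) DO NOTHING;

-- becomes, roughly:
MERGE INTO target AS T
USING VALUES (1, 'retro', GETDATE()) AS EXCLUDED (id, value, last_updated)
ON T.id = EXCLUDED.id
WHEN NOT MATCHED BY TARGET THEN
    INSERT (id, value, last_updated)
    VALUES (EXCLUDED.id, EXCLUDED.value, EXCLUDED.last_updated);

Rows that do find a match simply fall outside the single NOT MATCHED clause, which is exactly the “do nothing on conflict” behavior.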

Therefore, some parsing magic was all that was required to support folks’ favorite UPSERT commands. Now you know what’s happening under the hood when you invoke them.

Looking Forward

MERGE is currently in public preview. Thanks to our powerful runtime building blocks, crafting the right MERGE plan made the execution “just work”. Use the EXPLAIN command (documentation) to see the query plans for yourself.
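
For example, to peek at the plan for a toy sync:

EXPLAIN MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED THEN UPDATE SET b = s.b
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b);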

Several of our customers are already using MERGE for table sync or ingest, repeating the query pattern across many of their tables every couple of minutes or hours.

Word to the wise:

Anywhere from 0-100% of your table can be updated on sync. To reduce churn, avoid empty rewrites (i.e. deleting and inserting a row that is exactly equivalent).
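
One way to do that, sketched on the toy tables, is to add a change-detection condition to the MATCHED clause (use a null-safe comparison if the columns are nullable):

-- Only rewrite rows whose payload actually changed
MERGE INTO target t USING source s ON t.a = s.a
WHEN MATCHED AND t.b <> s.b THEN UPDATE SET b = s.b
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b)
WHEN NOT MATCHED BY SOURCE THEN DELETE;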

We’re looking forward to hearing how MERGE makes a difference in your day-to-day data-filled lives. Please reach out with any performance or scalability issues that you uncover. 

Thanks for learning about the underlying query plan and implementation. Query on everyone.
