WAL on Kafka
Architecture
In this section we present a distributed WAL implementation based on Kafka. Write-ahead logs (hereinafter referred to as logs) of tables are managed by region; a region can be understood simply as a log file shared by multiple tables.
As shown in the following figure, regions are mapped to topics in Kafka, each topic having only one partition. A region usually needs two topics: one for storing logs and the other for storing metadata.
┌──────────────────────────┐
│ Kafka │
│ │
│ ...... │
│ │
│ ┌─────────────────────┐ │
│ │ Meta Topic │ │
│ │ │ │
Delete │ │ ┌─────────────────┐ │ │
┌──────────────────────┐ ┌───────┼─┼─► Partition │ │ │
│ HoraeDB │ │ │ │ │ │ │ │
│ │ │ │ │ └─────────────────┘ │ │
│ ┌──────────────────┐ │ │ │ │ │ │
│ │ WAL │ │ │ │ └─────────────────────┘ │
│ │ ...... │ │ │ │ │
│ │ ┌──────────────┐ │ │ │ │ ┌──────────────────────┐ │
│ │ │ Region │ │ │ │ │ │ Data Topic │ │
│ │ │ ├─┼─┼──┘ │ │ │ │
│ │ │ ┌──────────┐ │ │ │ │ │ ┌──────────────────┐ │ │
│ │ │ │ Metadata │ │ │ │ │ │ │ Partition │ │ │
│ │ │ └──────────┘ │ │ │ Write │ │ │ │ │ │
Write ─────────┼─┼─► ├─┼─┼───┐ │ │ │ ┌──┬──┬──┬──┬──┐ │ │ │
│ │ │ ┌──────────┐ │ │ │ └──────┼─┼─┼─► │ │ │ │ ├─┼─┼─┼────┐
│ │ │ │ Client │ │ │ │ │ │ │ └──┴──┴──┴──┴──┘ │ │ │ │
Read ◄─────────┼─┼─┤ └──────────┘ │ │ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ └──────────────────┘ │ │ │
│ │ └──▲───────────┘ │ │ │ │ │ │ │
│ │ │ ...... │ │ │ └──────────────────────┘ │ │
│ └────┼─────────────┘ │ │ ...... │ │
│ │ │ └──────────────────────────┘ │
└──────┼───────────────┘ │
│ │
│ │
│ Read │
└──────────────────────────────────────────────────────────┘
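The region-to-topic mapping can be sketched as follows. The document does not specify how topics are named, so the format produced by the hypothetical topics_for_region function is an assumption; what the sketch illustrates is that each region owns one data topic and one meta topic, each with a single partition.

```rust
/// Hypothetical region identifier type; the document does not fix one.
pub type RegionId = u64;

/// Every topic backing a region is created with exactly one partition.
pub const PARTITIONS_PER_TOPIC: i32 = 1;

/// The pair of Kafka topics backing one region.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionTopics {
    /// Holds the log entries of every table in the region.
    pub data_topic: String,
    /// Holds snapshots of the region's metadata (RegionMeta).
    pub meta_topic: String,
}

/// Maps a region to its two topics. The naming format is an assumption.
pub fn topics_for_region(namespace: &str, region_id: RegionId) -> RegionTopics {
    RegionTopics {
        data_topic: format!("{namespace}_data_{region_id}"),
        meta_topic: format!("{namespace}_meta_{region_id}"),
    }
}
```

Kafka orders records only within a partition, so a single partition gives each region one totally ordered sequence of offsets, which keeps the per-region offset bookkeeping simple.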
Data Model
Log Format
The common log format described in WAL on RocksDB is used here.
Metadata
Each region maintains its metadata both in memory and in Kafka; we call it RegionMeta here. It can be thought of as a map with the table id as the key and TableMeta as the value.
We briefly introduce the variables in TableMeta here:
- next_seq_num, the sequence number allocated to the next log entry.
- latest_marked_deleted (latest_mark_deleted in the figures), the last flushed sequence number; all logs of the table with a lower sequence number than it can be removed.
- current_high_watermark (high_watermark in the figures), the high watermark of the Kafka partition after the last write of this table.
- seq_offset_mapping, the mapping from sequence numbers to Kafka offsets; an entry is added on every write, and after flushing, the entries with sequence numbers lower than the updated latest_marked_deleted are removed.
┌─────────────────────────────────────────┐
│ RegionMeta │
│ │
│ Map<TableId, TableMeta> table_metas │
└─────────────────┬───────────────────────┘
│
│
│
└─────┐
│
│
┌──────────────────────┴──────────────────────────────┐
│ TableMeta │
│ │
│ SequenceNumber next_seq_num │
│ │
│ SequenceNumber latest_mark_deleted │
│ │
│ KafkaOffset high_watermark │
│ │
│ Map<SequenceNumber, KafkaOffset> seq_offset_mapping │
└─────────────────────────────────────────────────────┘
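The two structures in the figure can be written down as a minimal Rust sketch. The integer widths are assumptions (Kafka offsets are signed 64-bit integers, hence i64 for KafkaOffset). Using a BTreeMap for seq_offset_mapping is a choice made for this sketch: keeping entries ordered by sequence number makes dropping every entry below latest_mark_deleted a single split. offset_of is a hypothetical helper.

```rust
use std::collections::{BTreeMap, HashMap};

pub type TableId = u64;
pub type SequenceNumber = u64;
pub type KafkaOffset = i64;

/// Per-table WAL state, mirroring the TableMeta figure.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TableMeta {
    /// Sequence number for the next log entry of this table.
    pub next_seq_num: SequenceNumber,
    /// Logs with a smaller sequence number may be removed.
    pub latest_mark_deleted: SequenceNumber,
    /// Partition high watermark after this table's last write.
    pub high_watermark: KafkaOffset,
    /// Sequence number -> Kafka offset, ordered by sequence number.
    pub seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

/// Region-level metadata: one TableMeta per table sharing the region's topics.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RegionMeta {
    pub table_metas: HashMap<TableId, TableMeta>,
}

impl RegionMeta {
    /// Kafka offset of a table's log entry, if the entry is still tracked.
    pub fn offset_of(&self, table: TableId, seq: SequenceNumber) -> Option<KafkaOffset> {
        self.table_metas.get(&table)?.seq_offset_mapping.get(&seq).copied()
    }
}
```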
Main Process
We focus on the main processes within one region. The following processes are introduced:
- Open or create region.
- Write and read logs.
- Delete logs.
Open or Create Region
Steps
- Search for the region in the opened namespace.
- If the region is found, the most important thing to do is recover its metadata, which we introduce later.
- If the region is not found and auto-creation is enabled, create the corresponding topics in Kafka.
- Add the found or created region to the cache, then return it.
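The steps above can be sketched in Rust as follows. The TopicStore trait and InMemoryStore are hypothetical stand-ins for the Kafka operations, the cache is assumed to be consulted before Kafka, and metadata recovery is reduced to a comment.

```rust
use std::collections::{HashMap, HashSet};

pub type RegionId = u64;

/// An opened region; recovered metadata is omitted in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Region {
    pub id: RegionId,
    /// True if the region's topics were created by this open.
    pub newly_created: bool,
}

/// Hypothetical stand-in for the Kafka operations a namespace needs.
pub trait TopicStore {
    fn region_exists(&self, id: RegionId) -> bool;
    fn create_topics(&mut self, id: RegionId);
}

/// In-memory TopicStore, for illustration only.
#[derive(Default)]
pub struct InMemoryStore {
    regions: HashSet<RegionId>,
}

impl TopicStore for InMemoryStore {
    fn region_exists(&self, id: RegionId) -> bool {
        self.regions.contains(&id)
    }
    fn create_topics(&mut self, id: RegionId) {
        // A real store would create the data and meta topics, one partition each.
        self.regions.insert(id);
    }
}

pub struct Namespace<S: TopicStore> {
    store: S,
    cache: HashMap<RegionId, Region>,
}

impl<S: TopicStore> Namespace<S> {
    pub fn new(store: S) -> Self {
        Self { store, cache: HashMap::new() }
    }

    pub fn open_region(&mut self, id: RegionId, auto_create: bool) -> Result<Region, String> {
        // Already opened: serve it from the cache.
        if let Some(region) = self.cache.get(&id) {
            return Ok(region.clone());
        }
        let region = if self.store.region_exists(id) {
            // Found: this is where RegionMeta would be recovered.
            Region { id, newly_created: false }
        } else if auto_create {
            self.store.create_topics(id);
            Region { id, newly_created: true }
        } else {
            return Err(format!("region {id} not found"));
        };
        // Cache the found or created region, then return it.
        self.cache.insert(id, region.clone());
        Ok(region)
    }
}
```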
Recovery
As mentioned above, the RegionMeta is actually a map of TableMeta, so here we focus on recovering a specific TableMeta, with examples to better illustrate the process.
- First, recover the RegionMeta from a snapshot. We take a snapshot of the RegionMeta in some scenarios (e.g. marking logs deleted, cleaning logs) and put it into the meta topic. The snapshot is the RegionMeta at a particular point in time, recorded together with the Kafka high watermark at that time. When recovering a region, we can use it to avoid scanning all logs in the data topic. In the following example, we recover from the snapshot taken when the Kafka high watermark was 64:
high watermark in snapshot: 64
┌──────────────────────────────┐
│ RegionMeta │
│ │
│ ...... │
│ ┌──────────────────────────┐ │
│ │ TableMeta │ │
│ │ │ │
│ │ next_seq_num: 5 │ │
│ │ │ │
│ │ latest_mark_deleted: 2 │ │
│ │ │ │
│ │ high_watermark: 32 │ │
│ │ │ │
│ │ seq_offset_mapping: │ │
│ │ │ │
│ │ (2, 16) (3, 16) (4, 31) │ │
│ └──────────────────────────┘ │
│ ...... │
└──────────────────────────────┘
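- Then, recover from logs. After recovering from the snapshot, we continue by scanning the logs in the data topic starting from the Kafka high watermark at which the snapshot was taken, which avoids scanning the whole data topic. In the example below, replaying the entries from offset 64 onward adds sequence numbers 5, 6 and 7 (at offsets 72, 81 and 90) and advances next_seq_num to 8: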
┌────────────────────────────────────┐
│ │
│ high_watermark in snapshot: 64 │
│ │
│ ┌──────────────────────────────┐ │
│ │ RegionMeta │ │
│ │ │ │
│ │ ...... │ │
│ │ ┌──────────────────────────┐ │ │
│ │ │ TableMeta │ │ │
│ │ │ │ │ │
│ │ │ next_seq_num: 5 │ │ │ ┌────────────────────────────────┐
│ │ │ │ │ │ │ RegionMeta │
│ │ │ latest_mark_deleted: 2 │ │ │ │ │
│ │ │ │ │ │ │ ...... │
│ │ │ high_watermark: 32 │ │ │ │ ┌────────────────────────────┐ │
│ │ │ │ │ │ │ │ TableMeta │ │
│ │ │ seq_offset_mapping: │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ next_seq_num: 8 │ │
│ │ │ (2, 16) (3, 16) (4, 31) │ │ │ │ │ │ │
│ │ └──────────────────────────┘ │ │ │ │ latest_mark_deleted: 2 │ │
│ │ ...... │ │ │ │ │ │
│ └──────────────────────────────┘ ├──────────────────► │ high_watermark: 32 │ │
│ │ │ │ │ │
│ ┌────────────────────────────────┐ │ │ │ seq_offset_mapping: │ │
│ │ Data topic │ │ │ │ │ │
│ │ │ │ │ │ (2, 16) (3, 16) (4, 31) │ │
│ │ ┌────────────────────────────┐ │ │ │ │ │ │
│ │ │ Partition │ │ │ │ │ (5, 72) (6, 81) (7, 90) │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ┌────┬────┬────┬────┬────┐ │ │ │ │ └────────────────────────────┘ │
│ │ │ │ 64 │ 65 │ ...│ 99 │100 │ │ │ │ │ ...... │
│ │ │ └────┴────┴────┴────┴────┘ │ │ │ └────────────────────────────────┘
│ │ │ │ │ │
│ │ └────────────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────┘ │
│ │
└────────────────────────────────────┘
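The two recovery steps above can be sketched in Rust under a few assumptions: a snapshot is modeled as a hypothetical Snapshot struct holding the high watermark and the table metas, and data-topic entries as a hypothetical LogEntry carrying offset, table id and sequence number. Filtering by offset stands in for seeking the consumer to the snapshot's high watermark, and only next_seq_num and seq_offset_mapping are rebuilt, the two fields that change in the example.

```rust
use std::collections::{BTreeMap, HashMap};

pub type TableId = u64;
pub type SequenceNumber = u64;
pub type KafkaOffset = i64;

#[derive(Debug, Clone, Default, PartialEq)]
pub struct TableMeta {
    pub next_seq_num: SequenceNumber,
    pub latest_mark_deleted: SequenceNumber,
    pub high_watermark: KafkaOffset,
    pub seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

/// A RegionMeta snapshot as read back from the meta topic (hypothetical layout).
#[derive(Debug, Clone, Default)]
pub struct Snapshot {
    /// High watermark of the data topic when the snapshot was taken.
    pub high_watermark: KafkaOffset,
    pub table_metas: HashMap<TableId, TableMeta>,
}

/// One data-topic entry, reduced to the fields recovery needs (hypothetical).
#[derive(Debug, Clone)]
pub struct LogEntry {
    pub offset: KafkaOffset,
    pub table_id: TableId,
    pub seq: SequenceNumber,
}

/// Step 1: start from the snapshot. Step 2: replay entries at or after its high watermark.
pub fn recover(snapshot: Snapshot, data_topic: &[LogEntry]) -> HashMap<TableId, TableMeta> {
    let start = snapshot.high_watermark;
    let mut metas = snapshot.table_metas;
    // A real consumer would seek to `start`; filtering stands in for that here.
    for entry in data_topic.iter().filter(|e| e.offset >= start) {
        let meta = metas.entry(entry.table_id).or_default();
        // Only next_seq_num and seq_offset_mapping are rebuilt in this sketch.
        meta.next_seq_num = meta.next_seq_num.max(entry.seq + 1);
        meta.seq_offset_mapping.insert(entry.seq, entry.offset);
    }
    metas
}
```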
Write and Read Logs
The writing and reading processes in a region are simple.
For writing:
- Open the specified region (auto-create it if necessary).
- Write the logs to the region's Kafka partition through the client.
- Update next_seq_num, current_high_watermark and seq_offset_mapping in the corresponding TableMeta.
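A minimal sketch of the write steps, assuming each log entry becomes one Kafka record and using a hypothetical in-memory Partition in place of the Kafka client. The high watermark is approximated as the offset the next record will receive; the high_watermark field corresponds to current_high_watermark in the text.

```rust
use std::collections::BTreeMap;

pub type SequenceNumber = u64;
pub type KafkaOffset = i64;

#[derive(Debug, Default)]
pub struct TableMeta {
    pub next_seq_num: SequenceNumber,
    /// current_high_watermark in the text.
    pub high_watermark: KafkaOffset,
    pub seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

/// In-memory stand-in for the region's single-partition data topic (hypothetical).
#[derive(Debug, Default)]
pub struct Partition {
    /// (table id, payload) per record; the index is the record's offset.
    records: Vec<(u64, Vec<u8>)>,
}

impl Partition {
    /// Appends one record and returns its offset, like a Kafka produce.
    pub fn append(&mut self, table_id: u64, payload: Vec<u8>) -> KafkaOffset {
        self.records.push((table_id, payload));
        (self.records.len() - 1) as KafkaOffset
    }

    /// Offset the next record will get, used here as the high watermark.
    pub fn high_watermark(&self) -> KafkaOffset {
        self.records.len() as KafkaOffset
    }
}

/// Writes one table's payloads, then updates its TableMeta.
/// Returns the sequence numbers allocated to the entries.
pub fn write_logs(
    table_id: u64,
    meta: &mut TableMeta,
    partition: &mut Partition,
    payloads: Vec<Vec<u8>>,
) -> Vec<SequenceNumber> {
    let mut seqs = Vec::with_capacity(payloads.len());
    for payload in payloads {
        let seq = meta.next_seq_num;
        let offset = partition.append(table_id, payload);
        // Record where this sequence number landed, then advance.
        meta.seq_offset_mapping.insert(seq, offset);
        meta.next_seq_num += 1;
        seqs.push(seq);
    }
    meta.high_watermark = partition.high_watermark();
    seqs
}
```

Because all tables of a region share one partition, a table's offsets are not contiguous when other tables write in between, which is what the per-table seq_offset_mapping records.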
For reading:
- Open the specified region.
- Read all the logs of the region; splitting them by table and replaying them are left to the caller.
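Since splitting and replaying are left to the caller, a caller-side sketch may help. It assumes a hypothetical decoded LogEntry shape; split_by_table groups the region's interleaved entries by table id while preserving each table's write order, and replay feeds them to a caller-supplied callback.

```rust
use std::collections::BTreeMap;

pub type TableId = u64;
pub type SequenceNumber = u64;

/// A decoded log entry as the region reader might return it (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub struct LogEntry {
    pub table_id: TableId,
    pub seq: SequenceNumber,
    pub payload: Vec<u8>,
}

/// Groups the region's interleaved entries by table, keeping each table's order.
pub fn split_by_table(
    entries: impl IntoIterator<Item = LogEntry>,
) -> BTreeMap<TableId, Vec<LogEntry>> {
    let mut per_table: BTreeMap<TableId, Vec<LogEntry>> = BTreeMap::new();
    for entry in entries {
        per_table.entry(entry.table_id).or_default().push(entry);
    }
    per_table
}

/// Replays each table's entries in order through a caller-supplied callback.
pub fn replay<F: FnMut(TableId, &LogEntry)>(
    per_table: &BTreeMap<TableId, Vec<LogEntry>>,
    mut apply: F,
) {
    for (table_id, entries) in per_table {
        for entry in entries {
            apply(*table_id, entry);
        }
    }
}
```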
Delete Logs
Log deletion can be divided into two steps:
- Mark the logs deleted.
- Do delayed cleaning work periodically in a background thread.
Mark
- Update latest_mark_deleted and seq_offset_mapping in TableMeta, retaining only the mapping entries whose sequence number is >= the updated latest_mark_deleted.
- When dropping a table, we may need to make a RegionMeta snapshot and sync it to Kafka.
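A minimal sketch of the mark step. It assumes the mark never moves backwards, which the document does not state. BTreeMap::split_off returns the entries whose keys are at or above the given key, which is exactly the set to retain.

```rust
use std::collections::BTreeMap;

pub type SequenceNumber = u64;
pub type KafkaOffset = i64;

#[derive(Debug, Default)]
pub struct TableMeta {
    pub next_seq_num: SequenceNumber,
    pub latest_mark_deleted: SequenceNumber,
    pub seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

impl TableMeta {
    /// Marks all logs with a sequence number below `seq` as deleted.
    /// Nothing is removed from Kafka here; the cleaner does that later.
    pub fn mark_deleted(&mut self, seq: SequenceNumber) {
        // Assumption: the mark only moves forward.
        if seq <= self.latest_mark_deleted {
            return;
        }
        self.latest_mark_deleted = seq;
        // Keep entries with sequence >= seq; the smaller ones are dropped.
        self.seq_offset_mapping = self.seq_offset_mapping.split_off(&seq);
    }
}
```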
Clean
The cleaning logic is done in a background thread called the cleaner:
- Make a RegionMeta snapshot.
- Decide whether to clean the logs based on the snapshot.
- If so, sync the snapshot to Kafka first, then clean the logs.
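The document does not spell out how the cleaner decides whether to clean. One plausible policy, sketched below under that assumption: records below the smallest offset still referenced by any table's seq_offset_mapping are no longer needed, and if no table references a live log, everything below the snapshot's high watermark can go. RegionMetaSnapshot, safe_delete_offset and plan_clean are hypothetical names.

```rust
use std::collections::{BTreeMap, HashMap};

pub type TableId = u64;
pub type SequenceNumber = u64;
pub type KafkaOffset = i64;

/// Only the field the cleaner looks at in this sketch.
#[derive(Debug, Clone, Default)]
pub struct TableMeta {
    pub seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

#[derive(Debug, Clone, Default)]
pub struct RegionMetaSnapshot {
    /// Data-topic high watermark when the snapshot was taken.
    pub high_watermark: KafkaOffset,
    pub table_metas: HashMap<TableId, TableMeta>,
}

/// Offset below which every data-topic record is no longer needed:
/// the smallest offset any table still references, else the snapshot's high watermark.
pub fn safe_delete_offset(snapshot: &RegionMetaSnapshot) -> KafkaOffset {
    snapshot
        .table_metas
        .values()
        .filter_map(|t| t.seq_offset_mapping.values().min().copied())
        .min()
        .unwrap_or(snapshot.high_watermark)
}

/// Decides whether to clean: only if the safe offset moved past what is already deleted.
pub fn plan_clean(snapshot: &RegionMetaSnapshot, deleted_before: KafkaOffset) -> Option<KafkaOffset> {
    let target = safe_delete_offset(snapshot);
    if target > deleted_before {
        Some(target)
    } else {
        None
    }
}
```

When plan_clean returns an offset, the snapshot is synced to the meta topic before anything is deleted (for example with Kafka's DeleteRecords, which removes records below a given offset). The order matters: once those records are gone, a later recovery can no longer replay them, so the metadata they would have rebuilt must already be in Kafka.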