System Design Case Studies

Database Scaling

Shopify

  • They started off by splitting the primary database into separate parts.
  • They identified groups of large tables that could live on separate databases and used Ghostferry to move a few tables onto a new database.
  • This scaling approach is referred to as “federation”: different groups of tables are stored in different MySQL databases (a routing sketch follows this list).
  • As the app grew further, they started hitting the limits of a single MySQL instance, with disk size reaching many terabytes. They couldn’t split the primary database any further, because that would add more complexity in the application layer and require cross-database transactions.
  • They chose Vitess, an open source database clustering system that provides an abstraction layer on top of MySQL with many benefits (see the docs for details).
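
As a rough illustration of what application-level federation routing looks like, here is a minimal Go sketch; the table groups, DSNs, and dbFor helper are hypothetical, since Shopify’s actual implementation is not public.

```go
// A minimal sketch of application-level "federation" routing. The table groups
// and DSNs below are hypothetical; Shopify's actual implementation is not public.
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

// Hypothetical mapping from table (or table group) to the MySQL that owns it.
var tableToDSN = map[string]string{
	"orders":   "app:pw@tcp(orders-db:3306)/shop",
	"checkout": "app:pw@tcp(checkout-db:3306)/shop",
	"products": "app:pw@tcp(core-db:3306)/shop", // everything else stays on the core DB
}

// dbFor opens a connection to whichever MySQL holds the given table.
func dbFor(table string) (*sql.DB, error) {
	dsn, ok := tableToDSN[table]
	if !ok {
		return nil, fmt.Errorf("unknown table %q", table)
	}
	return sql.Open("mysql", dsn)
}

func main() {
	db, err := dbFor("orders")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	// Queries against "orders" now go to orders-db. The trade-off noted above:
	// cross-database joins and transactions are no longer possible.
	fmt.Println("orders queries routed to orders-db")
}
```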

Figma

Vertical sharding (April 4, 2023)

They went with vertical partitioning ([[Databases#Database Partitioning]]) of the database by table(s). Instead of splitting each table across many databases, they moved groups of tables onto their own databases. This had both short- and long-term benefits: vertical partitioning relieved the original database immediately, while providing a path toward horizontally sharding subsets of tables in the future.

They identified which tables could be split using average active sessions (AAS) per query, which describes the average number of active threads dedicated to a given query at a point in time. They calculated this by polling pg_stat_activity at 10 millisecond intervals to identify CPU waits associated with each query, then aggregating the results by table name (a sampling sketch follows).
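
A rough Go sketch of that sampling loop is shown below. Figma’s exact query and aggregation are not published, so the wait_event IS NULL filter (a stand-in for “on CPU”) and the naive tableFromQuery helper are assumptions.

```go
// Rough sketch: sample pg_stat_activity every 10 ms to approximate AAS per table.
// The wait_event IS NULL filter and the tableFromQuery helper are assumptions.
package main

import (
	"database/sql"
	"fmt"
	"regexp"
	"time"

	_ "github.com/lib/pq"
)

var fromRe = regexp.MustCompile(`(?i)\b(?:from|update|into)\s+([a-z_][a-z0-9_.]*)`)

// tableFromQuery naively pulls the first table name out of a SQL string.
func tableFromQuery(q string) string {
	if m := fromRe.FindStringSubmatch(q); m != nil {
		return m[1]
	}
	return "unknown"
}

func main() {
	db, err := sql.Open("postgres", "postgres://stats:pw@primary:5432/figma?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	samples := map[string]int{} // table -> active, non-waiting sessions seen
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	const intervals = 1000 // ~10 seconds of sampling
	for i := 0; i < intervals; i++ {
		<-ticker.C
		rows, err := db.Query(`SELECT query FROM pg_stat_activity
		                        WHERE state = 'active'
		                          AND wait_event IS NULL
		                          AND pid <> pg_backend_pid()`)
		if err != nil {
			continue
		}
		for rows.Next() {
			var q string
			if rows.Scan(&q) == nil {
				samples[tableFromQuery(q)]++
			}
		}
		rows.Close()
	}

	// AAS is roughly: total active sessions observed per table / number of samples.
	for table, n := range samples {
		fmt.Printf("%-30s AAS ≈ %.2f\n", table, float64(n)/float64(intervals))
	}
}
```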

They chose tables that did not need joins or shared transactions with the tables left behind.

Migration approach

  1. Prepare client applications to query from multiple database partitions
  2. Replicate tables from original database to a new database until replication lag is near 0
  3. Pause activity on original database
  4. Wait for databases to synchronize
  5. Reroute query traffic to the new database
  6. Resume activity

Note: To speed up the initial copy for logical replication, they dropped the indexes on the destination and rebuilt them after everything had completed.

They used the Log Sequence Number (LSN), a unique identifier assigned to each transaction log entry that represents the order in which changes were made to the database. LSNs are used to track the state of replication and determine whether two databases are synchronized (see the sketch below).
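
A minimal sketch of the “wait until replication catches up” check on the publisher side, assuming a logical replication slot named figma_partition_move; the slot name and the byte threshold are illustrative only.

```go
// Minimal sketch: compare the publisher's current WAL LSN with the LSN the
// subscriber has confirmed, via the replication slot. Slot name and threshold
// are illustrative.
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

func main() {
	pub, err := sql.Open("postgres", "postgres://admin:pw@old-primary:5432/figma?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer pub.Close()

	// pg_wal_lsn_diff returns how many bytes the subscriber's confirmed position
	// trails the publisher's current WAL position.
	var lagBytes int64
	err = pub.QueryRow(`
		SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint
		FROM pg_replication_slots
		WHERE slot_name = 'figma_partition_move'`).Scan(&lagBytes)
	if err != nil {
		panic(err)
	}

	if lagBytes < 1024 { // near zero: safe to pause writes and cut over
		fmt.Println("replication caught up, proceed with switchover")
	} else {
		fmt.Printf("still %d bytes behind, keep waiting\n", lagBytes)
	}
}
```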

They created a new query routing service to centralize and simplify routing logic as they scale to more partitions.

Horizontal sharding (March 14, 2024)

Their first goal was to shard a relatively simple but very high-traffic table in production as soon as possible. They horizontally sharded groups of related tables into “colos” (colocations) that share the same sharding key and physical sharding layout. This provided a friendly abstraction for developers to interact with horizontally sharded tables.

Shard key: they selected a handful of sharding keys such as UserID, FileID, or OrgID. Almost every table at Figma could be sharded using one of these keys.

Tables are grouped into the same colo when they belong to a single domain and share the same shard key, so queries within a colo can still use joins and transactions.

Example: Imagine Figma has a colo named “UserColo” that includes tables related to user data. Within this colo, there are tables such as “Users”, “UserPreferences”, and “UserActivity”. Each of these tables is sharded based on the UserID, ensuring that data related to a specific user is stored together on the same shard.

Logical Sharding and Physical Sharding

First they did logical sharding, which partitions or organizes data at the application layer in a way that simulates horizontal sharding without physically distributing the data across multiple shards.

Then, after logical sharding proved successful, they implemented physical sharding, which actually distributes the data across multiple backend database servers.

They built a DBProxy service (written in Go) that intercepts SQL queries generated by the application layer and dynamically routes them to the various Postgres databases. Its query pipeline works as follows (a toy routing sketch follows the list):

  • A query parser reads the SQL sent by the application and transforms it into an Abstract Syntax Tree (AST).
  • A logical planner parses the AST and extracts the query type (insert, update, etc) and logical shard IDs from the query plan.
  • A physical planner maps the query from logical shard IDs to physical databases. It rewrites queries to execute on the appropriate physical shard.
  • If a query does not include a shard key, it is sent to every shard and the results are aggregated.
  • Queries that join tables living on different shards are rejected.
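
A toy stand-in for that routing flow is sketched below. The real DBProxy works on a full AST; here a naive regexp pulls the shard key out of the WHERE clause, and the shard counts and database names are made up.

```go
// Toy stand-in for DBProxy-style routing: extract the shard key, map it to a
// logical shard, then to a physical database; scatter-gather when there is no key.
// Shard counts, table names, and hosts are illustrative.
package main

import (
	"fmt"
	"hash/fnv"
	"regexp"
)

const numLogicalShards = 64 // illustrative, not Figma's real number

// Physical databases; several logical shards map onto each one.
var physicalDBs = []string{"pg-files-0", "pg-files-1", "pg-files-2", "pg-files-3"}

var fileIDRe = regexp.MustCompile(`(?i)\bfile_id\s*=\s*(\d+)`)

func logicalShard(fileID uint64) int {
	h := fnv.New64a()
	fmt.Fprintf(h, "%d", fileID)
	return int(h.Sum64() % numLogicalShards)
}

func physicalFor(logical int) string {
	return physicalDBs[logical%len(physicalDBs)]
}

// route decides which physical database(s) a query must run on.
func route(query string) []string {
	m := fileIDRe.FindStringSubmatch(query)
	if m == nil {
		// No shard key: scatter to every physical shard and aggregate the results.
		return physicalDBs
	}
	var fileID uint64
	fmt.Sscanf(m[1], "%d", &fileID)
	return []string{physicalFor(logicalShard(fileID))}
}

func main() {
	fmt.Println(route("SELECT * FROM file_comments WHERE file_id = 42")) // single shard
	fmt.Println(route("SELECT count(*) FROM file_comments"))             // scatter-gather
}
```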

Notion

  • They went with horizontal, application-level sharding.
  • Block data is partitioned by workspace ID.
  • 480 logical shards are evenly distributed across 32 physical databases (see the sketch below).
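
Below is a sketch of that 480-logical / 32-physical layout, with 480 / 32 = 15 logical shards per physical database. The hash function and host names are assumptions; Notion’s real mapping lives in their application-level sharding code.

```go
// Sketch of the 480-logical / 32-physical layout. The hash and host naming are
// assumptions, not Notion's actual code.
package main

import (
	"fmt"
	"hash/fnv"
)

const (
	logicalShards = 480
	physicalDBs   = 32
	shardsPerDB   = logicalShards / physicalDBs // 15 logical shards per physical database
)

func logicalShard(workspaceID string) int {
	h := fnv.New32a()
	h.Write([]byte(workspaceID))
	return int(h.Sum32()) % logicalShards
}

// locate returns the logical shard and the physical Postgres host that owns
// this workspace's block data.
func locate(workspaceID string) (logical int, db string) {
	logical = logicalShard(workspaceID)
	db = fmt.Sprintf("postgres-%02d", logical/shardsPerDB)
	return
}

func main() {
	logical, db := locate("workspace-uuid-example")
	fmt.Printf("logical shard %d lives on %s\n", logical, db)
}
```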

Migration process

  1. Double-write: Incoming writes get applied to both the old and new databases (see the sketch after these steps).
  2. Backfill: Once double-writing has begun, migrate the old data to the new database.
  3. Verification: Ensure the integrity of data in the new database.
  4. Switch-over: Actually switch to the new database. This can be done incrementally, e.g. double-reads, then migrate all reads.
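
A minimal sketch of step 1 (double-write), assuming a block table keyed by id exists on both databases; Notion’s real write path handles ordering and failures more carefully than this.

```go
// Minimal double-write sketch for step 1, assuming the same `block` table on
// both databases. Error handling is simplified.
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

type DoubleWriter struct {
	oldDB *sql.DB // current source of truth
	newDB *sql.DB // target shard being migrated to
}

func (w *DoubleWriter) SaveBlock(ctx context.Context, id, workspaceID, body string) error {
	const q = `INSERT INTO block (id, workspace_id, body) VALUES ($1, $2, $3)
	           ON CONFLICT (id) DO UPDATE SET body = EXCLUDED.body`
	// The old database is still authoritative, so a failure here fails the request.
	if _, err := w.oldDB.ExecContext(ctx, q, id, workspaceID, body); err != nil {
		return err
	}
	// A failed write to the new shard is logged and repaired later by the
	// backfill + verification steps rather than failing the user's request.
	if _, err := w.newDB.ExecContext(ctx, q, id, workspaceID, body); err != nil {
		log.Printf("double-write to new shard failed for block %s: %v", id, err)
	}
	return nil
}

func main() {
	oldDB, _ := sql.Open("postgres", "postgres://app@old-primary/notion?sslmode=disable")
	newDB, _ := sql.Open("postgres", "postgres://app@shard-07/notion?sslmode=disable")
	w := &DoubleWriter{oldDB: oldDB, newDB: newDB}
	if err := w.SaveBlock(context.Background(), "block-1", "workspace-42", "hello"); err != nil {
		log.Fatal(err)
	}
}
```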

Zerodha

They have only a single database with no replicas, and they split the database by financial year. Backups go to S3.

They use Postgres as a caching layer: it holds one day of data, and after a day they drop it and load the next day's data into the caching layer again. The tool they use is dungbeetle (spec: 16 cores, 32 GB RAM).

Learning: you can do weird things if they work for you :)

Stripe

Stripe’s database infrastructure team built an internal database-as-a-service (DBaaS) offering called DocDB.

How Do Applications Access DocDB?

DocDB leverages sharding to achieve horizontal scalability for its database infrastructure. With thousands of database shards distributed across Stripe’s product applications, sharding enables efficient data distribution and parallel processing.

Stripe’s database infrastructure team developed a fleet of database proxy servers implemented in Golang. These proxy servers handle the task of routing queries to the correct shard.

When an application sends a query to a database proxy server, the proxy performs the following steps (a fan-out sketch follows the list):

  • Parsing the query
  • Routing it to one or more shards
  • Combining the results received from the shards
  • Returning the final result to the application
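
The proxy internals are not public, so the sketch below only shows the fan-out/combine part of those steps, with a made-up Shard interface standing in for DocDB shards.

```go
// Generic fan-out/combine sketch of the proxy steps above; the Shard interface
// and fakeShard are stand-ins, not Stripe's actual code.
package main

import (
	"context"
	"fmt"
	"sync"
)

type Shard interface {
	Query(ctx context.Context, q string) ([]string, error)
}

// queryShards sends the (already parsed and routed) query to every target shard
// in parallel and combines the partial results into one response.
func queryShards(ctx context.Context, targets []Shard, q string) ([]string, error) {
	var (
		mu       sync.Mutex
		combined []string
		wg       sync.WaitGroup
		firstErr error
	)
	for _, s := range targets {
		wg.Add(1)
		go func(s Shard) {
			defer wg.Done()
			rows, err := s.Query(ctx, q)
			mu.Lock()
			defer mu.Unlock()
			if err != nil && firstErr == nil {
				firstErr = err
				return
			}
			combined = append(combined, rows...)
		}(s)
	}
	wg.Wait()
	return combined, firstErr
}

// fakeShard lets the sketch run without a real DocDB behind it.
type fakeShard struct{ name string }

func (f fakeShard) Query(ctx context.Context, q string) ([]string, error) {
	return []string{f.name + ": row"}, nil
}

func main() {
	out, _ := queryShards(context.Background(),
		[]Shard{fakeShard{"shard-a"}, fakeShard{"shard-b"}}, "SELECT ...")
	fmt.Println(out)
}
```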

Caching

DoorDash

They use layered caches (see the sketch after the list):

  1. Request local cache: Lives only for the lifetime of the request; uses a simple HashMap.
  2. Local cache: Visible to all workers within a single Java virtual machine; uses a Caffeine cache for heavy lifting.
  3. Redis cache: Visible to all pods sharing the same Redis cluster; uses Lettuce client.
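
DoorDash’s implementation is Kotlin (HashMap, Caffeine, Lettuce); the Go sketch below only illustrates the layering idea: check each layer in order, fall back to the source of truth on a full miss, then backfill the faster layers.

```go
// Layered read-through cache sketch: request-local map -> process-local cache
// -> Redis -> source of truth. The Redis layer is a stand-in function pair.
package main

import (
	"context"
	"fmt"
	"sync"
)

type loader func(ctx context.Context, key string) (string, error)

type LayeredCache struct {
	requestLocal map[string]string // layer 1: lives for a single request
	local        sync.Map          // layer 2: shared by all goroutines in this process
	redisGet     func(string) (string, bool)
	redisSet     func(string, string)
}

func (c *LayeredCache) Get(ctx context.Context, key string, load loader) (string, error) {
	if v, ok := c.requestLocal[key]; ok {
		return v, nil
	}
	if v, ok := c.local.Load(key); ok {
		c.requestLocal[key] = v.(string)
		return v.(string), nil
	}
	if v, ok := c.redisGet(key); ok {
		c.local.Store(key, v)
		c.requestLocal[key] = v
		return v, nil
	}
	v, err := load(ctx, key) // miss in every layer: hit the source of truth
	if err != nil {
		return "", err
	}
	c.redisSet(key, v)
	c.local.Store(key, v)
	c.requestLocal[key] = v
	return v, nil
}

func main() {
	redis := map[string]string{}
	c := &LayeredCache{
		requestLocal: map[string]string{},
		redisGet:     func(k string) (string, bool) { v, ok := redis[k]; return v, ok },
		redisSet:     func(k, v string) { redis[k] = v },
	}
	v, _ := c.Get(context.Background(), "menu:123", func(ctx context.Context, k string) (string, error) {
		return "loaded-from-db", nil
	})
	fmt.Println(v)
}
```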

They have runtime feature-flag controls to enable and disable caching per layer.

Cache invalidation

  • Using Change Data Capture events emitted when database tables are updated
  • The cache can also be invalidated directly within the application code when data changes; this is faster but potentially more complex.

Cache key: how they create a unique cache key

  1. Unique cache name, which is used as a reference in runtime controls. 
  2. Cache key type, a string representing the key’s type of entity to allow categorization of cache keys.
  3. ID, a string that refers to some unique entity of cache key type.
  4. Configuration, which includes default TTLs and a Kotlin serializer.

To standardize the key schema, they chose the uniform resource name (URN) format (a builder sketch follows):

urn:doordash:<cache key type>:<id>#<cache name>
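
A small builder for that URN scheme is sketched below; the struct fields mirror the components listed above, while the example values are made up.

```go
// Small builder for the URN cache-key scheme shown above; example values are illustrative.
package main

import "fmt"

type CacheKey struct {
	CacheName string // unique cache name, referenced by runtime controls
	KeyType   string // entity type of the key
	ID        string // unique entity id of that type
}

func (k CacheKey) URN() string {
	return fmt.Sprintf("urn:doordash:%s:%s#%s", k.KeyType, k.ID, k.CacheName)
}

func main() {
	k := CacheKey{CacheName: "user_profile_cache", KeyType: "user", ID: "1234"}
	fmt.Println(k.URN()) // urn:doordash:user:1234#user_profile_cache
}
```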

Uber

They use Docstore, a distributed database built on top of MySQL. They wanted to add caching at the query engine layer to reduce load on the database; here is how they did it.

Docstore

Docstore is mainly divided into three layers: a stateless query engine layer, a stateful storage engine layer, and a control plane.

The stateless query engine layer is responsible for query planning, routing, sharding, schema management, node health monitoring, request parsing, validation, and AuthN/AuthZ. (They planned to build the cache in front of this stateless query engine layer.)

The storage engine layer is responsible for consensus via Raft, replication, transactions, concurrency control, and load management. A partition is typically composed of MySQL nodes backed by NVMe SSDs, which are capable of handling heavy read and write workloads. Additionally, data is sharded across multiple partitions containing one leader and two follower nodes using Raft for consensus.

CacheFront

Since Docstore’s query engine layer is responsible for serving reads and writes to clients, it is well suited to integrate the caching layer. It also decouples the cache from disk-based storage, allowing us to scale either of them independently. The query engine layer implements an interface to Redis for storing cached data along with a mechanism to invalidate cached entries

CacheFront uses a cache-aside strategy to implement cached reads (see the sketch after the steps):

  1. The query engine layer gets a read request for one or more rows
  2. If caching is enabled, try getting rows from Redis; stream response to users
  3. Retrieve remaining rows (if any) from the storage engine
  4. Asynchronously populate Redis with the remaining rows
  5. Stream remaining rows to users
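
A sketch of that read path follows, with stand-in interfaces for Redis and the storage engine; “streaming” rows back to the client is reduced to appending to a slice.

```go
// Cache-aside read path following the five steps above, with stand-in interfaces
// for Redis and the storage engine.
package main

import (
	"context"
	"fmt"
)

type Row struct{ Key, Value string }

type rowCache interface {
	MGet(ctx context.Context, keys []string) map[string]string // returns hits only
	MSet(ctx context.Context, rows []Row)
}

type storageEngine interface {
	ReadRows(ctx context.Context, keys []string) []Row
}

func readRows(ctx context.Context, keys []string, cache rowCache, store storageEngine) []Row {
	var result []Row

	// 2. Try Redis first and "stream" whatever we find.
	hits := cache.MGet(ctx, keys)
	var misses []string
	for _, k := range keys {
		if v, ok := hits[k]; ok {
			result = append(result, Row{k, v})
		} else {
			misses = append(misses, k)
		}
	}

	// 3. Read the remaining rows from the storage engine.
	if len(misses) > 0 {
		rows := store.ReadRows(ctx, misses)
		// 4. Populate Redis asynchronously so the client is not blocked on it.
		go cache.MSet(context.Background(), rows)
		// 5. Stream the remaining rows.
		result = append(result, rows...)
	}
	return result
}

// In-memory fakes so the sketch runs without real Redis or MySQL behind it.
type memCache struct{ m map[string]string }

func (c memCache) MGet(ctx context.Context, keys []string) map[string]string {
	out := map[string]string{}
	for _, k := range keys {
		if v, ok := c.m[k]; ok {
			out[k] = v
		}
	}
	return out
}

func (c memCache) MSet(ctx context.Context, rows []Row) {
	for _, r := range rows {
		c.m[r.Key] = r.Value
	}
}

type memStore struct{}

func (memStore) ReadRows(ctx context.Context, keys []string) []Row {
	var rows []Row
	for _, k := range keys {
		rows = append(rows, Row{k, "row-from-storage-engine"})
	}
	return rows
}

func main() {
	cache := memCache{m: map[string]string{"row:1": "cached-value"}}
	fmt.Println(readRows(context.Background(), []string{"row:1", "row:2"}, cache, memStore{}))
}
```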

Cache Invalidation

They use Change Data Capture (CDC) for cache invalidation: a publisher emits an event for each update in the database, and a consumer listens for those changes and invalidates the corresponding entries in the cache.

Cache key

They used a key of the following format: RK{<table name>|<partition key>|<row key>|<instance>}

Cache Warming

A Docstore instance spans two geographical regions to ensure high availability and fault tolerance, and each region has its own separate Redis cluster. In case of a region failover, the other region must be able to serve all requests.

With two regions, both the database and the cache have to stay in sync so that if one region goes down the other can serve the data. The problem is that Docstore already has its own cross-region replication mechanism; replicating the cache content with Redis cross-region replication as well would mean two independent replication mechanisms, which could lead to cache vs. storage engine inconsistency.

To solve this, they tail the Redis write stream and replicate only the keys to the remote region. In the remote region, instead of directly updating the remote cache, read requests are issued to the query engine layer, which on a cache miss reads from the database and writes to the cache, so both regions end up with the same consistent data.

Circuit Breakers

If a Redis node goes down, they want to short-circuit requests to that node to avoid the unnecessary latency penalty of a Redis get/set request.

To achieve this, they use a sliding window circuit breaker: they count the number of errors on each node per time bucket and sum the errors across the width of the sliding window (see the sketch below).
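
A minimal per-node sliding-window error counter in that spirit is sketched below; the bucket width, window size, and threshold are illustrative, not Uber’s real values.

```go
// Minimal sliding-window circuit breaker: count errors per time bucket, trip
// when the total across the window exceeds a threshold. Parameters are illustrative.
package main

import (
	"fmt"
	"sync"
	"time"
)

type CircuitBreaker struct {
	mu        sync.Mutex
	bucketDur time.Duration
	buckets   int           // number of buckets in the sliding window
	errs      map[int64]int // bucket index -> error count
	threshold int           // trip when errors in the window exceed this
}

func NewCircuitBreaker(bucketDur time.Duration, buckets, threshold int) *CircuitBreaker {
	return &CircuitBreaker{bucketDur: bucketDur, buckets: buckets, errs: map[int64]int{}, threshold: threshold}
}

func (cb *CircuitBreaker) bucket(t time.Time) int64 { return t.UnixNano() / int64(cb.bucketDur) }

// RecordError counts an error against the current time bucket.
func (cb *CircuitBreaker) RecordError() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	now := cb.bucket(time.Now())
	cb.errs[now]++
	for b := range cb.errs { // drop buckets that slid out of the window
		if b <= now-int64(cb.buckets) {
			delete(cb.errs, b)
		}
	}
}

// Open reports whether requests to this Redis node should be short-circuited.
func (cb *CircuitBreaker) Open() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	now := cb.bucket(time.Now())
	total := 0
	for b, n := range cb.errs {
		if b > now-int64(cb.buckets) {
			total += n
		}
	}
	return total > cb.threshold
}

func main() {
	cb := NewCircuitBreaker(time.Second, 10, 5) // 10 one-second buckets, trip above 5 errors
	for i := 0; i < 7; i++ {
		cb.RecordError()
	}
	fmt.Println("short-circuit Redis node:", cb.Open()) // true
}
```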

Avoiding DB overload when the cache is down: if a Redis node goes down, all of its requests suddenly fall through to the database, which could overload it. To avoid this, they dynamically adjust the database timeouts of the queries.

Facebook

How Meta Achieves 99.99999999% Cache Consistency:

A common race condition that causes inconsistency:

  1. The client queries the cache for a value not present in it
  2. So the cache queries the database for the value: x = 0
  3. In the meantime, the value in the database gets changed: x = 1
  4. But the cache invalidation event reaches the cache first: x = 1
  5. Then the value from the cache fill reaches the cache: x = 0, leaving the cache stale

To deal with this, they built an observability solution.

Monitoring: they created a separate service, called Polaris, to monitor cache inconsistency.

  • It acts like a cache server & receives cache invalidation events
  • Then it queries cache servers to find data inconsistency
  • It queues inconsistent cache servers & checks again later
  • It checks data correctness during writes, so finding cache inconsistency is faster
  • Simply put, it measures cache inconsistency
  • Polaris queries the database at timescales of 1, 5, or 10 minutes. It lets them back off efficiently & improve accuracy.

Tracing

  • It logs only data changes that occur during the race condition time window. Thus log storage becomes cheaper
  • It keeps an index of recently modified data to determine if the next data change must be logged
  • Polaris reads logs if cache inconsistency is found & then sends notifications

Logging

Pinterest

Twitter

To achieve stability and scalability, they used Open Distro for Elasticsearch, but added a proxy and two services: Ingestion Service and Backfill Service.

The proxy separates read and write traffic from clients, handles client authentication, and provides additional metrics and flexible routing and throttling. This design creates a single entry point for all requests and makes it easier for customers to build solutions.

The Ingestion Service was introduced to handle large traffic spikes. It queues requests from clients into a Kafka topic, and worker clients then send the requests to the Elasticsearch cluster. The service batches requests, listens to back-pressure, auto-throttles, and retries with backoff, smoothing out traffic to the cluster and preventing overload.
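
The Ingestion Service itself is not open source; the sketch below only shows the worker-side idea: take a batch of queued requests, bulk-index it into Elasticsearch, and retry with exponential backoff so bursts are smoothed instead of overloading the cluster. The ES client is a stand-in interface.

```go
// Worker-side sketch: bulk-index one batch with retry and exponential backoff.
// The Elasticsearch client is a stand-in interface; the flaky fake exercises the retry path.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type esCluster interface {
	Bulk(ctx context.Context, docs []string) error
}

// indexWithBackoff keeps retrying one batch; while it backs off, the worker is
// not pulling more work from the queue, which is the back-pressure described above.
func indexWithBackoff(ctx context.Context, es esCluster, docs []string) error {
	backoff := 100 * time.Millisecond
	for {
		err := es.Bulk(ctx, docs)
		if err == nil {
			return nil
		}
		fmt.Printf("bulk of %d docs failed (%v), retrying in %v\n", len(docs), err, backoff)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 30*time.Second {
			backoff *= 2
		}
	}
}

// flakyES fails twice and then accepts the batch, to exercise the retry path.
type flakyES struct{ calls int }

func (f *flakyES) Bulk(ctx context.Context, docs []string) error {
	f.calls++
	if f.calls <= 2 {
		return errors.New("es_rejected_execution_exception")
	}
	return nil
}

func main() {
	err := indexWithBackoff(context.Background(), &flakyES{}, []string{"doc:1", "doc:2"})
	fmt.Println("batch indexed:", err == nil)
}
```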