
Improving Concurrency in Our Redis Rate Limiting System


Background

Rate limiting is a technique used to protect services from overload. In addition, it can be used to prevent starvation of a multi-tenant resource by a few very large customers. At Rockset, we primarily use rate limiting to protect our:

  1. metadata store from overload caused by too many API requests.
  2. log store from filling up due to mismatched input and output rates.
  3. control plane from too many state transitions.

We use the Redisson RateLimiter, which uses Redis under the hood to track rate usage. At a very basic level, our usage of the library looks like this (omitting specific business logic for better readability):

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    return rateLimitService.acquire(key, requestedTokens);
  }
}

Let's not dive into the details of RRateLimiter, but suffice it to say that this makes a network call to Redis. RedisRateLimiter.acquire will return true if requestedTokens wouldn't exceed your rate limit, and false otherwise.
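For concreteness, here's roughly what wiring up one of these limiters looks like with Redisson. This is a sketch, not our production setup: the address, limiter name, and rate values are placeholders. Note that the real RRateLimiter is keyed by its name rather than taking a key argument, and exposes tryAcquire for the non-blocking boolean check; the acquire(key, tokens) call above is our simplified wrapper around it.

import org.redisson.Redisson;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

// Connect to Redis (address is a placeholder).
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

// One named limiter per rate limit key: allow 1000 permits per minute,
// shared across all service instances (RateType.OVERALL).
RRateLimiter limiter = redisson.getRateLimiter("metadata-store");
limiter.trySetRate(RateType.OVERALL, 1000, 1, RateIntervalUnit.MINUTES);

// Non-blocking check: true if 5 permits fit in the current window.
boolean allowed = limiter.tryAcquire(5);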

Problem

Recently, we noticed that due to the volume of requests to Redis, the CPU on our Redis cluster was getting close to 100%. The first thing we tried was vertically scaling up our Redis instance to buy us time. However, vertical scaling has its own limits, and every few weeks we would end up with another surge in Redis CPU.

We also observed that Redisson uses Lua scripting on the server side, and that Lua compilation was taking up a good chunk of CPU time. Another piece of low-hanging fruit we tried was configuring Redisson to cache compiled Lua scripts on the server side, reducing CPU time spent on this task. Since this was a simple config change, it didn't require a code deploy and was easy to roll out.
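The exact configuration isn't shown in this post; in Redisson, the script-caching knob is the useScriptCache setting, which loads each script once and invokes it by SHA digest (EVALSHA) rather than resending the Lua source for recompilation on every call. A minimal sketch, assuming the Java config API:

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

Config config = new Config();
// Cache compiled Lua scripts on the Redis side; subsequent calls use
// EVALSHA instead of shipping the full script each time.
config.setUseScriptCache(true);
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);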

Apart from vertical scaling and improving configuration, we brainstormed a few other approaches to the problem:

  1. We could shard Redis over the rate limit keys to spread the load and scale horizontally.
  2. We could queue rate limit requests locally and have a single thread that periodically (e.g. every 50ms) takes n items off the queue and requests a larger batch of tokens from Redis.
  3. We could proactively reserve larger batches of tokens and cache them locally. When a request for tokens comes in, try to serve it from the local cache. If the cache doesn't have enough tokens, go fetch a larger batch. This is analogous to malloc not making a syscall every time memory is requested, instead reserving larger chunks that it manages.

Horizontally scaling Redis by sharding is a great long-term solution; it's probably something we're going to end up doing at some point.

The problem with the second approach is that it raises several complexities: How frequently does the thread poll the queue? Do you cap the size of the queue, and if so, what happens when the queue is full? How do you even set the cap on the queue? What if Redis has 50 tokens left and we batch 10 requests each needing 10 tokens (asking Redis for a total of 100 tokens)? Ideally 5 of those requests should succeed, but in reality all 10 would fail. These problems are solvable, but would make the implementation quite complex. Thus, we ended up implementing the third solution.

As shown towards the end of the post, this implementation reduced Redis connections on rate limit calls by 96%. The rest of this post will explore how we implemented the third approach. It goes into some of the pitfalls, complexities, and things to consider when working on a batch-oriented solution such as this one.

Implementation

Note that the code presented in this blog is in Java. Not all error handling is shown, for simplicity. Also, I'll reference a now() method, which simply returns the Unix timestamp in seconds since the epoch.
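For reference, a minimal now() might look like this:

private static long now() {
  // Unix timestamp in seconds since the epoch.
  return java.time.Instant.now().getEpochSecond();
}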

Let's start simple:

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;
  private final long batchSize = ...;
  private final long timeWindowSecs = ...;
  private long reservedTokens = 0;
  private long expirationTs = 0;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    // In this case, we might as well make a direct call to
    // simplify things.
    if (requestedTokens > batchSize) {
      return rateLimitService.acquire(key, requestedTokens);
    }

    // Serve from the local batch if it has enough tokens left
    // and hasn't expired yet.
    if (reservedTokens >= requestedTokens && expirationTs > now()) {
      reservedTokens -= requestedTokens;
      return true;
    }

    if (rateLimitService.acquire(key, batchSize)) {
      reservedTokens = batchSize - requestedTokens;
      expirationTs = now() + timeWindowSecs;
      return true;
    }

    return false;
  }
}

This code looks fine at first glance, but what happens if multiple threads need to call isNotRateLimited at the same time? The above code is certainly not thread safe. I'll leave it as an exercise to the reader why making reservedTokens an Atomic variable won't solve the problem (though do let us know if you come up with a clever lock-free solution). If Atomics won't work, we can try using locks instead:

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;
  private final long batchSize = ...;
  private final long timeWindowSecs = ...;
  private final Lock lock = new ReentrantLock();
  private long reservedTokens = 0;
  private long expirationTs = 0;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    // In this case, we might as well make a direct call to
    // simplify things.
    if (requestedTokens > batchSize) {
      return rateLimitService.acquire(key, requestedTokens);
    }

    lock.lock();
    try {
      if (reservedTokens >= requestedTokens && expirationTs > now()) {
        reservedTokens -= requestedTokens;
        return true;
      } else if (expirationTs > now()) {
        // Use up remaining tokens
        requestedTokens -= reservedTokens;
        reservedTokens = 0;
      }
    } finally {
      // Easy to overlook; don't hold the lock across the network request.
      lock.unlock();
    }

    if (rateLimitService.acquire(key, batchSize)) {
      lock.lock();
      try {
        reservedTokens = batchSize - requestedTokens;
        expirationTs = now() + timeWindowSecs;
      } finally {
        lock.unlock();
      }
      return true;
    }

    return false;
  }
}

While at first glance this looks correct, there's one subtle problem with it. What happens if multiple threads see there aren't enough reservedTokens? Let's say reservedTokens is 0, our batchSize is 100, and 5 threads request 20 tokens each concurrently.

All 5 threads will see that there aren't enough reserved tokens, and each will fetch a batch of 100 tokens. Now, this machine is left with 400 reserved tokens (500 fetched, minus the 100 actually consumed) and has made 5x too many requests to the external store. Can we do better? All we really need is for one thread to go fetch a batch, and then the other 4 threads can simply utilize that batch. One network call, and fewer wasted tokens.

With some booleans and condition variables, we can fairly easily achieve this. If you're unfamiliar with how condition variables work, check out the Java docs; most languages will have some kind of condition variable implementation as well.
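As a quick refresher, here's the basic pattern in isolation (a generic sketch, unrelated to the rate limiter): a waiting thread sleeps until some shared state changes, and the thread that changes the state signals the waiters.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Gate {
  private final Lock lock = new ReentrantLock();
  private final Condition opened = lock.newCondition();
  private boolean open = false;

  void awaitOpen() throws InterruptedException {
    lock.lock();
    try {
      while (!open) {    // re-check the predicate; wakeups can be spurious
        opened.await();  // atomically releases the lock and sleeps
      }
    } finally {
      lock.unlock();
    }
  }

  void openGate() {
    lock.lock();
    try {
      open = true;
      opened.signalAll(); // wake every thread blocked in await()
    } finally {
      lock.unlock();
    }
  }
}

With that pattern in mind, here's the code: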

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;
  private final long batchSize = ...;
  private final long timeWindowSecs = ...;
  private final Lock lock = new ReentrantLock();
  private final Condition fetchCondition = lock.newCondition();
  private boolean fetchInProgress = false;
  private long reservedTokens = 0;
  private long expirationTs = 0;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    // In this case, we might as well make a direct call to
    // simplify things.
    if (requestedTokens > batchSize) {
      return rateLimitService.acquire(key, requestedTokens);
    }

    boolean doFetch = false;
    lock.lock();
    try {
      if (reservedTokens >= requestedTokens && expirationTs > now()) {
        reservedTokens -= requestedTokens;
        return true;
      } else if (expirationTs > now()) {
        requestedTokens -= reservedTokens;
        reservedTokens = 0;
      }

      if (fetchInProgress) {
        // Another thread is already fetching; let's wait for it to finish.
        // (The uninterruptible variant keeps checked-exception handling
        // out of the example.)
        fetchCondition.awaitUninterruptibly();
        if (reservedTokens >= requestedTokens) {
          reservedTokens -= requestedTokens;
          return true;
        }
        return false;
      } else {
        doFetch = true;         // This thread should fetch the batch
        fetchInProgress = true; // Prevent other threads from fetching
      }
    } finally {
      lock.unlock();
    }

    if (doFetch) {
      boolean acquired = rateLimitService.acquire(key, batchSize);
      lock.lock();
      try {
        if (acquired) {
          reservedTokens = batchSize - requestedTokens;
          expirationTs = now() + timeWindowSecs;
        }
        fetchInProgress = false;
        fetchCondition.signalAll(); // Wake up waiting threads
      } finally {
        lock.unlock();
      }
      return acquired;
    }

    return false;
  }
}

Now, we will only ever have one thread at a time fetching a batch. While the code is logically correct, we might end up rate limiting a thread too aggressively:

Let's say our batch size is 100 and we have 5 threads requesting 25 tokens each concurrently. The first thread (call it T1) will fetch the batch from the external service. The other 4 threads will wait on the condition variable. However, the fifth thread will have waited for no reason, because the first 4 threads will use up all the tokens in the fetched batch. Instead, it would have been better to either:

  1. Immediately return false for the fifth thread (this would rate limit too aggressively)
  2. Or have the fifth thread make a direct call to the external service, not waiting on the first thread.

The second solution is implemented below:

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;
  private final long batchSize = ...;
  private final long timeWindowSecs = ...;
  private final Lock lock = new ReentrantLock();
  private final Condition fetchCondition = lock.newCondition();
  private boolean fetchInProgress = false;
  private long reservedTokens = 0;
  private long expirationTs = 0;
  // Number of tokens from the in-flight batch that waiting threads
  // have not yet claimed.
  private long unreservedFetchTokens = 0;
  // Used by waiting threads to determine whether the fetch they're
  // waiting for succeeded or not.
  private boolean didFetchSucceed = false;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    // In this case, we might as well make a direct call to
    // simplify things.
    if (requestedTokens > batchSize) {
      return rateLimitService.acquire(key, requestedTokens);
    }

    boolean doFetch = false;
    lock.lock();
    try {
      if (reservedTokens >= requestedTokens && expirationTs > now()) {
        reservedTokens -= requestedTokens;
        return true;
      } else if (expirationTs > now()) {
        requestedTokens -= reservedTokens;
        reservedTokens = 0;
      }

      if (fetchInProgress) {
        if (unreservedFetchTokens >= requestedTokens) {
          // Reserve your spot in line
          unreservedFetchTokens -= requestedTokens;
          fetchCondition.awaitUninterruptibly();
          // If we get here and the fetch succeeded, then we
          // are fine.
          return didFetchSucceed;
        }
      } else {
        doFetch = true;
        fetchInProgress = true;
        unreservedFetchTokens = batchSize - requestedTokens;
      }
    } finally {
      lock.unlock();
    }

    if (doFetch) {
      boolean acquired = rateLimitService.acquire(key, batchSize);
      lock.lock();
      try {
        didFetchSucceed = acquired;
        if (acquired) {
          reservedTokens = unreservedFetchTokens;
          expirationTs = now() + timeWindowSecs;
        }
        fetchInProgress = false;
        fetchCondition.signalAll(); // Wake up waiting threads
      } finally {
        lock.unlock();
      }
      return acquired;
    }

    // If we get here, it means there weren't enough
    // unreservedFetchTokens. Let's just make our own
    // call rather than waiting in line.
    return rateLimitService.acquire(key, requestedTokens);
  }
}
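Usage at a call site is unchanged from the naive version; only the internals differ. A hypothetical example (the key name and token count are made up):

// Hypothetical call site; the key and token count are illustrative.
if (rateLimiter.isNotRateLimited("apiserver:metadata-store", 1)) {
  // Proceed with the request.
} else {
  // Reject the request, e.g. with an HTTP 429.
}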

Finally, we've arrived at an acceptable solution. In practice, the lock contention should be minimal, as we're only setting a few primitive values. But, as with anything, you should benchmark this solution for your use case and see if it makes sense.

Setting the batch size

One remaining question is how to set batchSize. There's a tradeoff here: if batchSize is too low, the number of requests to Redis will approach the number of requests to isNotRateLimited. If batchSize is too high, hosts will reserve too many tokens, starving out other hosts. One thing to consider is whether these hosts can be autoscaled. If so, once numHosts * batchSize exceeds the rate limit, hosts will start getting starved out even when the overall number of requests is below the rate limit. For example, with a rate limit of 1,000 tokens and a batchSize of 100, scaling past 10 hosts means the reserved batches alone can exhaust the limit.

To address some of this, it could be interesting to explore a dynamically set batch size. If this machine used up the full last batch, maybe it can request 1.5x the batch next time (with a cap, of course). Alternatively, if batches are going to waste, perhaps only ask for half the batch next time.
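We haven't built this, but a sketch of such a policy might look like the following; the utilization thresholds, growth factor, and bounds are all made-up knobs, and minBatchSize and maxBatchSize are assumed fields:

// Hypothetical policy: grow the batch when the last one was fully used,
// shrink it when most of it went to waste.
private long nextBatchSize(long tokensUsed, long lastBatchSize) {
  double utilization = (double) tokensUsed / lastBatchSize;
  if (utilization >= 0.9) {
    return Math.min(lastBatchSize * 3 / 2, maxBatchSize); // 1.5x, capped
  }
  if (utilization <= 0.5) {
    return Math.max(lastBatchSize / 2, minBatchSize); // halve, floored
  }
  return lastBatchSize;
}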

Results

As an initial starting point, we set batchSize to be 1/1000 of the rate limit for a given resource. For our workload, this resulted in only ~4% of rate limit requests going to Redis, a massive improvement. This can be seen in the chart below, where the x-axis is time and the y-axis is the % of requests hitting Redis:

[Figure 1: % of rate limit requests hitting Redis over time]

Improving our rate limiting at Rockset is an ongoing process, and this probably won't be the last improvement we need to make in this area. Stay tuned for more. And if you're interested in solving these kinds of problems, we're hiring!

A quick aside

As an aside, the following code has a very subtle concurrency bug. Can you spot it?

class RedisRateLimiter {
  private final RRateLimiter rateLimitService = ...;
  private final long batchSize = ...;
  private final long timeWindowSecs = ...;
  private final Lock lock = new ReentrantLock();
  private final Condition fetchCondition = lock.newCondition();
  private boolean fetchInProgress = false;
  private long reservedTokens = 0;
  private long expirationTs = 0;
  // Number of tokens from the in-flight batch that waiting threads
  // have not yet claimed.
  private long unreservedFetchTokens = 0;

  public boolean isNotRateLimited(String key, int requestedTokens) {
    // In this case, we might as well make a direct call to
    // simplify things.
    if (requestedTokens > batchSize) {
      return rateLimitService.acquire(key, requestedTokens);
    }

    boolean doFetch = false;
    lock.lock();
    try {
      if (reservedTokens >= requestedTokens) {
        reservedTokens -= requestedTokens;
        return true;
      } else if (expirationTs > now()) {
        requestedTokens -= reservedTokens;
        reservedTokens = 0;
      }

      if (fetchInProgress) {
        if (unreservedFetchTokens >= requestedTokens) {
          // Reserve your spot in line
          unreservedFetchTokens -= requestedTokens;
          fetchCondition.awaitUninterruptibly();
          if (reservedTokens >= requestedTokens) {
            reservedTokens -= requestedTokens;
            return true;
          }
          return false;
        }
      } else {
        doFetch = true;
        fetchInProgress = true;
        unreservedFetchTokens = batchSize - requestedTokens;
      }
    } finally {
      lock.unlock();
    }

    if (doFetch) {
      boolean acquired = rateLimitService.acquire(key, batchSize);
      lock.lock();
      try {
        if (acquired) {
          reservedTokens = (batchSize - requestedTokens);
          expirationTs = now() + timeWindowSecs;
        }
        fetchInProgress = false;
        fetchCondition.signalAll(); // Wake up waiting threads
      } finally {
        lock.unlock();
      }
      return acquired;
    }

    // If we get here, it means there weren't enough
    // unreservedFetchTokens. Let's just make our own
    // call rather than waiting in line.
    return rateLimitService.acquire(key, requestedTokens);
  }
}

Hint: Even if rateLimitService.acquire always returned true, you can end up in situations where isNotRateLimited returns false.


