當前位置:首頁>職場>程序員面試會問底層原理嗎(你說說限流的原理)
發布時間:2024-01-24閱讀(18)
限流作為現在微服務中常見的穩定性措施,在面試中肯定也是經常會被問到的,我在面試的時候也經常喜歡問一下你對限流算法知道哪一些?有看過源碼嗎?實現原理是什么?
第一部分先講講限流算法,最后再講講源碼的實現原理。
限流算法關于限流的算法大體上可以分為四類:固定窗口計數器、滑動窗口計數器、漏桶(也有稱漏斗,英文Leaky bucket)、令牌桶(英文token bucket)。
固定窗口固定窗口,相比其他的限流算法,這應該是最簡單的一種。
它簡單地對一個固定的時間窗口內的請求數量進行計數,如果超過請求數量的閾值,將被直接丟棄。
這個簡單的限流算法優缺點都很明顯。優點的話就是簡單,缺點舉個例子來說。
比如我們下圖中的黃色區域就是固定時間窗口,默認時間范圍是60s,限流數量是100。
如圖中括號內所示,前面一段時間都沒有流量,剛好后面30秒內來了100個請求,此時因為沒有超過限流閾值,所以請求全部通過,然后下一個窗口的20秒內同樣通過了100個請求。
所以變相的相當于在這個括號的40秒的時間內就通過了200個請求,超過了我們限流的閾值。

為了優化這個問題,于是有了滑動窗口算法,顧名思義,滑動窗口就是時間窗口在隨著時間推移不停地移動。
滑動窗口把一個固定時間窗口再繼續拆分成N個小窗口,然后對每個小窗口分別進行計數,所有小窗口請求之和不能超過我們設定的限流閾值。
以下圖舉例子來說,假設我們的窗口拆分成了3個小窗口,小窗口都是20s,同樣基于上面的例子,當在第三個20s的時候來了100個請求,可以通過。
然后時間窗口滑動,下一個20s請求又來了100個請求,此時我們滑動窗口的60s范圍內請求數量肯定就超過100了啊,所以請求被拒絕。

漏桶算法,人如其名,他就是一個漏的桶,不管請求的數量有多少,最終都會以固定的出口流量大小勻速流出,如果請求的流量超過漏桶大小,那么超出的流量將會被丟棄。
也就是說流量流入的速度是不定的,但是流出的速度是恒定的。
這個和MQ削峰填谷的思想比較類似,在面對突然激增的流量的時候,通過漏桶算法可以做到勻速排隊,固定速度限流。
漏桶算法的優勢是勻速,勻速是優點也是缺點,很多人說漏桶不能處理突增流量,這個說法并不準確。
漏桶本來就應該是為了處理間歇性的突增流量,流量一下起來了,然后系統處理不過來,可以在空閑的時候去處理,防止了突增流量導致系統崩潰,保護了系統的穩定性。
但是,換一個思路來想,其實這些突增的流量對于系統來說完全沒有壓力,你還在慢慢地勻速排隊,其實是對系統性能的浪費。
所以,對于這種有場景來說,令牌桶算法比漏桶就更有優勢。
令牌桶token bucket令牌桶算法是指系統以一定地速度往令牌桶里丟令牌,當一個請求過來的時候,會去令牌桶里申請一個令牌,如果能夠獲取到令牌,那么請求就可以正常進行,反之被丟棄。
現在的令牌桶算法,像Guava和Sentinel的實現都有冷啟動/預熱的方式,為了避免在流量激增的同時把系統打掛,令牌桶算法會在最開始一段時間內冷啟動,隨著流量的增加,系統會根據流量大小動態地調整生成令牌的速度,最終直到請求達到系統的閾值。
源碼舉例我們以sentinel舉例,sentinel中統計用到了滑動窗口算法,然后也有用到漏桶、令牌桶算法。
滑動窗口sentinel中就使用到了滑動窗口算法來進行統計,不過他的實現和我上面畫的圖有點不一樣,實際上sentinel中的滑動窗口用一個圓形來描述更合理一點。
前期就是創建節點,然后slot串起來就是一個責任鏈模式,StatisticSlot通過滑動窗口來統計數據,FlowSlot是真正限流的邏輯,還有一些降級、系統保護的措施,最終形成了整個sentinel的限流方式。

滑動窗口的實現主要可以看LeapArray的代碼,默認的話定義了時間窗口的相關參數。
對于sentinel來說其實窗口分為秒和分鐘兩個級別,秒的話窗口數量是2,分鐘則是60個窗口,每個窗口的時間長度是1s,總的時間周期就是60s,分成60個窗口,這里我們就以分鐘級別的統計來說。
public abstract class LeapArray<T> { //窗口時間長度,毫秒數,默認1000ms protected int windowLengthInMs; //窗口數量,默認60 protected int sampleCount; //毫秒時間周期,默認60*1000 protected int intervalInMs; //秒級時間周期,默認60 private double intervalInSecond; //時間窗口數組 protected final AtomicReferenceArray<WindowWrap<T>> array;
然后我們要看的就是它是怎么計算出當前窗口的,其實源碼里寫的聽清楚的,但是如果你按照之前想象把他當做一條直線延伸去想的話估計不太好理解。
首先計算數組索引下標和時間窗口時間這個都比較簡單,難點應該大部分在于第三點窗口大于old這個是什么鬼,詳細說下這幾種情況。
從這個我們可以發現就是針對每個WindowWrap時間窗口都進行了統計,最后實際上在后面的幾個地方都會用到時間窗口統計的QPS結果,這里就不再贅述了,知道即可。
private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int) (timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs;}public WindowWrap<T> currentWindow(long timeMillis) { //當前時間如果小于0,返回空 if (timeMillis < 0) { return null; } //計算時間窗口的索引 int idx = calculateTimeIdx(timeMillis); // 計算當前時間窗口的開始時間 long windowStart = calculateWindowStart(timeMillis); while (true) { //在窗口數組中獲得窗口 WindowWrap<T> old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * 比如當前時間是888,根據計算得到的數組窗口位置是個空,所以直接創建一個新窗口就好了 */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * 這個更好了,剛好等于,直接返回就行 */ return old; } else if (windowStart > old.windowStart()) { /* * B0 B1 B2 B3 B4 * |_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * 這個要當成圓形理解就好了,之前如果是1200一個完整的圓形,然后繼續從1200開始,如果現在時間是1676,落在在B2的位置, * 窗口開始時間是1600,獲取到的old時間其實會是600,所以肯定是過期了,直接重置窗口就可以了 */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { // 這個不太可能出現,嗯。。時鐘回撥 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } }}
sentinel主要根據FlowSlot中的流控進行流量控制,其中RateLimiterController就是漏桶算法的實現,這個實現相比其他幾個還是簡單多了,稍微看一下應該就明白了。
public class RateLimiterController implements TrafficShapingController { //最大等待超時時間,默認500ms private final int maxQueueingTimeMs; //限流數量 private final double count; //上一次的通過時間 private final AtomicLong latestPassedTime = new AtomicLong(-1); @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); //時間平攤到1s內的花費 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms //計算這一次請求預計的時間 long expectedTime = costTime latestPassedTime.get(); //花費時間小于當前時間,pass,最后通過時間 = 當前時間 if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true; } else { //預計通過的時間超過當前時間,要進行排隊等待,重新獲取一下,避免出現問題,差額就是需要等待的時間 long waitTime = costTime latestPassedTime.get() - TimeUtil.currentTimeMillis(); //等待時間超過最大等待時間,丟棄 if (waitTime > maxQueueingTimeMs) { return false; } else { //反之,可以更新最后一次通過時間了 long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); //更新后再判斷,還是超過最大超時時間,那么就丟棄,時間重置 if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } //在時間范圍之內的話,就等待 if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; }}
最后是令牌桶,這個不在于實現的復制,而是你看源碼會發現都算的些啥玩意兒。。。sentinel的令牌桶實現基于Guava,代碼在WarmUpController中。
這個算法那些各種計算邏輯其實我們可以不管(因為我也沒看懂。。),但是流程上我們是清晰的就可以了。
幾個核心的參數看注釋,構造方法里那些計算邏輯暫時不管他是怎么算的(我也沒整明白,但是不影響我們理解),關鍵看canPass是怎么做的。
public class WarmUpController implements TrafficShapingController { //限流QPS protected double count; //冷啟動系數,默認=3 private int coldFactor; //警戒的令牌數 protected int warningToken = 0; //最大令牌數 private int maxToken; //斜率,產生令牌的速度 protected double slope; //存儲的令牌數量 protected AtomicLong storedTokens = new AtomicLong(0); //最后一次填充令牌時間 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } public WarmUpController(double count, int warmUpPeriodInSec) { construct(count, warmUpPeriodInSec, 3); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; //stableInterval 穩定產生令牌的時間周期,1/QPS //warmUpPeriodInSec 預熱/冷啟動時間 ,默認 10s warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); maxToken = warningToken (int)(2 * warmUpPeriodInSec * count / (1.0 coldFactor)); //斜率的計算參考Guava,當做一個固定改的公式 slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //當前時間窗口通過的QPS long passQps = (long) node.passQps(); //上一個時間窗口QPS long previousQps = (long) node.previousPassQps(); //填充令牌 syncToken(previousQps); // 開始計算它的斜率 // 如果進入了警戒線,開始調整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { //當前的令牌超過警戒線,獲得超過警戒線的令牌數 long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope 1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope 1.0 / count)); if (passQps acquireCount <= warningQps) { return true; } } else { if (passQps acquireCount <= count) { return true; } } return false; }}
填充令牌的邏輯如下:
protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); //去掉當前時間的毫秒 currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); //控制每秒填充一次令牌 if (currentTime <= oldLastFillTime) { return; } //當前的令牌數量 long oldValue = storedTokens.get(); //獲取新的令牌數量,包含添加令牌的邏輯,這就是預熱的邏輯 long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { //存儲的令牌數量當然要減去上一次消耗的令牌 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); }}
private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; //水位低于警戒線,就生成令牌 if (oldValue < warningToken) { //如果桶中令牌低于警戒線,根據上一次的時間差,得到新的令牌數,因為去掉了毫秒,1秒生成的令牌就是閾值count //第一次都是0的話,會生成count數量的令牌 newValue = (long)(oldValue (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { //反之,如果是高于警戒線,要判斷QPS。因為QPS越高,生成令牌就要越慢,QPS低的話生成令牌要越快 if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue (currentTime - lastFilledTime.get()) * count / 1000); } } //不要超過最大令牌數 return Math.min(newValue, maxToken);}
上面的邏輯理順之后,我們就可以繼續看限流的部分邏輯:
long restToken = storedTokens.get(); if (restToken >= warningToken) { //當前的令牌超過警戒線,獲得超過警戒線的令牌數 long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope 1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope 1.0 / count)); if (passQps acquireCount <= warningQps) { return true; } } else { if (passQps acquireCount <= count) { return true; } }
所以,按照低QPS到突增高QPS的流程,來想象一下這個過程:
這以上的部分一直處于警戒線之上,實際上就是叫做冷啟動/預熱的過程。
那這樣一來,實際就達到了我們處理突增流量的目的,整個系統在漫漫地適應突然飆高的QPS,然后最終達到系統的QPS閾值。

因為算法如果單獨說的話都比較簡單,一說大家都可以聽明白,不需要幾個字就能說明白,所以還是得弄點源碼看看別人是怎么玩的,所以盡管我很討厭放源碼,但是還是不得不干。
光靠別人說一點其實有點看不明白,按照順序讀一遍的話心里就有數了。
那源碼的話最難以理解的就是令牌桶的實現了,說實話那幾個計算的邏輯我看了好幾遍不知道他算的什么鬼,但是思想我們理解就行了,其他的邏輯相對來說就比較容易理解。
歡迎分享轉載→http://www.avcorse.com/read-253297.html
Copyright ? 2024 有趣生活 All Rights Reserve吉ICP備19000289號-5 TXT地圖HTML地圖XML地圖