1変数で管理できるレート計測用カウンター
1. はじめに:カウンターとレートの違い
Webアプリケーションやシステムの運用では、リクエスト数やエラー数、ジョブの処理件数などを計測するために、単純なカウンターがよく使われます。例えば、total_requests += 1 のように加算していくだけで、累積値を簡単に把握できます。
しかし、実際の運用では「累積値」だけでなく、リアルタイムな「変化量」が重要になるケースが少なくありません。たとえば、累積エラー数が1,000件というアラートだけでは、深刻度を判断できません。1週間かけて徐々に増えた1,000件なら大きな問題ではないかもしれませんが、直近の10秒間で突然1,000件発生した場合、それは致命的な障害の可能性が高いでしょう。
この記事では、後者の「変化量(レート)」を軽量に計測するための手法について解説します。通常は、指数減衰の計算は2変数で表現することが多いですが、提案手法では1つの変数だけで実現する方法について説明します。
2. 従来の実装方法
「変化量(レート)」を測るための代表的な実装方法には、主に2つのアプローチがあります。
2.1 バケットを使った方法
直感的な方法として、一定時間ごとにバケットを用意し、その中での件数を数える方法があります。例えば、「直近1分間のリクエスト数」を知りたい場合、1秒ごとのバケットを60個保持します。
[12, 9, 10, 13, ...] // 直近60秒分のバケットのイメージ
この方法は分かりやすいですが、複数の状態を保持する必要があるため、計測対象の時間幅を広げたり、システム全体のメトリクス数が増えたりすると、メモリや処理のオーバーヘッドが大きくなります。
2.2 指数減衰を使った方法
もう一つの有力な方法は、「古いイベントほど重みを軽くする」指数減衰の考え方です。これは、EWMA(指数加重移動平均)や指数移動平均などと呼ばれる手法で、以下の計算式で現在値を求めます。
\[
\operatorname{value}(\mathrm{now}) =
\operatorname{value}(\mathrm{last}) \times
\exp\left(-\frac{\mathrm{now} - \mathrm{last}}{\mathrm{duration}}\right)
\]
value(last): 最後に更新した時点での値
last: 最後に更新した時刻
now: 現在時刻
duration: 減衰の時間スケール(時定数・実効的な窓幅)
この計算式をそのままプログラムに実装する場合、「最後に更新した時刻(last_updated)」と「その時点での値(value_at_last_updated)」の2つの変数が必要になります。
3. 1変数で指数減衰を管理できる提案手法
3.1 概要
通常、指数減衰カウンターを実装するには、上記の2つの状態を保持する必要がありますが、提案手法では「時刻風のパラメータ(pointer_time)」という1つの変数だけで管理します。
この手法のポイントは、現在値そのものを保存せず、現在時刻と減衰の時間スケールから現在値を復元できるパラメータ(pointer_time)を保存する点です。
\[
\operatorname{value}(\mathrm{now}) =
\exp\left(\frac{\mathrm{pointer\_time} - \mathrm{now}}{\mathrm{duration}}\right)
\]
ここで注意するのは、pointer_time は実際のイベント発生時刻ではなく、指数減衰曲線の位置を表現するためのパラメータだということです。
pointer_time が現在時刻より未来にある: 値が大きい
pointer_time が現在時刻に近い: 値が1に近い
pointer_time が現在時刻より過去にある: 値が減衰している
つまり、値を単なる「点」として記憶せず、「指数曲線の位置」として記憶することで、必要な変数を1つに削減しています。
3.2 カウントアップの方法
では、この1変数カウンターでどのように値を加算するのでしょうか。ある時刻 \(t\) で値を \(x\) 増やす場合、以下の3ステップで処理します。
- Decode(復元): 現在時刻 \(t\) における減衰後の値を計算
- Increment(加算): 復元した値に増分 \(x\) を足す
- Encode(更新): 新しい値を元に、次回の計算に使う
pointer_time を逆算して保存
Decode では、保存済みの pointer_time から現在値を復元します。
\[
\mathrm{old\_value} =
\exp\left(\frac{\mathrm{pointer\_time} - t}{\mathrm{duration}}\right)
\]
\[
\mathrm{new\_value} = \mathrm{old\_value} + x
\]
最後に、新しい値を表す pointer_time を逆算します。
\[
\mathrm{pointer\_time}_{\mathrm{new}} =
t + \mathrm{duration} \times \log(\mathrm{new\_value})
\]
最終的に保存するのは、更新後の pointer_time だけです。
3.3 なぜ更新時刻が不要になるのか?
一見不思議に思えますが、数式を展開するとその理由が分かります。一般的な指数減衰の式は次の通りです。
\[
\operatorname{value}(\mathrm{now}) =
\operatorname{value}(\mathrm{last}) \times
\exp\left(-\frac{\mathrm{now} - \mathrm{last}}{\mathrm{duration}}\right)
\]
ここで、\(\operatorname{value}(\mathrm{last})\) を提案手法の表現 \(\exp((\mathrm{pointer\_time} - \mathrm{last}) / \mathrm{duration})\) に置き換えると、次のようになります。
\[
\begin{aligned}
\operatorname{value}(\mathrm{now})
&= \exp\left(\frac{\mathrm{pointer\_time} - \mathrm{last}}{\mathrm{duration}}\right)
\times
\exp\left(-\frac{\mathrm{now} - \mathrm{last}}{\mathrm{duration}}\right) \\
&= \exp\left(\frac{\mathrm{pointer\_time} - \mathrm{now}}{\mathrm{duration}}\right)
\end{aligned}
\]
このように、last(最後に更新した時刻)が相殺されて消えてしまいます。つまり、「最後に更新した時刻」と「その時刻での値」という2つの情報は、pointer_time という1つの変数に集約できるのです。
3.4 duration と半減期の関係
この記事で使っている duration は、半減期ではなく、指数減衰カーネルの積分値、つまり実効的な窓幅として定義しています。経過時間 \(a\) に対する重みを \(w(a) = \exp(-a / \mathrm{duration})\) とすると、重みの総面積は以下のように duration そのものになります。
\[
\int_0^\infty \exp\left(-\frac{a}{\mathrm{duration}}\right) da
= \mathrm{duration}
\]
そのため、毎秒 \(r\) 回のイベントが一定に来る場合、指数減衰カウンターの定常値は \(r \times \mathrm{duration}\) になります。つまり、duration = 1 なら毎秒、duration = 60 なら毎分、duration = 3600 なら毎時のレートとして読みやすい値になります。
一方、半減期 \(h\) は、重みが半分になるまでの時間として定義されます。つまり、以下を満たす \(h\) です。
\[
\exp\left(-\frac{h}{\mathrm{duration}}\right) = \frac{1}{2}
\]
\[
h = \mathrm{duration} \times \ln 2
\]
したがって、duration = 60 のカウンターは「半減期60秒」ではなく、半減期が約41.6秒のカウンターです。ここではレート表示を分かりやすくするため、半減期ではなく実効窓幅としての duration を使っています。
4. 実装方法:固定小数点の時刻として保存する
この手法の実装上の要点は、pointer_time をそのまま浮動小数点数として共有するのではなく、整数に変換して AtomicI64 に保存することです。状態が1変数で済むため、読み出しと更新を1つのCAS(Compare-And-Swap)で完結できます。
4.1 保存形式と倍率
保存する値は、秒単位の pointer_time に一定の倍率を掛けた固定小数点整数にします。
\[
\mathrm{raw} = \operatorname{round}(\mathrm{pointer\_time} \times \mathrm{scale})
\]
例えば scale = 1_000_000 なら、保存単位は1マイクロ秒です。i64 の最大値は約 \(9.22 \times 10^{18}\) なので、この倍率でも表現できる時間幅は約29万年あります。monotonic clock の起点からの経過秒を保存する用途では、オーバーフローは通常問題になりません。
一方で、倍率を小さくしすぎると、高レート時に1イベント分の更新が丸めで消えやすくなります。値を \(v\)、増分を1、時間スケールを \(\tau\) とすると、1回の加算で pointer_time が動く量はおおよそ \(\tau / v\) です。定常状態では \(v \simeq \mathrm{rate} \times \tau\) なので、整数値の変化量はおおよそ次のようになります。
\[
\mathrm{raw\_delta} \simeq \frac{\mathrm{scale}}{\mathrm{rate}}
\]
つまり、scale = 10_000 は1カウンターあたり毎秒数千イベント程度までなら実用的ですが、毎秒1万イベントを超えるような用途では量子化の影響が見え始めます。汎用的な実装では scale = 1_000_000 を既定値にしておくと、分解能とオーバーフロー余裕のバランスがよくなります。
4.2 Atomic変数での管理
保存値が1つの整数に収まるため、カウンター本体は AtomicI64 だけで表現できます。duration や倍率はカウンターの設定値であり、更新のたびにアトミックに書き換える状態ではありません。
use std::sync::atomic::{AtomicI64, Ordering};
const SCALE: f64 = 1_000_000.0;
pub struct RateCounter {
pointer_time: AtomicI64,
duration_secs: f64,
}
impl RateCounter {
pub fn new(duration_secs: f64) -> Self {
assert!(duration_secs.is_finite() && duration_secs > 0.0);
Self {
pointer_time: AtomicI64::new(i64::MIN),
duration_secs,
}
}
}
初期値には i64::MIN を入れておきます。これは scale = 1_000_000 なら約29万年前の pointer_time に相当するため、現在値へデコードすると実質的に0になります。空状態のための分岐を別に持たず、通常の減衰計算だけで扱えるのが利点です。
4.3 デコード、加算、再エンコード
読み出しは、保存された整数を秒単位の pointer_time に戻し、現在時刻との差から値を復元します。初期値の i64::MIN も単に非常に古い時刻として扱われるため、特別な分岐は不要です。
fn decode_value(raw: i64, now_secs: f64, duration_secs: f64) -> f64 {
let pointer_time = raw as f64 / SCALE;
((pointer_time - now_secs) / duration_secs).exp()
}
fn encode_time(pointer_time: f64) -> i64 {
(pointer_time * SCALE)
.round()
.clamp(i64::MIN as f64, i64::MAX as f64) as i64
}
加算では、現在の保存値を読み、現在値に戻し、増分を足してから、再び pointer_time に変換します。このとき他スレッドが先に更新していた場合はCASが失敗するので、新しい値を使って同じ計算をやり直します。
impl RateCounter {
pub fn value_at(&self, now_secs: f64) -> f64 {
let raw = self.pointer_time.load(Ordering::Relaxed);
decode_value(raw, now_secs, self.duration_secs)
}
pub fn increment_at(&self, now_secs: f64, x: f64) {
assert!(now_secs.is_finite());
assert!(x.is_finite() && x > 0.0);
let mut raw = self.pointer_time.load(Ordering::Relaxed);
loop {
let old_value = decode_value(raw, now_secs, self.duration_secs);
let new_value = old_value + x;
let new_pointer_time = now_secs + self.duration_secs * new_value.ln();
let new_raw = encode_time(new_pointer_time);
match self.pointer_time.compare_exchange_weak(
raw,
new_raw,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(actual) => raw = actual,
}
}
}
}
このカウンターは他のメモリ状態との順序関係を表すためのものではなく、単独の近似値を更新するためのものなので、通常は Ordering::Relaxed で十分です。イベント処理本体との happens-before 関係をカウンター値で表したい場合だけ、より強い順序を検討します。
この実装は、duration_secs が正の有限値で、now_secs と増分 x も有限値であることを前提にしています。その条件を満たす限り、初期値の i64::MIN は非常に古い pointer_time として自然に扱われ、最初の加算で通常の値へ戻ります。
4.4 実装上の注意点
時刻には、システム時刻ではなく単調増加する monotonic clock を使います。システム時刻はNTP補正や手動変更で戻ることがあり、指数減衰の計算では値が不自然に増減する原因になります。Rustなら、プロセス開始時の Instant からの経過秒を使うのが扱いやすいです。
use std::time::Instant;
let origin = Instant::now();
let now_secs = origin.elapsed().as_secs_f64();
また、非常に長い時間更新されていないカウンターでは、デコード時の exp が0にアンダーフローすることがあります。この場合は実質的に値が0まで減衰したと見なせるため、多くの用途では問題になりません。初期値の i64::MIN もこの性質を利用しています。
まとめると、実装の既定値としては AtomicI64 に pointer_time * 1_000_000 を保存し、初期値を i64::MIN にして、CASループでデコード・加算・再エンコードする構成が扱いやすいです。
5. 数理的な背景
この手法は、指数減衰集計(exponentially decayed aggregate)の特殊なケースとして説明できます。
時刻 \(t_i\) に重み \(w_i\) のイベントが発生したときの指数減衰カウント \(D(t)\) は、次のように定義できます。
\[
D(t) = \sum_i w_i \times \exp\left(-\frac{t - t_i}{\tau}\right)
\]
\[
D(t) = \exp\left(-\frac{t}{\tau}\right) \times \sum_i w_i \times \exp\left(\frac{t_i}{\tau}\right)
\]
つまり、読み出し時刻 \(t\) に依存する部分はシグマの外に出せるため、システムが保持すべき状態 \(S\) は以下の通りです。
\[
S = \sum_i w_i \times \exp\left(\frac{t_i}{\tau}\right)
\]
この \(S\) を対数領域に変換すると、次のようになります。
\[
p = \tau \times \log(S)
\]
これが提案手法の pointer_time に相当します。また、インクリメント操作は log-sum-exp の計算と等価です。
\[
p_{\mathrm{new}} =
\tau \times \log\left(
\exp\left(\frac{p_{\mathrm{old}}}{\tau}\right)
+ x \times \exp\left(\frac{t}{\tau}\right)
\right)
\]
実際の実装では、一度デコードして加算し、再度エンコードすることで効率的に処理しています。
6. まとめ
システム運用では、「変化量(レート)」を把握することも重要です。通常の指数減衰カウンターでは2つの状態を保持する必要がありますが、提案手法では1つの時刻風パラメータ(pointer_time)だけで実現できます。
- 現在値は \(\exp((\mathrm{pointer\_time} - \mathrm{now}) / \mathrm{duration})\) で復元可能
- 加算は保存値をデコード、加算、再エンコードの3ステップで完了
AtomicI64 のようなアトミック変数で管理可能
厳密な期間の件数計測には向きませんが、負荷やエラーの勢いといった「現在の傾向」を把握するのに適した軽量な手法です。
7. 参考文献
本稿の指数減衰集計、スライディングウィンドウとの対比、EWMAによるレート計測、および log-sum-exp の数値計算は、以下の文献および実装標準を参考にしています。
- Cohen, E., & Strauss, M. J. (2006). "Maintaining Time-Decaying Stream Aggregates". Journal of Algorithms, 59, 19-36. 時間減衰するストリーム集計を一般的に扱うための基礎文献です。
- Cormode, G., Korn, F., & Tirthapura, S. (2008). "Exponentially Decayed Aggregates on Data Streams". ICDE 2008. 指数減衰集計をデータストリーム上で扱う手法を整理した文献で、本稿の数理的背景に近い内容です。
- Cormode, G., Shkapenyuk, V., Srivastava, D., & Xu, B. (2009). "Forward Decay: A Practical Time Decay Model for Streaming Systems". ICDE 2009. 固定された時点から前向きに減衰を表現する考え方を導入しており、実装しやすい時間減衰モデルの背景として参考になります。
- Datar, M., Gionis, A., Indyk, P., & Motwani, R. (2002). "Maintaining Stream Statistics over Sliding Windows". SIAM Journal on Computing. スライディングウィンドウ型のストリーム統計を扱う古典的な文献で、バケットや窓幅ベースの方法との対比に関係します。
- NIST/SEMATECH Engineering Statistics Handbook, "EWMA Control Charts". EWMAの基本的な考え方と、過去の観測値に指数的に小さくなる重みを与える監視手法の実務的な背景を説明しています。
- Dropwizard Metrics documentation, "Meters". 実務的なメトリクスライブラリにおけるレート計測の例で、平均レートに加えて1分、5分、15分の移動平均を扱う設計の参考になります。
- SciPy documentation,
scipy.special.logsumexp. log-sum-exp を数値的に安定して計算するための標準的な関数で、本稿の対数領域での加算の理解に役立ちます。