Skip to content

Commit e286ded

Browse files
author
agarkov.pavel3
committed
README.md
1 parent fdffbea commit e286ded

File tree

2 files changed

+271
-6
lines changed

2 files changed

+271
-6
lines changed

README.md

Lines changed: 266 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,266 @@
1-
# rate-pool
1+
# rate-envelope-queue
2+
3+
Лёгкий пакет для управления пулом задач (**envelopes**) поверх `k8s.io/client-go/util/workqueue` с ограничением параллелизма, ретраями, периодическим планированием и хуками до/после выполнения.
4+
5+
> Основано на `workqueue` из client-go: очередь дедуплицирует одинаковые элементы (один и тот же **указатель**) и поддерживает rate-limiting / отложенное перепланирование.
6+
7+
---
8+
9+
## Возможности
10+
11+
- **Фиксированный пул воркеров**: настраиваемый параллелизм через `WithLimitOption`.
12+
- **Периодические и одноразовые задачи**: `Interval > 0` → периодические, `Interval == 0` → одноразовые.
13+
- **Дедлайны**: `Deadline > 0` ограничивает время выполнения `Invoke`.
14+
- **Хуки**: `BeforeHook`/`AfterHook` с отдельным тайм‑бюджетом.
15+
- **Сентинель для остановки типа**: верните `ErrStopEnvelope` из `BeforeHook`/`Invoke`/`AfterHook`, чтобы добавить `Type` в blacklist и прекратить дальнейшее планирование таких задач.
16+
- **Грациозная остановка**: `Drain` (дождаться завершения) или `Stop` (остановиться сразу).
17+
- **Динамическое добавление** задач во время работы.
18+
- **Явная валидация** входных данных и понятные ошибки.
19+
20+
---
21+
22+
## Требования
23+
24+
- Go **1.22+** (используется `for range <int>`).
25+
- Модули:
26+
- `k8s.io/client-go/util/workqueue`
27+
- `golang.org/x/time/rate`
28+
29+
---
30+
31+
## Установка
32+
33+
```bash
34+
go get github.com/PavelAgarkov/rate-pool/pkg
35+
```
36+
37+
Импорт:
38+
39+
```go
40+
import "github.com/PavelAgarkov/rate-pool/pkg"
41+
```
42+
43+
---
44+
45+
## Быстрый старт
46+
47+
```go
48+
package main
49+
50+
import (
51+
"context"
52+
"fmt"
53+
"time"
54+
55+
"github.com/PavelAgarkov/rate-pool/pkg"
56+
)
57+
58+
func main() {
59+
ctx, cancel := context.WithCancel(context.Background())
60+
defer cancel()
61+
62+
// Очередь: 3 воркера, дожидаемся drain при остановке
63+
q := pkg.NewRateEnvelopeQueue(
64+
pkg.WithLimitOption(3),
65+
pkg.WithWaitingOption(true),
66+
pkg.WithStopModeOption(pkg.Drain),
67+
// лимитер/конфиг можно не задавать — будет дефолт
68+
)
69+
70+
// Периодическая задача (каждые 3с) с дедлайном 1с
71+
metrics := &pkg.Envelope{
72+
Id: 2,
73+
Type: "metrics",
74+
Interval: 3 * time.Second,
75+
Deadline: 1 * time.Second,
76+
Invoke: func(ctx context.Context) error {
77+
fmt.Println("📊 metrics tick", time.Now())
78+
return nil
79+
},
80+
}
81+
82+
// Одноразовая задача (Interval == 0)
83+
oneShot := &pkg.Envelope{
84+
Id: 3,
85+
Type: "oneshot",
86+
Interval: 0,
87+
Deadline: 2 * time.Second,
88+
Invoke: func(ctx context.Context) error {
89+
fmt.Println("🔥 single run", time.Now())
90+
return nil
91+
},
92+
}
93+
94+
q.Start(ctx)
95+
if err := q.Add(metrics, oneShot); err != nil {
96+
panic(err)
97+
}
98+
99+
time.Sleep(10 * time.Second)
100+
q.Stop() // Drain: дождётся завершения текущих работ
101+
}
102+
```
103+
104+
---
105+
106+
## Поведение
107+
108+
| Сценарий | Действие очереди |
109+
|--------------------------------------------------|------------------------------------------------------------------------|
110+
| `Invoke` вернул `nil` | `Forget` + для периодических: `AddAfter(Interval)` |
111+
| Контекст истёк / отменён | `Forget` + для периодических: `AddAfter(Interval)` |
112+
| `Invoke` вернул ошибку (не `ErrStopEnvelope`) | Периодические: `AddRateLimited` (бэкофф); одноразовые: `Forget` |
113+
| Возврат `ErrStopEnvelope` (любой хук/Invoke) | `Forget` + поместить `Type` в **blacklist** |
114+
| Ошибка в `BeforeHook` (не `ErrStopEnvelope`) | Периодические: `AddRateLimited`; одноразовые: `Forget` |
115+
| Ошибка в `AfterHook` (не `ErrStopEnvelope`) | Только лог; если `ErrStopEnvelope` — тип в blacklist |
116+
117+
> `Deadline == 0`**без таймаута**. Для периодических задач рекомендуется `Deadline <= Interval` (валидация это проверяет).
118+
119+
---
120+
121+
## Статический vs динамический набор
122+
123+
- **Статический набор**: поддерживайте массив указателей на `Envelope` и добавляйте его один раз — периодические будут перепланироваться, пока вы не остановите очередь или не вернёте `ErrStopEnvelope`.
124+
- **Динамический набор**: добавляйте новые `Envelope` в любой момент через `Add(...)`.
125+
- **Важно**: дедупликация в `workqueue` работает по **компарабельности элемента**; для указателей — по адресу. Для корректной дедупликации **не меняйте указатель** на `Envelope` во время жизни элемента в очереди.
126+
127+
---
128+
129+
## API
130+
131+
### Типы
132+
133+
```go
134+
type StopMode string
135+
136+
const (
137+
Drain StopMode = "drain" // дождаться обработки (graceful)
138+
Stop StopMode = "stop" // остановиться сразу
139+
)
140+
141+
type Envelope struct {
142+
Id uint64
143+
Type string
144+
Interval time.Duration // 0 = одноразовая задача
145+
Deadline time.Duration // 0 = без таймаута
146+
147+
BeforeHook func(ctx context.Context, item *Envelope) error
148+
Invoke func(ctx context.Context) error
149+
AfterHook func(ctx context.Context, item *Envelope) error
150+
}
151+
152+
type QueuePool interface {
153+
Start(ctx context.Context)
154+
Add(envelopes ...*Envelope) error
155+
Stop()
156+
}
157+
```
158+
159+
### Создание очереди
160+
161+
```go
162+
q := pkg.NewRateEnvelopeQueue(
163+
pkg.WithLimitOption(n), // число воркеров (>0)
164+
pkg.WithWaitingOption(true|false), // ждать ли завершения воркеров в Stop()
165+
pkg.WithStopModeOption(pkg.Drain|pkg.Stop),
166+
pkg.WithLimiterOption(customLimiter), // опционально; если nil — дефолт
167+
pkg.WithWorkqueueConfigOption(conf), // клиентский конфиг очереди (например, Name для метрик)
168+
)
169+
```
170+
171+
**Дефолтный rate-limiter**:
172+
`MaxOf(ItemExponentialFailureRateLimiter(1s..30s), BucketRateLimiter(5 rps, burst=10))`.
173+
174+
### Ошибки
175+
176+
- `ErrStopEnvelope` — положить `Type` в blacklist (верните её из `BeforeHook`/`Invoke`/`AfterHook`).
177+
- `ErrEnvelopeInBlacklist` — попытка добавить envelope с типом из blacklist.
178+
- `ErrEnvelopeQueueIsNotRunning` — вызов `Add` до `Start`/после `Stop`.
179+
- `ErrAdditionEnvelopeToQueueBadFields``Type == ""`, `Invoke == nil`, `Interval < 0`, `Deadline < 0`.
180+
- `ErrAdditionEnvelopeToQueueBadIntervals` — для периодических `Deadline > Interval`.
181+
182+
---
183+
184+
## Пример из теста (упрощённый)
185+
186+
```go
187+
ctx, cancel := context.WithCancel(context.Background())
188+
defer cancel()
189+
190+
email := &pkg.Envelope{
191+
Id: 1,
192+
Type: "email",
193+
Interval: 5 * time.Second,
194+
Deadline: 3 * time.Second,
195+
Invoke: func(ctx context.Context) error {
196+
time.Sleep(5 * time.Second) // превысит дедлайн
197+
fmt.Println("📧 Email v1", time.Now())
198+
return nil
199+
},
200+
BeforeHook: func(ctx context.Context, e *pkg.Envelope) error {
201+
fmt.Println("hook before email", e.Id, time.Now())
202+
return nil
203+
},
204+
AfterHook: func(ctx context.Context, e *pkg.Envelope) error {
205+
fmt.Println("hook after email", e.Id, time.Now())
206+
// Остановим дальнейшие email
207+
return pkg.ErrStopEnvelope
208+
},
209+
}
210+
211+
metrics := &pkg.Envelope{
212+
Id: 2,
213+
Type: "metrics",
214+
Interval: 3 * time.Second,
215+
Deadline: 1 * time.Second,
216+
Invoke: func(ctx context.Context) error {
217+
fmt.Println("📊 Metrics", time.Now())
218+
return nil
219+
},
220+
}
221+
222+
food := &pkg.Envelope{
223+
Id: 3,
224+
Type: "food",
225+
Interval: 2 * time.Second,
226+
Deadline: 1 * time.Second,
227+
Invoke: func(ctx context.Context) error {
228+
fmt.Println("🍔 Fooding", time.Now())
229+
return nil
230+
},
231+
}
232+
233+
q := pkg.NewRateEnvelopeQueue(
234+
pkg.WithLimitOption(3),
235+
pkg.WithWaitingOption(true),
236+
pkg.WithStopModeOption(pkg.Drain),
237+
)
238+
239+
q.Start(ctx)
240+
_ = q.Add(email, metrics, food, email) // второй add(email) будет дедуплицирован
241+
242+
// по таймеру завершить приложение
243+
time.AfterFunc(25*time.Second, cancel)
244+
<-ctx.Done()
245+
246+
q.Stop()
247+
fmt.Println("queue: done")
248+
```
249+
250+
---
251+
252+
## Замечания по эксплуатации
253+
254+
- Чтобы одинаковые периодические задачи не «стреляли строем», можно добавить **jitter** к `AddAfter` (±5–10%).
255+
- В `workqueue` есть имя очереди (через конфиг), это удобно для экспонирования **метрик**.
256+
- Очередь можно остановить в любой момент (`Stop()`); для повторного использования создайте **новый объект** очереди.
257+
258+
---
259+
260+
## Лицензия
261+
262+
MIT
263+
264+
---
265+
266+
**Префикс логов:** `"[rate-envelope-queue]"`.

pkg/rate_envelope_queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,24 +256,24 @@ func (rateQueue *RateEnvelopeQueue) checkInBlacklist(t string) bool {
256256
return exists
257257
}
258258

259-
func (rateQueue *RateEnvelopeQueue) Add(envelops ...*Envelope) error {
260-
if err := rateQueue.validateAdd(envelops...); err != nil {
259+
func (rateQueue *RateEnvelopeQueue) Add(envelopes ...*Envelope) error {
260+
if err := rateQueue.validateAdd(envelopes...); err != nil {
261261
return err
262262
}
263263

264-
for _, envelope := range envelops {
264+
for _, envelope := range envelopes {
265265
rateQueue.queue.Add(envelope)
266266
}
267267

268268
return nil
269269
}
270270

271-
func (rateQueue *RateEnvelopeQueue) validateAdd(envelops ...*Envelope) error {
271+
func (rateQueue *RateEnvelopeQueue) validateAdd(envelopes ...*Envelope) error {
272272
if !rateQueue.run.Load() || rateQueue.queue == nil {
273273
return ErrEnvelopeQueueIsNotRunning
274274
}
275275

276-
for _, envelope := range envelops {
276+
for _, envelope := range envelopes {
277277
if envelope.Type == "" || envelope.Invoke == nil || envelope.Interval < 0 || envelope.Deadline < 0 {
278278
return ErrAdditionEnvelopeToQueueBadFields
279279
}

0 commit comments

Comments
 (0)