gopass/internal/queue/background.go
niacdoial cdda40db36
Improve fsck handling and output (#2492)
* Fsck: Improved message handling and decreased commit spam.

* Merge upstream changes with local changes (part 2: manual fixes)

* pgp keyring: Do not import a pgp public key into the user's private keyring if the key there is identical to the one in the store's keyring

* fsck.go: made the code more go-idiomatic

* more changes to make code more go-idiomatic

* Fsck: fixed misleading messages caused by previous refactor

(also clarified the roles of the values in ErrorSeverity)

* Fsck: even smoother git use (pubkey updates now in the same git commit as the rest of fsck's changes)

Also removed duplicate check of public keys, and added more tests around commit messages

* Ctxutil: Pruning unused functions, more go idiomaticity (and some tweaks regarding errors)

* Formatted files with gofmt

* fixed misc. error management

* More fixes and formatting

(plus one fixed text for the `link` action)

* unblock CI (attempt 1)

* fix problems discovered by CI

Signed-off-by: Dominik Schulz <dominik.schulz@gauner.org>
Co-authored-by: Dominik Schulz <dominik.schulz@gauner.org>
2023-01-01 14:52:55 +01:00

152 lines
3.2 KiB
Go

// Package queue implements an experimental background queue for cleanup jobs.
// Beware: It's likely broken.
// We can easily close a channel which might later be written to.
// The current locking is but a poor workaround.
// A better implementation would create a queue object in main, pass
// it through and wait for the channel to be empty before leaving main.
// Will do that later.
package queue
import (
"context"
"fmt"
"time"
"github.com/gopasspw/gopass/internal/out"
"github.com/gopasspw/gopass/pkg/debug"
)
// contextKey is the private type of the keys this package uses to store
// values in a context.Context, so they cannot collide with keys defined
// by other packages.
type contextKey int

const (
	// ctxKeyQueue is the context key under which WithQueue stores the queue.
	ctxKeyQueue contextKey = iota
)
// Queuer is a queue interface. It is what GetQueue returns, so callers can
// work with either a real *Queue or the no-op fallback.
type Queuer interface {
	// Add submits a task and returns a task to the caller.
	Add(Task) Task
	// Close shuts the queue down; the context bounds how long to wait.
	Close(context.Context) error
	// Idle waits, for at most the given duration, for the queue to be empty.
	Idle(time.Duration) error
}
// WithQueue returns a copy of ctx that carries the queue q. Passing a nil
// queue disables queuing for that context: GetQueue will then hand out the
// no-op implementation.
func WithQueue(ctx context.Context, q *Queue) context.Context {
	withQ := context.WithValue(ctx, ctxKeyQueue, q)

	return withQ
}
// GetQueue looks up the queue stored in ctx by WithQueue. If the context
// holds no queue, or holds a nil one, a no-op Queuer is returned instead,
// so the result is always safe to call.
func GetQueue(ctx context.Context) Queuer {
	stored, isQueue := ctx.Value(ctxKeyQueue).(*Queue)
	if !isQueue || stored == nil {
		return &noop{}
	}

	return stored
}
// noop is the Queuer that GetQueue falls back to when the context carries
// no usable queue. None of its methods queue or run anything.
type noop struct{}

// Add hands the given task straight back to the caller.
func (*noop) Add(t Task) Task { return t }

// Close has nothing to shut down and always reports success.
func (*noop) Close(context.Context) error { return nil }

// Idle has nothing to wait for and always reports success.
func (*noop) Idle(time.Duration) error { return nil }
// Task is a background task. It is called with a context and returns a
// context together with an error. When a Queue runs the task, a non-nil
// returned context replaces the context passed to the tasks after it, and
// a non-nil error is reported but does not stop the queue.
type Task func(ctx context.Context) (context.Context, error)
// Queue is a serialized background processing unit: the goroutine started
// by New takes tasks off the queue and runs them one after another.
type Queue struct {
	// work buffers the tasks that are waiting to be run.
	work chan Task
	// done receives a single value once work has been closed and drained.
	done chan struct{}
}
// New creates a queue and starts the goroutine that processes it. ctx is
// the context handed to the first task. Up to 1024 tasks can be waiting
// before Add starts to block.
func New(ctx context.Context) *Queue {
	q := new(Queue)
	q.work = make(chan Task, 1024)
	q.done = make(chan struct{}, 1)

	go q.run(ctx)

	return q
}
// run executes the queued tasks one at a time until the work channel has
// been closed and drained, then signals completion on the done channel.
// A failing task is reported and the loop carries on with the next one.
func (q *Queue) run(ctx context.Context) {
	for {
		task, open := <-q.work
		if !open {
			break
		}

		next, err := task(ctx)
		if err != nil {
			out.Errorf(ctx, "Task failed: %s", err)
		}

		// A task returns a context in order to pass information on to the
		// tasks queued behind it, so it takes the place of the queue's own
		// context. (Every task can see two contexts: this one, and the one
		// captured by the function that created the task.)
		if next != nil {
			ctx = next
		}

		debug.Log("Task done")
	}

	debug.Log("all tasks done")
	q.done <- struct{}{}
}
// Add enqueues a new task for background processing; it blocks while the
// queue's buffer is full. The task it returns is only a placeholder: it
// does no work and returns the context it is given along with a nil error.
func (q *Queue) Add(t Task) Task {
	q.work <- t
	debug.Log("enqueued task")

	var placeholder Task = func(ctx2 context.Context) (context.Context, error) {
		return ctx2, nil
	}

	return placeholder
}
// Idle returns nil the next time the queue is empty, i.e. as soon as no
// task is waiting in the buffer. The buffer is checked immediately and
// then every 20ms. If it has not been seen empty within maxWait an error
// is returned.
//
// The polling happens in the calling goroutine, and the ticker and timer
// are released on return, so nothing keeps running after Idle is done.
func (q *Queue) Idle(maxWait time.Duration) error {
	if len(q.work) < 1 {
		return nil
	}

	timeout := time.NewTimer(maxWait)
	defer timeout.Stop()

	poll := time.NewTicker(20 * time.Millisecond)
	defer poll.Stop()

	for {
		select {
		case <-poll.C:
			if len(q.work) < 1 {
				return nil
			}
		case <-timeout.C:
			return fmt.Errorf("timed out waiting for empty queue")
		}
	}
}
// Close stops the queue from accepting tasks and waits until every task
// that was already queued has been processed. If ctx is done first, the
// wait is abandoned and the context's error is returned. It must only be
// called once, on shutdown.
func (q *Queue) Close(ctx context.Context) error {
	close(q.work)

	select {
	case <-ctx.Done():
		debug.Log("context canceled")

		return ctx.Err()
	case <-q.done:
		return nil
	}
}