// Request registers the caller as a waiting worker described by params and
// blocks until a stage is delivered on the worker's channel or ctx is done.
//
// It returns the delivered stage, or ctx.Err() when the context is cancelled
// or expires first.
func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) {
	w := &worker{
		kind:    params.Kind,
		typ:     params.Type,
		os:      params.OS,
		arch:    params.Arch,
		kernel:  params.Kernel,
		variant: params.Variant,
		labels:  params.Labels,
		// The channel has room for exactly one stage. signal sends on it
		// while holding the queue lock, and the cancellation path below
		// needs that same lock to unregister the worker. With an unbuffered
		// channel, a request cancelled before the stage was received left
		// signal blocked on the send forever, deadlocking the queue. signal
		// deletes the worker right after its single send, so one slot is
		// enough for that send to never block.
		channel: make(chan *core.Stage, 1),
	}

	q.Lock()
	q.workers[w] = struct{}{}
	q.Unlock()

	// Non-blocking notification on q.ready: if nothing can accept it right
	// now, it is dropped rather than stalling the request.
	select {
	case q.ready <- struct{}{}:
	default:
	}

	select {
	case <-ctx.Done():
		// Unregister so no further stage is handed to this worker.
		// NOTE(review): a stage buffered concurrently with the cancellation
		// is discarded here. signal does not write anything to the store on
		// dispatch, so the stage presumably stays in the incomplete list and
		// is offered again on a later pass; confirm.
		q.Lock()
		delete(q.workers, w)
		q.Unlock()
		return nil, ctx.Err()
	case stage := <-w.channel:
		return stage, nil
	}
}
// signal performs one dispatch pass: it lists the incomplete stages from the
// store and hands each eligible stage to at most one matching waiting worker.
//
// The pass is a no-op when the queue is paused or no workers are registered.
// It returns an error when the global lock cannot be acquired, when listing
// the incomplete stages fails, or when ctx is done while a stage is being
// handed to a worker.
func (q *queue) signal(ctx context.Context) error {
	if err := q.globMx.LockContext(ctx); err != nil {
		return err
	}
	defer q.globMx.UnlockContext(ctx)

	// Take a snapshot under the lock so the store is not queried while the
	// queue lock is held.
	q.Lock()
	count := len(q.workers)
	pause := q.paused
	q.Unlock()
	if pause || count == 0 {
		return nil
	}

	items, err := q.store.ListIncomplete(ctx)
	if err != nil {
		return err
	}

	q.Lock()
	defer q.Unlock()
	for _, item := range items {
		// skip stages that are already running or already
		// assigned to a machine.
		if item.Status == core.StatusRunning || item.Machine != "" {
			continue
		}
		// if the stage defines concurrency limits we
		// need to make sure those limits are not exceeded
		// before proceeding.
		if !withinLimits(item, items) {
			continue
		}
		// if the system defines concurrency limits
		// per repository we need to make sure those limits
		// are not exceeded before proceeding.
		if shouldThrottle(item, items, item.LimitRepo) {
			continue
		}

	loop:
		for w := range q.workers {
			// the worker must match the resource kind and type
			if !matchResource(w.kind, w.typ, item.Kind, item.Type) {
				continue
			}

			if w.os != "" || w.arch != "" || w.variant != "" || w.kernel != "" {
				// the worker is platform-specific. check to ensure
				// the queue item matches the worker platform.
				if w.os != item.OS || w.arch != item.Arch {
					continue
				}
				// if the pipeline defines a variant it must match
				// the worker variant (e.g. arm6, arm7, etc).
				if item.Variant != "" && item.Variant != w.variant {
					continue
				}
				// if the pipeline defines a kernel version it must match
				// the worker kernel version (e.g. 1709, 1803).
				if item.Kernel != "" && item.Kernel != w.kernel {
					continue
				}
			}

			if len(item.Labels) > 0 || len(w.labels) > 0 {
				if !checkLabels(item.Labels, w.labels) {
					continue
				}
			}

			// NOTE: the acknowledgement deadline below is disabled.
			//
			// // the queue has 60 seconds to ack the item, otherwise
			// // it is eligible for processing by another worker.
			// // item.Expires = time.Now().Add(time.Minute).Unix()
			// err := q.store.Update(ctx, item)
			// if err != nil {
			// 	log.Ctx(ctx).Warn().
			// 		Err(err).
			// 		Int64("build_id", item.BuildID).
			// 		Int64("stage_id", item.ID).
			// 		Msg("cannot update queue item")
			// 	continue
			// }

			// The send happens while the queue lock is held. Watching
			// ctx.Done() lets the pass abort (releasing both locks via the
			// deferred unlocks) instead of blocking forever when the worker
			// never receives and the queue is being shut down.
			select {
			case w.channel <- item:
				// the worker got its stage; it must not be offered another.
				delete(q.workers, w)
				break loop
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return nil
}
The worker channel is blocking: when the request context is cancelled before a stage is sent to the worker channel, the queue deadlocks and every subsequent agent request for a stage fails.