Scheduler queue deadlock

func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) {
	w := &worker{
		kind:    params.Kind,
		typ:     params.Type,
		os:      params.OS,
		arch:    params.Arch,
		kernel:  params.Kernel,
		variant: params.Variant,
		labels:  params.Labels,
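		// note: the channel is unbuffered, so the send in signal
		// blocks until this worker receives.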
		channel: make(chan *core.Stage),
	}
	q.Lock()
	q.workers[w] = struct{}{}
	q.Unlock()

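	// wake the scheduler, without blocking if a signal is already pending.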
	select {
	case q.ready <- struct{}{}:
	default:
	}

	select {
	case <-ctx.Done():
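		// deregistration needs q.Lock, which signal holds while it
		// blocks sending on w.channel; this is where the deadlock bites.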
		q.Lock()
		delete(q.workers, w)
		q.Unlock()
		return nil, ctx.Err()
	case b := <-w.channel:
		return b, nil
	}
}

func (q *queue) signal(ctx context.Context) error {
	if err := q.globMx.LockContext(ctx); err != nil {
		return err
	}
	defer q.globMx.UnlockContext(ctx)

	q.Lock()
	count := len(q.workers)
	pause := q.paused
	q.Unlock()
	if pause {
		return nil
	}
	if count == 0 {
		return nil
	}
	items, err := q.store.ListIncomplete(ctx)
	if err != nil {
		return err
	}

	q.Lock()
	defer q.Unlock()
	for _, item := range items {
		if item.Status == core.StatusRunning {
			continue
		}
		if item.Machine != "" {
			continue
		}

		// if the stage defines concurrency limits we
		// need to make sure those limits are not exceeded
		// before proceeding.
		if !withinLimits(item, items) {
			continue
		}

		// if the system defines concurrency limits
		// per repository we need to make sure those limits
		// are not exceeded before proceeding.
		if shouldThrottle(item, items, item.LimitRepo) {
			continue
		}

	loop:
		for w := range q.workers {
			// the worker must match the resource kind and type
			if !matchResource(w.kind, w.typ, item.Kind, item.Type) {
				continue
			}

			if w.os != "" || w.arch != "" || w.variant != "" || w.kernel != "" {
				// the worker is platform-specific. check to ensure
				// the queue item matches the worker platform.
				if w.os != item.OS {
					continue
				}
				if w.arch != item.Arch {
					continue
				}
				// if the pipeline defines a variant it must match
				// the worker variant (e.g. arm6, arm7, etc).
				if item.Variant != "" && item.Variant != w.variant {
					continue
				}
				// if the pipeline defines a kernel version it must match
				// the worker kernel version (e.g. 1709, 1803).
				if item.Kernel != "" && item.Kernel != w.kernel {
					continue
				}
			}

			if len(item.Labels) > 0 || len(w.labels) > 0 {
				if !checkLabels(item.Labels, w.labels) {
					continue
				}
			}

			// // the queue has 60 seconds to ack the item, otherwise
			// // it is eligible for processing by another worker.
			// // item.Expires = time.Now().Add(time.Minute).Unix()
			// err := q.store.Update(ctx, item)

			// if err != nil {
			// 	log.Ctx(ctx).Warn().
			// 		Err(err).
			// 		Int64("build_id", item.BuildID).
			// 		Int64("stage_id", item.ID).
			// 		Msg("cannot update queue item")
			// 	continue
			// }
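			// BUG: w.channel is unbuffered and this select has no other
			// case, so the send blocks forever if this worker's context
			// was already cancelled. Request cannot receive or deregister
			// because it is waiting on q.Lock, which is held here.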
			select {
			case w.channel <- item:
				delete(q.workers, w)
				break loop
			}
		}
	}
	return nil
}

The worker channel is unbuffered, so the send in signal blocks until a worker receives. If the request context is cancelled before the stage is sent to the worker channel, Request moves into its ctx.Done() case and waits on q.Lock() to deregister, while signal, still holding the lock, blocks forever sending on w.channel. The queue deadlocks, and every subsequent agent request for a stage hangs on q.Lock() and fails.
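One possible fix, sketched below under the assumption that signal runs periodically so a skipped item is simply re-offered on the next pass: make the dispatch send non-blocking, so signal never parks on w.channel while holding the queue lock. The stage, worker, and queue types here are pared-down stand-ins for the real core.Stage and queue types, not the actual API.

package main

import (
	"context"
	"fmt"
	"sync"
)

// stage, worker, and queue are illustrative stand-ins for the real types.
type stage struct{ id int64 }

type worker struct{ channel chan *stage }

type queue struct {
	sync.Mutex
	workers map[*worker]struct{}
}

// request mirrors Request: register, then wait for a stage or cancellation.
func (q *queue) request(ctx context.Context) (*stage, error) {
	w := &worker{channel: make(chan *stage)}
	q.Lock()
	q.workers[w] = struct{}{}
	q.Unlock()
	select {
	case <-ctx.Done():
		q.Lock() // safe now: signal never blocks while holding the lock
		delete(q.workers, w)
		q.Unlock()
		return nil, ctx.Err()
	case s := <-w.channel:
		return s, nil
	}
}

// signal mirrors the dispatch loop with the fix applied: the send is
// non-blocking, so a worker that is not parked on its receive is
// skipped and the item is left for the next signal pass.
func (q *queue) signal(items []*stage) {
	q.Lock()
	defer q.Unlock()
	for _, item := range items {
	loop:
		for w := range q.workers {
			select {
			case w.channel <- item:
				delete(q.workers, w)
				break loop
			default:
				// the worker is not receiving: its context may be
				// cancelled, or it has not reached its select yet.
				// never block here while holding q.Lock.
			}
		}
	}
}

func main() {
	q := &queue{workers: map[*worker]struct{}{}}
	// register a worker with no goroutine behind it: nothing will ever
	// receive on its channel, as if its request context was cancelled.
	q.workers[&worker{channel: make(chan *stage)}] = struct{}{}
	q.signal([]*stage{{id: 1}}) // returns instead of blocking forever
	fmt.Println("signal returned; worker skipped, item re-offered later")
}

The trade-off: a non-blocking send can also skip a healthy worker in the window between registering and reaching its receive, which is only safe if, as assumed here, signal runs again and re-offers incomplete items. A buffered channel of capacity one would also unblock signal, but it can hand a stage to a worker that is already deregistering, leaving the item unclaimed until the next pass anyway.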