Moving event handling for external subscribers to use go-events #2747
Force-pushed from 564bbc8 to d0f5ce3.
@aaronlehmann @alexmavr this PR is a rough sketch of the changes. Let me know if this is a reasonable way to use the queue. I have left the old event handler in place because the rescheduling code uses it, but I will work on removing it in a follow-up. Still remaining:
api/handlers.go
```go
	f.Flush()
}
if err != nil {
	log.Debugf("failed to write event to output stream %s", err.Error())
```
If there was an error writing to the socket, this code should return from the function. Currently this loop will keep receiving events and trying to write them forever.
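A minimal sketch of the control flow the reviewer is asking for, assuming events arrive as already-serialized byte slices on a channel. `streamEvents` and `failingWriter` are hypothetical names, not Swarm code; the point is that the first write error ends the loop instead of being logged and ignored.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// streamEvents writes each event to w and stops at the first write error,
// instead of draining the channel forever after the client has gone away.
func streamEvents(w io.Writer, events <-chan []byte) error {
	for data := range events {
		if _, err := w.Write(data); err != nil {
			return fmt.Errorf("failed to write event to output stream: %w", err)
		}
	}
	return nil
}

// failingWriter accepts n writes and then fails, simulating a closed socket.
type failingWriter struct{ n int }

func (f *failingWriter) Write(p []byte) (int, error) {
	if f.n == 0 {
		return 0, errors.New("broken pipe")
	}
	f.n--
	return len(p), nil
}

func main() {
	events := make(chan []byte, 3)
	events <- []byte("a")
	events <- []byte("b")
	events <- []byte("c")
	close(events)
	fmt.Println(streamEvents(&failingWriter{n: 1}, events))
}
```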
This looks like the right approach to me.
Force-pushed from 69e0eca to cfcf142.
Force-pushed from ef239b5 to 26d10bd.
@aaronlehmann @alexmavr I've updated the PR and fixed all outstanding issues except one: getting an accurate count of listeners (at this point it returns an underestimate). For reference and to help with reviewing, the main event handler code is in … The watch queue itself and related functions are part of … The rest should hopefully be straightforward. Also ping @dhiltgen @wsong @aluzzardi
api/events.go
```go
// remove this hack once 1.10 is broadly adopted
from := e.From
e.From = e.From + " node:" + e.Engine.Name

// Lock access to the attributes map
e.Lock()
```
Shouldn't this operate on a copy of `e`, so that the event itself (which is shared between all subscribers) is not modified by each subscriber?
@aaronlehmann yeah, I intended to do that but must have forgotten. I think doing a deep copy of the event is the way to go.
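A sketch of the deep-copy approach, assuming a simplified, hypothetical `Event` with only the fields the excerpt touches (`From`, `Engine.Name`, an attributes map, and an embedded lock). The real `cluster.Event` has more fields, each of which would need the same treatment. Each subscriber mutates its own copy, so the shared event seen by other subscribers stays intact.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for Swarm's cluster types.
type Engine struct{ Name string }

type Event struct {
	sync.Mutex
	From       string
	Engine     *Engine
	Attributes map[string]string
}

// clone returns a deep copy of e. The mutex is not copied;
// the copy starts with a fresh, unlocked zero-value lock.
func (e *Event) clone() *Event {
	e.Lock()
	defer e.Unlock()
	attrs := make(map[string]string, len(e.Attributes))
	for k, v := range e.Attributes {
		attrs[k] = v
	}
	var eng *Engine
	if e.Engine != nil {
		c := *e.Engine
		eng = &c
	}
	return &Event{From: e.From, Engine: eng, Attributes: attrs}
}

// normalize rewrites fields on a private copy only.
// The "node.name" attribute key is illustrative.
func normalize(shared *Event) *Event {
	e := shared.clone()
	e.From = e.From + " node:" + e.Engine.Name
	e.Attributes["node.name"] = e.Engine.Name
	return e
}

func main() {
	shared := &Event{From: "busybox", Engine: &Engine{Name: "node-1"}, Attributes: map[string]string{}}
	out := normalize(shared)
	fmt.Println(out.From)    // busybox node:node-1
	fmt.Println(shared.From) // busybox
}
```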
api/handlers.go
```go
defer cancelFunc()

// create timer for --until
timer := time.NewTimer(0)
```
We shouldn't create the timer when `--until` is not specified. It would fire immediately, and the `select` would always be able to take the `<-timer.C` branch, hogging CPU.
How about something like this:

```go
var (
	timer   *time.Timer
	timerCh <-chan time.Time // stays nil when --until is not set, so its case never fires
)
if until > 0 {
	dur := time.Unix(until, 0).Sub(time.Now())
	timer = time.NewTimer(dur)
	timerCh = timer.C
}
```

Then have a `case <-timerCh:` in the select below.
This is a better idea.
api/handlers.go
```go
if e, ok := eChan.(*cluster.Event); ok {
	if data, err := normalizeEvent(e); err != nil {
		return
	} else {
```
Don't need an `else` because of the `return` above.
api/handlers.go
```go
for {
	select {
	case eChan := <-eventsChan:
```
Is `eChan` not an event?
`eChan` has an interface type, which we type-assert to `*cluster.Event` below.
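To illustrate the answer: go-events declares its event type as an empty interface, so values read from the queue must be type-asserted back to a concrete type. The sketch below mirrors that with a local `Event interface{}`; `ClusterEvent` and `statuses` are hypothetical stand-ins for `*cluster.Event` and the handler loop. The comma-ok form skips values of other types instead of panicking.

```go
package main

import "fmt"

// Event mirrors go-events' empty-interface event type.
type Event interface{}

// ClusterEvent is a hypothetical stand-in for *cluster.Event.
type ClusterEvent struct{ Status string }

// statuses keeps only values whose dynamic type is *ClusterEvent.
func statuses(in []Event) []string {
	var out []string
	for _, v := range in {
		e, ok := v.(*ClusterEvent)
		if !ok {
			continue // not a cluster event; skip it
		}
		out = append(out, e.Status)
	}
	return out
}

func main() {
	evs := []Event{&ClusterEvent{Status: "start"}, "not an event", &ClusterEvent{Status: "die"}}
	fmt.Println(statuses(evs)) // [start die]
}
```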
api/handlers.go
```go
if err != nil {
	return
}
_, err = fmt.Fprint(w, string(data))
```
Why not write directly here, rather than reallocating the data as a string? `w.Write(data)` should be sufficient.
```go
}
var closeNotify <-chan bool
if closeNotifier, ok := w.(http.CloseNotifier); ok {
	closeNotify = closeNotifier.CloseNotify()
```
Depending on the Go version, you can now use `r.Context().Done()` to listen for this cancellation.
I'm not sure the request context is set correctly in all cases in Swarm; some of the code predates the introduction of contexts in Go. I'll check.
This should come from `net/http`, so it should be wired to the `CloseNotifier`.
api/handlers.go
```go
for {
	select {
	case eChan := <-eventsChan:
```
@aaronlehmann Does this need "closed" detection? For example, will the watch ever shut down this channel?
Currently it will not, but moby/swarmkit#2285 adds this as an option. So it probably makes sense to add the check here in anticipation of using that later.
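A sketch of closed-channel detection inside the event loop, using a hypothetical `drain` function with string events. The comma-ok receive reports `ok == false` once the channel is closed and empty. In Go, a bare `break` inside a `select` case only leaves the `select`, not the enclosing `for`, so this sketch uses a labeled break to exit the loop.

```go
package main

import "fmt"

// drain collects events until the publisher closes the channel
// (closed == true) or stop is signaled (closed == false).
func drain(events <-chan string, stop <-chan struct{}) (got []string, closed bool) {
loop:
	for {
		select {
		case e, ok := <-events:
			if !ok {
				closed = true
				break loop // a bare break would only exit the select
			}
			got = append(got, e)
		case <-stop:
			break loop
		}
	}
	return got, closed
}

func main() {
	ch := make(chan string, 2)
	ch <- "a"
	ch <- "b"
	close(ch)
	got, closed := drain(ch, make(chan struct{}))
	fmt.Println(got, closed) // [a b] true
}
```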
@@ -168,12 +170,20 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st

```go
// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
	// publish event to the watchQueue for external subscribers
	c.watchQueue.Publish(e)
```
Why don't we adapt this to the eventHandlers list? I am not sure that making this dependent on the watch queue makes sense.
@stevvooe I agree, but I also think the queue we eventually use will be moved upstream to go-events. This is supposed to be a temporary fix.
Actually, `eventHandlers` will go away, and all events should be handled using the `watchQueue`. For now, though, some internal rescheduling functionality uses events, and I did not want to change that in this PR; that's why we have this case.
@nishanttotla That still doesn't explain why this isn't just a handler. You can do this with a closure that adapts `Publish` to `Handler`.
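One possible reading of this suggestion, as a sketch: wrap the queue's `Publish` in a closure that satisfies the handler interface, so the queue is registered in the same handler list as every other consumer instead of being called separately. `EventHandler`, `HandlerFunc`, `Publisher`, and `recorder` are hypothetical stand-ins modeled on the `Handle(*cluster.Event) error` method in the excerpt; they are not Swarm's or SwarmKit's actual types.

```go
package main

import "fmt"

type Event struct{ Status string }

// EventHandler stands in for Swarm's handler interface.
type EventHandler interface {
	Handle(e *Event) error
}

// HandlerFunc adapts an ordinary function to EventHandler,
// in the style of net/http's HandlerFunc.
type HandlerFunc func(e *Event) error

func (f HandlerFunc) Handle(e *Event) error { return f(e) }

// Publisher stands in for the watch queue.
type Publisher interface {
	Publish(v interface{})
}

// publishHandler turns a Publisher into an EventHandler via a closure.
func publishHandler(p Publisher) EventHandler {
	return HandlerFunc(func(e *Event) error {
		p.Publish(e)
		return nil
	})
}

// recorder is a toy Publisher that remembers what it was given.
type recorder struct{ got []interface{} }

func (r *recorder) Publish(v interface{}) { r.got = append(r.got, v) }

func main() {
	q := &recorder{}
	handlers := []EventHandler{publishHandler(q)}
	for _, h := range handlers {
		_ = h.Handle(&Event{Status: "start"})
	}
	fmt.Println(len(q.got)) // 1
}
```

With this shape there is a single dispatch path, which is the concern raised later in the thread about maintaining both a handler set and a publish set.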
@stevvooe I'm not sure I follow exactly. What do you envision this handler looking like?
Not sure what you mean: just make an event handler. I am not sure how this got merged in its current state. It just doesn't make any sense to have both a handler set and a publish set. I'd recommend cleaning this up before it gets too far. It is going to be a pain to remove later and will confuse future contributors.
I will clean that up soon. The handler set will go away entirely, and only the publish set will remain.
Force-pushed from c326fc7 to 1ca4e7d.
api/primary.go
```go
// eventsQueue uses the watch package from SwarmKit, which is based on
// the go-events package. See https://github.com/docker/swarm/issues/2718
// for context
eventsQueue := watch.NewQueue()
```
Now that moby/swarmkit#2285 is merged, let's revendor swarmkit and change this constructor to:

```go
NewQueue(WithTimeout(10 * time.Second), WithLimit(10000), WithCloseOutChan())
```
We probably want to define these values as config vars and plumb them through the standard config model used by classic swarm.
Assuming 1 KB per event (a gross overestimate), the queue size limit of 10000 should occupy no more than 10 MB per listener in memory. In the edge case of 100 simultaneous inefficient listeners, this would add up to 1 GB of memory.
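To show why a per-listener limit bounds memory, here is a simplified illustration, not SwarmKit's `watch.Queue`: each listener gets a buffered channel of capacity `limit`, so its backlog can never exceed `limit` events (about limit × event size in memory). The overflow policy here, closing and dropping a listener whose buffer is full, is an assumption chosen for the sketch. `boundedFanout` is hypothetical and not safe for concurrent use.

```go
package main

import "fmt"

// boundedFanout gives each listener a channel of capacity limit.
type boundedFanout struct {
	limit     int
	listeners map[int]chan string
	nextID    int
}

func newBoundedFanout(limit int) *boundedFanout {
	return &boundedFanout{limit: limit, listeners: make(map[int]chan string)}
}

func (b *boundedFanout) subscribe() <-chan string {
	ch := make(chan string, b.limit)
	b.listeners[b.nextID] = ch
	b.nextID++
	return ch
}

// publish never blocks: a listener whose buffer is full is closed and
// removed instead of letting its backlog grow without bound.
func (b *boundedFanout) publish(e string) {
	for id, ch := range b.listeners {
		select {
		case ch <- e:
		default:
			close(ch)
			delete(b.listeners, id) // deleting during range is safe in Go
		}
	}
}

func main() {
	b := newBoundedFanout(2)
	slow := b.subscribe()
	for _, e := range []string{"a", "b", "c"} {
		b.publish(e)
	}
	var got []string
	for e := range slow { // buffered events are still delivered after close
		got = append(got, e)
	}
	fmt.Println(got, len(b.listeners)) // [a b] 0
}
```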
@alexmavr done. PTAL.
Force-pushed from a04af50 to 0284f9a.
…0a8ca812 Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
Force-pushed from 377d88c to 810b74b.
```go
if !ok {
	break
}
data, err := normalizeEvent(e)
```
Should we consider event filtering here?
@alexmavr maybe we could do it as a follow-up?
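If filtering is picked up as a follow-up, one option is a per-subscriber predicate applied to each event before it is normalized and written. The sketch below assumes a hypothetical `Event` with `Type` and `Action` fields and an `eventFilter` where each non-empty criterion must match; real filter parsing from the API request is not shown.

```go
package main

import "fmt"

// Hypothetical event and filter types for illustration only.
type Event struct {
	Type   string
	Action string
}

// eventFilter keeps an event only if every non-empty criterion matches.
type eventFilter struct {
	types   map[string]bool
	actions map[string]bool
}

func (f eventFilter) match(e Event) bool {
	if len(f.types) > 0 && !f.types[e.Type] {
		return false
	}
	if len(f.actions) > 0 && !f.actions[e.Action] {
		return false
	}
	return true
}

func main() {
	f := eventFilter{types: map[string]bool{"container": true}}
	evs := []Event{{"container", "start"}, {"network", "create"}, {"container", "die"}}
	for _, e := range evs {
		if f.match(e) {
			fmt.Println(e.Action) // prints start, then die
		}
	}
}
```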
37d35add5005832485c0225ec870121b78fcff1c Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
…er/go-events Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
Force-pushed from 89897a1 to ba7fd94.
Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
Force-pushed from ba7fd94 to 0f4f206.
@alexmavr added a counter to track the number of active listeners, PTAL.
/cc @wsong for review
Merging this now, since it's been reviewed and it worked in testing, both manual and
@nishanttotla I'd recommend taking the time to clean up the double handler logic. It will lead to a bug in the future if someone adds handlers to both the task queue and the handler set.
Fix #2718
This PR starts to move event handling for external subscribers (API users) to the battle-tested github.com/docker/go-events repo instead of the `cluster.EventHandlers` defined internally in Swarm. To be more precise, it uses the `Queue` abstraction defined in the `watch` package in github.com/docker/swarmkit. For now, event listening by the watchdog (for rescheduling) continues to be done by the old code.