84 lines
1.7 KiB
Go
84 lines
1.7 KiB
Go
|
|
package main
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
"net/http"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/labstack/echo/v4"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (a *App) handleEvents(c echo.Context) error {
|
||
|
|
res := c.Response()
|
||
|
|
req := c.Request()
|
||
|
|
|
||
|
|
res.Header().Set(echo.HeaderContentType, "text/event-stream")
|
||
|
|
res.Header().Set("Cache-Control", "no-cache, no-transform")
|
||
|
|
res.Header().Set("Connection", "keep-alive")
|
||
|
|
res.Header().Set("X-Accel-Buffering", "no")
|
||
|
|
res.WriteHeader(http.StatusOK)
|
||
|
|
|
||
|
|
flusher, ok := res.Writer.(http.Flusher)
|
||
|
|
if !ok {
|
||
|
|
return writeError(c, http.StatusInternalServerError, "streaming not supported")
|
||
|
|
}
|
||
|
|
|
||
|
|
if _, err := fmt.Fprintf(res, "event: ready\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
flusher.Flush()
|
||
|
|
|
||
|
|
id, events := a.events.Subscribe()
|
||
|
|
defer a.events.Unsubscribe(id)
|
||
|
|
|
||
|
|
keepAlive := time.NewTicker(20 * time.Second)
|
||
|
|
defer keepAlive.Stop()
|
||
|
|
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-req.Context().Done():
|
||
|
|
return nil
|
||
|
|
case <-events:
|
||
|
|
if _, err := fmt.Fprintf(res, "event: state\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
flusher.Flush()
|
||
|
|
case t := <-keepAlive.C:
|
||
|
|
if _, err := fmt.Fprintf(res, ": ping %d\n\n", t.Unix()); err != nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
flusher.Flush()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h *EventHub) Subscribe() (int, <-chan struct{}) {
|
||
|
|
h.mu.Lock()
|
||
|
|
defer h.mu.Unlock()
|
||
|
|
h.nextID++
|
||
|
|
id := h.nextID
|
||
|
|
ch := make(chan struct{}, 1)
|
||
|
|
h.subscribers[id] = ch
|
||
|
|
return id, ch
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h *EventHub) Unsubscribe(id int) {
|
||
|
|
h.mu.Lock()
|
||
|
|
defer h.mu.Unlock()
|
||
|
|
if ch, ok := h.subscribers[id]; ok {
|
||
|
|
delete(h.subscribers, id)
|
||
|
|
close(ch)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h *EventHub) Broadcast() {
|
||
|
|
h.mu.RLock()
|
||
|
|
defer h.mu.RUnlock()
|
||
|
|
for _, ch := range h.subscribers {
|
||
|
|
select {
|
||
|
|
case ch <- struct{}{}:
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|