package main import ( "fmt" "net/http" "time" "github.com/labstack/echo/v4" ) func (a *App) handleEvents(c echo.Context) error { res := c.Response() req := c.Request() res.Header().Set(echo.HeaderContentType, "text/event-stream") res.Header().Set("Cache-Control", "no-cache, no-transform") res.Header().Set("Connection", "keep-alive") res.Header().Set("X-Accel-Buffering", "no") res.WriteHeader(http.StatusOK) flusher, ok := res.Writer.(http.Flusher) if !ok { return writeError(c, http.StatusInternalServerError, "streaming not supported") } if _, err := fmt.Fprintf(res, "event: ready\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil { return nil } flusher.Flush() id, events := a.events.Subscribe() defer a.events.Unsubscribe(id) keepAlive := time.NewTicker(20 * time.Second) defer keepAlive.Stop() for { select { case <-req.Context().Done(): return nil case <-events: if _, err := fmt.Fprintf(res, "event: state\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil { return nil } flusher.Flush() case t := <-keepAlive.C: if _, err := fmt.Fprintf(res, ": ping %d\n\n", t.Unix()); err != nil { return nil } flusher.Flush() } } } func (h *EventHub) Subscribe() (int, <-chan struct{}) { h.mu.Lock() defer h.mu.Unlock() h.nextID++ id := h.nextID ch := make(chan struct{}, 1) h.subscribers[id] = ch return id, ch } func (h *EventHub) Unsubscribe(id int) { h.mu.Lock() defer h.mu.Unlock() if ch, ok := h.subscribers[id]; ok { delete(h.subscribers, id) close(ch) } } func (h *EventHub) Broadcast() { h.mu.RLock() defer h.mu.RUnlock() for _, ch := range h.subscribers { select { case ch <- struct{}{}: default: } } }