update
This commit is contained in:
83
backend/events.go
Normal file
83
backend/events.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
func (a *App) handleEvents(c echo.Context) error {
|
||||
res := c.Response()
|
||||
req := c.Request()
|
||||
|
||||
res.Header().Set(echo.HeaderContentType, "text/event-stream")
|
||||
res.Header().Set("Cache-Control", "no-cache, no-transform")
|
||||
res.Header().Set("Connection", "keep-alive")
|
||||
res.Header().Set("X-Accel-Buffering", "no")
|
||||
res.WriteHeader(http.StatusOK)
|
||||
|
||||
flusher, ok := res.Writer.(http.Flusher)
|
||||
if !ok {
|
||||
return writeError(c, http.StatusInternalServerError, "streaming not supported")
|
||||
}
|
||||
|
||||
if _, err := fmt.Fprintf(res, "event: ready\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil {
|
||||
return nil
|
||||
}
|
||||
flusher.Flush()
|
||||
|
||||
id, events := a.events.Subscribe()
|
||||
defer a.events.Unsubscribe(id)
|
||||
|
||||
keepAlive := time.NewTicker(20 * time.Second)
|
||||
defer keepAlive.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-req.Context().Done():
|
||||
return nil
|
||||
case <-events:
|
||||
if _, err := fmt.Fprintf(res, "event: state\ndata: {\"ts\":\"%s\"}\n\n", time.Now().UTC().Format(time.RFC3339)); err != nil {
|
||||
return nil
|
||||
}
|
||||
flusher.Flush()
|
||||
case t := <-keepAlive.C:
|
||||
if _, err := fmt.Fprintf(res, ": ping %d\n\n", t.Unix()); err != nil {
|
||||
return nil
|
||||
}
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EventHub) Subscribe() (int, <-chan struct{}) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.nextID++
|
||||
id := h.nextID
|
||||
ch := make(chan struct{}, 1)
|
||||
h.subscribers[id] = ch
|
||||
return id, ch
|
||||
}
|
||||
|
||||
func (h *EventHub) Unsubscribe(id int) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if ch, ok := h.subscribers[id]; ok {
|
||||
delete(h.subscribers, id)
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EventHub) Broadcast() {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for _, ch := range h.subscribers {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user