initial commit
This commit is contained in:
216
internal/device/manager.go
Normal file
216
internal/device/manager.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package device
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.sinav-lab.com/sinav/keenetic-exporter-v2/internal/config"
|
||||
"gitea.sinav-lab.com/sinav/keenetic-exporter-v2/internal/keenetic"
|
||||
)
|
||||
|
||||
// Device is a single managed Keenetic router: its API client, its
// collector configuration, and a background-refreshed metrics cache.
type Device struct {
	// Name is the human-readable identifier from the device config.
	Name string
	// Client is the initialized Keenetic API client for this device.
	Client *keenetic.Client
	// SkipCollectors lists collector names that must not be fetched
	// for this device (see shouldSkipCollector).
	SkipCollectors []string

	// UpdateInterval is how often the background updater refreshes the
	// cache; a non-positive value disables background updates.
	UpdateInterval time.Duration
	// CacheTTL is the configured lifetime of cached collector data.
	// NOTE(review): only stored here — expiry appears to be enforced by
	// cache consumers outside this file; confirm.
	CacheTTL time.Duration

	// Cache maps collector name -> last fetched result.
	// Guarded by CacheMutex.
	Cache map[string]any
	// CacheMutex protects Cache and LastUpdate.
	CacheMutex sync.RWMutex
	// LastUpdate is the time of the most recent successful collector
	// fetch. Guarded by CacheMutex.
	LastUpdate time.Time

	// ctx/cancel control the lifetime of the background updater
	// goroutine; wg lets Shutdown wait for it to exit.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}
|
||||
|
||||
// Manager owns the set of successfully initialized devices and provides
// concurrency-safe access to them.
type Manager struct {
	// devices holds every device that passed client creation and
	// initialization in NewManager. Guarded by mutex.
	devices []*Device
	mutex   sync.RWMutex
}
|
||||
|
||||
func NewManager(configs []config.DeviceConfig) *Manager {
|
||||
m := &Manager{}
|
||||
|
||||
for _, cfg := range configs {
|
||||
client, err := keenetic.NewClient(cfg.Address)
|
||||
if err != nil {
|
||||
log.Printf("Failed to create client for %s: %v", cfg.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
timeout, err := time.ParseDuration(cfg.Timeout)
|
||||
if err != nil {
|
||||
timeout = 10 * time.Second
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
err = client.Init(ctx, cfg.Username, cfg.Password)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Failed to initialize client for %s: %v", cfg.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
updateInterval, _ := time.ParseDuration(cfg.UpdateInterval)
|
||||
cacheTTL, _ := time.ParseDuration(cfg.CacheTTL)
|
||||
|
||||
deviceCtx, deviceCancel := context.WithCancel(context.Background())
|
||||
device := &Device{
|
||||
Name: cfg.Name,
|
||||
Client: client,
|
||||
SkipCollectors: cfg.SkipCollectors,
|
||||
UpdateInterval: updateInterval,
|
||||
CacheTTL: cacheTTL,
|
||||
Cache: make(map[string]any),
|
||||
ctx: deviceCtx,
|
||||
cancel: deviceCancel,
|
||||
}
|
||||
|
||||
m.devices = append(m.devices, device)
|
||||
log.Printf("Successfully initialized device: %s (hostname: %s)", cfg.Name, client.Hostname)
|
||||
|
||||
// Запуск фонового обновления
|
||||
device.wg.Add(1)
|
||||
go device.startBackgroundUpdater()
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Manager) GetDevices() []*Device {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
result := make([]*Device, len(m.devices))
|
||||
copy(result, m.devices)
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *Manager) GetDevice(name string) (*Device, error) {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
for i := range m.devices {
|
||||
if m.devices[i].Name == name {
|
||||
return m.devices[i], nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("device %s not found", name)
|
||||
}
|
||||
|
||||
func (m *Manager) Shutdown(ctx context.Context) error {
|
||||
m.mutex.RLock()
|
||||
devices := m.devices
|
||||
m.mutex.RUnlock()
|
||||
|
||||
log.Printf("shutting down %d device(s)...", len(devices))
|
||||
|
||||
// Cancel all device contexts
|
||||
for _, dev := range devices {
|
||||
if dev.cancel != nil {
|
||||
dev.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all background updaters to finish with timeout
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for _, dev := range devices {
|
||||
dev.wg.Wait()
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
log.Println("all background updaters stopped successfully")
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("shutdown timeout exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Device) startBackgroundUpdater() {
|
||||
defer d.wg.Done()
|
||||
|
||||
if d.UpdateInterval <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("starting background updater for device %s (interval: %s)", d.Name, d.UpdateInterval)
|
||||
|
||||
ticker := time.NewTicker(d.UpdateInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
d.updateCache()
|
||||
|
||||
case <-d.ctx.Done():
|
||||
log.Printf("stopping background updater for device %s", d.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Device) updateCache() {
|
||||
collectors := map[string]func(context.Context) (any, error){
|
||||
"system": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetSystemInfo(ctx)
|
||||
},
|
||||
"interface": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetInterfaceInfo(ctx)
|
||||
},
|
||||
"internet": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetInternetStatus(ctx)
|
||||
},
|
||||
"hotspot": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetHotspotClientInfo(ctx)
|
||||
},
|
||||
"interface_stats": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetConnectedInterfaceStats(ctx)
|
||||
},
|
||||
"process": func(ctx context.Context) (any, error) {
|
||||
return d.Client.GetProcessInfo(ctx)
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
updatedCount := 0
|
||||
for name, fetchFn := range collectors {
|
||||
if d.shouldSkipCollector(name) {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := fetchFn(ctx)
|
||||
if err != nil {
|
||||
log.Printf("background update failed for device %s, collector %s: %v", d.Name, name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
d.CacheMutex.Lock()
|
||||
d.Cache[name] = data
|
||||
d.LastUpdate = time.Now()
|
||||
d.CacheMutex.Unlock()
|
||||
|
||||
updatedCount++
|
||||
}
|
||||
|
||||
if updatedCount > 0 {
|
||||
log.Printf("background update completed for device %s: %d collector(s) updated", d.Name, updatedCount)
|
||||
}
|
||||
}
|
||||
|
||||
// shouldSkipCollector reports whether the named collector appears in the
// device's configured SkipCollectors list and must not be fetched.
func (d *Device) shouldSkipCollector(name string) bool {
	return slices.Contains(d.SkipCollectors, name)
}
|
||||
Reference in New Issue
Block a user