initial commit

This commit is contained in:
2025-10-17 09:29:37 +03:00
commit 9739ede62e
25 changed files with 2270 additions and 0 deletions

216
internal/device/manager.go Normal file
View File

@@ -0,0 +1,216 @@
package device
import (
"context"
"fmt"
"log"
"slices"
"sync"
"time"
"gitea.sinav-lab.com/sinav/keenetic-exporter-v2/internal/config"
"gitea.sinav-lab.com/sinav/keenetic-exporter-v2/internal/keenetic"
)
type Device struct {
Name string
Client *keenetic.Client
SkipCollectors []string
UpdateInterval time.Duration
CacheTTL time.Duration
Cache map[string]any
CacheMutex sync.RWMutex
LastUpdate time.Time
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
type Manager struct {
devices []*Device
mutex sync.RWMutex
}
func NewManager(configs []config.DeviceConfig) *Manager {
m := &Manager{}
for _, cfg := range configs {
client, err := keenetic.NewClient(cfg.Address)
if err != nil {
log.Printf("Failed to create client for %s: %v", cfg.Name, err)
continue
}
timeout, err := time.ParseDuration(cfg.Timeout)
if err != nil {
timeout = 10 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
err = client.Init(ctx, cfg.Username, cfg.Password)
cancel()
if err != nil {
log.Printf("Failed to initialize client for %s: %v", cfg.Name, err)
continue
}
updateInterval, _ := time.ParseDuration(cfg.UpdateInterval)
cacheTTL, _ := time.ParseDuration(cfg.CacheTTL)
deviceCtx, deviceCancel := context.WithCancel(context.Background())
device := &Device{
Name: cfg.Name,
Client: client,
SkipCollectors: cfg.SkipCollectors,
UpdateInterval: updateInterval,
CacheTTL: cacheTTL,
Cache: make(map[string]any),
ctx: deviceCtx,
cancel: deviceCancel,
}
m.devices = append(m.devices, device)
log.Printf("Successfully initialized device: %s (hostname: %s)", cfg.Name, client.Hostname)
// Запуск фонового обновления
device.wg.Add(1)
go device.startBackgroundUpdater()
}
return m
}
func (m *Manager) GetDevices() []*Device {
m.mutex.RLock()
defer m.mutex.RUnlock()
result := make([]*Device, len(m.devices))
copy(result, m.devices)
return result
}
func (m *Manager) GetDevice(name string) (*Device, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for i := range m.devices {
if m.devices[i].Name == name {
return m.devices[i], nil
}
}
return nil, fmt.Errorf("device %s not found", name)
}
func (m *Manager) Shutdown(ctx context.Context) error {
m.mutex.RLock()
devices := m.devices
m.mutex.RUnlock()
log.Printf("shutting down %d device(s)...", len(devices))
// Cancel all device contexts
for _, dev := range devices {
if dev.cancel != nil {
dev.cancel()
}
}
// Wait for all background updaters to finish with timeout
done := make(chan struct{})
go func() {
for _, dev := range devices {
dev.wg.Wait()
}
close(done)
}()
select {
case <-done:
log.Println("all background updaters stopped successfully")
return nil
case <-ctx.Done():
return fmt.Errorf("shutdown timeout exceeded")
}
}
func (d *Device) startBackgroundUpdater() {
defer d.wg.Done()
if d.UpdateInterval <= 0 {
return
}
log.Printf("starting background updater for device %s (interval: %s)", d.Name, d.UpdateInterval)
ticker := time.NewTicker(d.UpdateInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
d.updateCache()
case <-d.ctx.Done():
log.Printf("stopping background updater for device %s", d.Name)
return
}
}
}
func (d *Device) updateCache() {
collectors := map[string]func(context.Context) (any, error){
"system": func(ctx context.Context) (any, error) {
return d.Client.GetSystemInfo(ctx)
},
"interface": func(ctx context.Context) (any, error) {
return d.Client.GetInterfaceInfo(ctx)
},
"internet": func(ctx context.Context) (any, error) {
return d.Client.GetInternetStatus(ctx)
},
"hotspot": func(ctx context.Context) (any, error) {
return d.Client.GetHotspotClientInfo(ctx)
},
"interface_stats": func(ctx context.Context) (any, error) {
return d.Client.GetConnectedInterfaceStats(ctx)
},
"process": func(ctx context.Context) (any, error) {
return d.Client.GetProcessInfo(ctx)
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
updatedCount := 0
for name, fetchFn := range collectors {
if d.shouldSkipCollector(name) {
continue
}
data, err := fetchFn(ctx)
if err != nil {
log.Printf("background update failed for device %s, collector %s: %v", d.Name, name, err)
continue
}
d.CacheMutex.Lock()
d.Cache[name] = data
d.LastUpdate = time.Now()
d.CacheMutex.Unlock()
updatedCount++
}
if updatedCount > 0 {
log.Printf("background update completed for device %s: %d collector(s) updated", d.Name, updatedCount)
}
}
func (d *Device) shouldSkipCollector(name string) bool {
return slices.Contains(d.SkipCollectors, name)
}