package core import ( "context" "fmt" "log" proto "msw/proto" "net" "os" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" ) func inc(ip net.IP) { for j := len(ip) - 1; j >= 0; j-- { ip[j]++ if ip[j] > 0 { break } } } func Discover(cidrs ...string) map[string]*proto.VersionResponse { tasks := make(chan net.IP, 1024) resultLock := sync.Mutex{} seen := map[string]*proto.VersionResponse{} wg := sync.WaitGroup{} // start 256 workers for i := 0; i < 256; i++ { wg.Add(1) go func() { defer wg.Done() for ip := range tasks { version, conn, err := CheckHealth(ip) if err != nil { continue } resultLock.Lock() oldVersion, ok := seen[version.Hostname] if ok { oldVersion.Addr = removeDuplicate(append(oldVersion.Addr, version.Addr...)) } else { seen[version.Hostname] = version } resultLock.Unlock() AddConn(version.Hostname, conn) } }() } // start producer go func() { defer close(tasks) for _, cidr := range cidrs { ip, ipnet, err := net.ParseCIDR(cidr) if err != nil { log.Println(err) } for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { ipCopy := make(net.IP, len(ip)) copy(ipCopy, ip) tasks <- ipCopy } } }() wg.Wait() report := fmt.Sprintf("Discovered %d nodes:\n", len(seen)) for _, v := range seen { report += fmt.Sprintf(" %s: v%d.%d.%d started_at %s (%s), at %s %dms\n", v.Hostname, v.Major, v.Minor, v.Patch, v.StartedAt.AsTime().In(time.Local), time.Now().Sub(v.StartedAt.AsTime().In(time.Local)).Round(time.Second), v.Addr, v.Latency) } log.Print(report) PrintConns() return seen } var kacp = keepalive.ClientParameters{ Time: 1 * time.Second, // send pings every 10 seconds if there is no activity Timeout: 5 * time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams } func CheckHealth(ip net.IP) (*proto.VersionResponse, *grpc.ClientConn, error) { target := ip.String() + ":3939" conn, err := grpc.NewClient( target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), ) if err != nil { log.Fatal("Create client error:", err) } rpc := proto.NewMSWClient(conn) ctx, cancle := context.WithTimeout(context.Background(), 1*time.Second) defer cancle() begin := time.Now() r, err := rpc.Version(ctx, &proto.Empty{}) if err != nil { defer conn.Close() return &proto.VersionResponse{}, conn, err } r.Latency = time.Since(begin).Milliseconds() r.Addr = append(r.Addr, ip.String()+":3939") if r.Hostname == "" { log.Println("Empty hostname from", ip) return &proto.VersionResponse{}, nil, fmt.Errorf("Empty hostname from %s", ip) } myHostname, err := os.Hostname() if err != nil { log.Println("Failed to get hostname:", err) myHostname = "unknown" } if r.Hostname == myHostname { log.Println("Skip self", r.Hostname, ip) return &proto.VersionResponse{}, nil, fmt.Errorf("Skip self %s", r.Hostname) } return r, conn, nil } func removeDuplicate[T comparable](sliceList []T) []T { allKeys := make(map[T]bool) list := []T{} for _, item := range sliceList { if _, value := allKeys[item]; !value { allKeys[item] = true list = append(list, item) } } return list }