Files
msw/core/discover.go
2024-05-16 15:51:06 +08:00

156 lines
3.4 KiB
Go

package core
import (
"context"
"fmt"
"log"
proto "msw/proto"
"net"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// inc advances ip, in place, to the numerically next address.
//
// The address is treated as a big-endian integer: the last byte is
// incremented first and a carry propagates towards the first byte for as long
// as bytes overflow to zero. An all-0xff address wraps around to all zeros.
func inc(ip net.IP) {
	for pos := len(ip) - 1; pos >= 0; pos-- {
		ip[pos]++
		// A non-zero byte means no overflow happened, so there is no carry
		// left to propagate.
		if ip[pos] != 0 {
			return
		}
	}
}
// Discover probes every address in the given CIDR ranges and returns the
// nodes that answered, keyed by the hostname they reported.
//
// A producer goroutine expands each CIDR into individual addresses and feeds
// them to a pool of 256 workers, each of which calls CheckHealth. When several
// addresses report the same hostname, their addresses are merged
// (de-duplicated) into the Addr list of the first response recorded for that
// hostname. Every successful connection is handed to AddConn under the
// reported hostname.
//
// A CIDR that cannot be parsed is logged and skipped. Addresses for which
// CheckHealth returns an error are silently ignored. Before returning, a
// summary of the discovered nodes is logged and PrintConns is called.
func Discover(cidrs ...string) map[string]*proto.VersionResponse {
	const workerCount = 256

	tasks := make(chan net.IP, 1024)
	seen := map[string]*proto.VersionResponse{}
	var (
		resultLock sync.Mutex // guards seen
		wg         sync.WaitGroup
	)

	// Start the workers.
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ip := range tasks {
				version, conn, err := CheckHealth(ip)
				if err != nil {
					continue
				}
				resultLock.Lock()
				if oldVersion, ok := seen[version.Hostname]; ok {
					oldVersion.Addr = removeDuplicate(append(oldVersion.Addr, version.Addr...))
				} else {
					seen[version.Hostname] = version
				}
				resultLock.Unlock()
				AddConn(version.Hostname, conn)
			}
		}()
	}

	// Start the producer. Closing tasks lets the workers' range loops end.
	go func() {
		defer close(tasks)
		for _, cidr := range cidrs {
			ip, ipnet, err := net.ParseCIDR(cidr)
			if err != nil {
				// ipnet is nil on error; skip this entry instead of
				// dereferencing it below.
				log.Println(err)
				continue
			}
			for cur := ip.Mask(ipnet.Mask); ipnet.Contains(cur); inc(cur) {
				// inc mutates cur in place, so each task gets its own copy.
				ipCopy := make(net.IP, len(cur))
				copy(ipCopy, cur)
				tasks <- ipCopy
			}
		}
	}()

	wg.Wait()

	report := fmt.Sprintf("Discovered %d nodes:\n", len(seen))
	for _, v := range seen {
		startedAt := v.StartedAt.AsTime()
		report += fmt.Sprintf(" %s: v%d.%d.%d started_at %s (%s), at %s %dms\n",
			v.Hostname, v.Major, v.Minor, v.Patch,
			startedAt.In(time.Local),
			time.Since(startedAt).Round(time.Second),
			v.Addr, v.Latency)
	}
	log.Print(report)
	PrintConns()
	return seen
}
// kacp holds the client-side keepalive parameters that CheckHealth passes to
// grpc.WithKeepaliveParams when it creates a connection.
// NOTE(review): grpc-go documents a 10-second minimum for Time; if that applies
// to the grpc version in use, the 1-second value below is raised to 10s — confirm.
var kacp = keepalive.ClientParameters{
Time: 1 * time.Second, // send a ping after 1 second without activity
Timeout: 5 * time.Second, // wait 5 seconds for the ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
// CheckHealth probes the node at ip on port 3939 by calling the Version RPC
// with a one-second timeout.
//
// On success it returns the node's version response, with Latency set to the
// measured round-trip time in milliseconds and the probed "ip:3939" address
// appended to Addr, together with the open client connection. The caller owns
// that connection.
//
// An error is returned when the client cannot be created, when the RPC fails,
// when the node reports an empty hostname, or when the reported hostname
// equals this machine's own hostname (the node is ourselves). On every error
// path the connection is closed here and the returned connection is nil; the
// returned response is an empty placeholder and carries no information.
func CheckHealth(ip net.IP) (*proto.VersionResponse, *grpc.ClientConn, error) {
	target := ip.String() + ":3939"
	conn, err := grpc.NewClient(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(kacp),
	)
	if err != nil {
		// Report the failure to the caller rather than terminating the
		// whole process from inside a probe helper.
		return &proto.VersionResponse{}, nil, fmt.Errorf("create client for %s: %w", target, err)
	}

	// Release the connection on every failure path; it is only kept open
	// when it is handed back to the caller.
	keep := false
	defer func() {
		if !keep {
			// Best-effort cleanup: a Close error is not actionable here.
			_ = conn.Close()
		}
	}()

	rpc := proto.NewMSWClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	begin := time.Now()
	r, err := rpc.Version(ctx, &proto.Empty{})
	if err != nil {
		return &proto.VersionResponse{}, nil, err
	}
	r.Latency = time.Since(begin).Milliseconds()
	r.Addr = append(r.Addr, target)

	if r.Hostname == "" {
		log.Println("Empty hostname from", ip)
		return &proto.VersionResponse{}, nil, fmt.Errorf("Empty hostname from %s", ip)
	}

	myHostname, err := os.Hostname()
	if err != nil {
		log.Println("Failed to get hostname:", err)
		myHostname = "unknown"
	}
	if r.Hostname == myHostname {
		log.Println("Skip self", r.Hostname, ip)
		return &proto.VersionResponse{}, nil, fmt.Errorf("Skip self %s", r.Hostname)
	}

	keep = true
	return r, conn, nil
}
// removeDuplicate returns a new slice containing the first occurrence of each
// distinct element of sliceList, in the order in which they first appear.
// The input slice is not modified, and the result is never nil (an empty or
// nil input yields an empty, non-nil slice).
func removeDuplicate[T comparable](sliceList []T) []T {
	visited := make(map[T]bool)
	unique := make([]T, 0)
	for idx := range sliceList {
		elem := sliceList[idx]
		if visited[elem] {
			continue
		}
		visited[elem] = true
		unique = append(unique, elem)
	}
	return unique
}