156 lines
3.4 KiB
Go
156 lines
3.4 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
proto "msw/proto"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
func inc(ip net.IP) {
|
|
for j := len(ip) - 1; j >= 0; j-- {
|
|
ip[j]++
|
|
if ip[j] > 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func Discover(cidrs ...string) map[string]*proto.VersionResponse {
|
|
tasks := make(chan net.IP, 1024)
|
|
resultLock := sync.Mutex{}
|
|
seen := map[string]*proto.VersionResponse{}
|
|
wg := sync.WaitGroup{}
|
|
|
|
// start 256 workers
|
|
for i := 0; i < 256; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for ip := range tasks {
|
|
version, conn, err := CheckHealth(ip)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
resultLock.Lock()
|
|
oldVersion, ok := seen[version.Hostname]
|
|
if ok {
|
|
oldVersion.Addr = removeDuplicate(append(oldVersion.Addr, version.Addr...))
|
|
} else {
|
|
seen[version.Hostname] = version
|
|
}
|
|
resultLock.Unlock()
|
|
|
|
AddConn(version.Hostname, conn)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// start producer
|
|
go func() {
|
|
defer close(tasks)
|
|
for _, cidr := range cidrs {
|
|
ip, ipnet, err := net.ParseCIDR(cidr)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
|
|
ipCopy := make(net.IP, len(ip))
|
|
copy(ipCopy, ip)
|
|
tasks <- ipCopy
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
report := fmt.Sprintf("Discovered %d nodes:\n", len(seen))
|
|
for _, v := range seen {
|
|
report += fmt.Sprintf(" %s: v%d.%d.%d started_at %s (%s), at %s %dms\n",
|
|
v.Hostname, v.Major, v.Minor, v.Patch,
|
|
v.StartedAt.AsTime().In(time.Local),
|
|
time.Now().Sub(v.StartedAt.AsTime().In(time.Local)).Round(time.Second),
|
|
v.Addr, v.Latency)
|
|
}
|
|
log.Print(report)
|
|
|
|
PrintConns()
|
|
|
|
return seen
|
|
}
|
|
|
|
var kacp = keepalive.ClientParameters{
|
|
Time: 1 * time.Second, // send pings every 10 seconds if there is no activity
|
|
Timeout: 5 * time.Second, // wait 1 second for ping ack before considering the connection dead
|
|
PermitWithoutStream: true, // send pings even without active streams
|
|
}
|
|
|
|
func CheckHealth(ip net.IP) (*proto.VersionResponse, *grpc.ClientConn, error) {
|
|
target := ip.String() + ":3939"
|
|
conn, err := grpc.NewClient(
|
|
target,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithKeepaliveParams(kacp),
|
|
)
|
|
if err != nil {
|
|
log.Fatal("Create client error:", err)
|
|
}
|
|
|
|
rpc := proto.NewMSWClient(conn)
|
|
|
|
ctx, cancle := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancle()
|
|
|
|
begin := time.Now()
|
|
|
|
r, err := rpc.Version(ctx, &proto.Empty{})
|
|
if err != nil {
|
|
defer conn.Close()
|
|
return &proto.VersionResponse{}, conn, err
|
|
}
|
|
|
|
r.Latency = time.Since(begin).Milliseconds()
|
|
|
|
r.Addr = append(r.Addr, ip.String()+":3939")
|
|
|
|
if r.Hostname == "" {
|
|
log.Println("Empty hostname from", ip)
|
|
return &proto.VersionResponse{}, nil, fmt.Errorf("Empty hostname from %s", ip)
|
|
}
|
|
|
|
myHostname, err := os.Hostname()
|
|
if err != nil {
|
|
log.Println("Failed to get hostname:", err)
|
|
myHostname = "unknown"
|
|
}
|
|
|
|
if r.Hostname == myHostname {
|
|
log.Println("Skip self", r.Hostname, ip)
|
|
return &proto.VersionResponse{}, nil, fmt.Errorf("Skip self %s", r.Hostname)
|
|
}
|
|
|
|
return r, conn, nil
|
|
}
|
|
|
|
func removeDuplicate[T comparable](sliceList []T) []T {
|
|
allKeys := make(map[T]bool)
|
|
list := []T{}
|
|
for _, item := range sliceList {
|
|
if _, value := allKeys[item]; !value {
|
|
allKeys[item] = true
|
|
list = append(list, item)
|
|
}
|
|
}
|
|
return list
|
|
}
|