暂存

tmp
This commit is contained in:
2024-05-09 16:35:21 +08:00
parent 3afa0d81bb
commit bcc1b51006
27 changed files with 1526 additions and 0 deletions

3
core/channel.go Normal file
View File

@@ -0,0 +1,3 @@
package core
var pluginChannels = map[string]chan string{}

36
core/conns.go Normal file
View File

@@ -0,0 +1,36 @@
package core
import (
"fmt"
"log"
"sync"
"google.golang.org/grpc"
)
var conns = map[string]*grpc.ClientConn{}
var connsLock sync.Mutex
func AddConn(name string, conn *grpc.ClientConn) {
connsLock.Lock()
defer connsLock.Unlock()
if oldConn, ok := conns[name]; ok {
oldConn.Close()
}
conns[name] = conn
}
func PrintConns() {
connsLock.Lock()
defer connsLock.Unlock()
report := "Current connections:\n"
for name, conn := range conns {
report += fmt.Sprintf(" %s %s -> %s\n", conn.GetState(), name, conn.Target())
}
log.Print(report)
}

47
core/core.go Normal file
View File

@@ -0,0 +1,47 @@
package core
import (
"log"
"msw/proto"
"msw/rpc"
"msw/shell"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}
var kasp = keepalive.ServerParameters{
Time: 1 * time.Second,
Timeout: 5 * time.Second,
}
func Start() {
log.Println("Starting main loop")
go shell.ExecuteOne("./resources/node_exporter")
go shell.ExecuteOne("./resources/ipmi_exporter")
go func() {
listAddr := "0.0.0.0:3939"
list, err := net.Listen("tcp", listAddr)
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
)
proto.RegisterMSWServer(s, &rpc.MSWServer{})
log.Println("RPC Server started on", listAddr)
if err := s.Serve(list); err != nil {
log.Fatal(err)
}
}()
}

155
core/discover.go Normal file
View File

@@ -0,0 +1,155 @@
package core
import (
"context"
"fmt"
"log"
proto "msw/proto"
"net"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
func inc(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
func Discover(cidrs ...string) map[string]*proto.VersionResponse {
tasks := make(chan net.IP, 1024)
resultLock := sync.Mutex{}
seen := map[string]*proto.VersionResponse{}
wg := sync.WaitGroup{}
// start 256 workers
for i := 0; i < 256; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for ip := range tasks {
version, conn, err := CheckHealth(ip)
if err != nil {
continue
}
resultLock.Lock()
oldVersion, ok := seen[version.Hostname]
if ok {
oldVersion.Addr = removeDuplicate(append(oldVersion.Addr, version.Addr...))
} else {
seen[version.Hostname] = version
}
resultLock.Unlock()
AddConn(version.Hostname, conn)
}
}()
}
// start producer
go func() {
defer close(tasks)
for _, cidr := range cidrs {
ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
log.Println(err)
}
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
ipCopy := make(net.IP, len(ip))
copy(ipCopy, ip)
tasks <- ipCopy
}
}
}()
wg.Wait()
report := fmt.Sprintf("Discovered %d nodes:\n", len(seen))
for _, v := range seen {
report += fmt.Sprintf(" %s: v%d.%d.%d started_at %s (%s), at %s %dms\n",
v.Hostname, v.Major, v.Minor, v.Patch,
v.StartedAt.AsTime().In(time.Local),
time.Now().Sub(v.StartedAt.AsTime().In(time.Local)).Round(time.Second),
v.Addr, v.Latency)
}
log.Print(report)
PrintConns()
return seen
}
var kacp = keepalive.ClientParameters{
Time: 1 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: 5 * time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
func CheckHealth(ip net.IP) (*proto.VersionResponse, *grpc.ClientConn, error) {
target := ip.String() + ":3939"
conn, err := grpc.NewClient(
target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(kacp),
)
if err != nil {
log.Fatal("Create client error:", err)
}
rpc := proto.NewMSWClient(conn)
ctx, cancle := context.WithTimeout(context.Background(), 1*time.Second)
defer cancle()
begin := time.Now()
r, err := rpc.Version(ctx, &proto.Empty{})
if err != nil {
defer conn.Close()
return &proto.VersionResponse{}, conn, err
}
r.Latency = time.Since(begin).Milliseconds()
r.Addr = append(r.Addr, ip.String()+":3939")
if r.Hostname == "" {
log.Println("Empty hostname from", ip)
return &proto.VersionResponse{}, nil, fmt.Errorf("Empty hostname from %s", ip)
}
myHostname, err := os.Hostname()
if err != nil {
log.Println("Failed to get hostname:", err)
myHostname = "unknown"
}
if r.Hostname == myHostname {
log.Println("Skip self", r.Hostname, ip)
return &proto.VersionResponse{}, nil, fmt.Errorf("Skip self %s", r.Hostname)
}
return r, conn, nil
}
func removeDuplicate[T comparable](sliceList []T) []T {
allKeys := make(map[T]bool)
list := []T{}
for _, item := range sliceList {
if _, value := allKeys[item]; !value {
allKeys[item] = true
list = append(list, item)
}
}
return list
}

22
core/restart.go Normal file
View File

@@ -0,0 +1,22 @@
package core
import (
"os"
"syscall"
)
func RestartSelf() {
executable, err := os.Executable()
if err != nil {
panic("获取可执行文件路径失败: " + err.Error())
}
args := os.Args
env := os.Environ()
// 使用exec替换当前进程为新的进程
err = syscall.Exec(executable, args, env)
if err != nil {
panic("重启失败: " + err.Error())
}
}