Skip to content

Commit

Permalink
More ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
Lars T Hansen committed Mar 7, 2025
1 parent 5662782 commit 197ee93
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ target/
*.rej
*.orig
util/ingest-kafka/ingest-kafka
util/ingest-files/ingest-files


6 changes: 6 additions & 0 deletions util/ingest-files/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module ingest-files

go 1.23.6

require formats v0.0.0-00010101000000-000000000000 // indirect
replace formats => ../formats
160 changes: 160 additions & 0 deletions util/ingest-files/ingest-files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Ingest old-format files.  The trailing arguments are names of files or directories.  An argument
// whose name matches `*.csv` is read as old-style `sonar ps` data, and one whose name matches
// `sysinfo-*.json` is read as old-style `sonar sysinfo` data.  Any other argument is treated as a
// directory and is walked recursively for files matching those two patterns.

package main

import (
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/fs"
	"log"
	"os"
	"path"
	"regexp"
	"strings"

	"formats/oldfmt"
)

var (
// verbose holds the value of the `-v` command-line flag (default false).
// NOTE(review): nothing in the visible code of this program reads it yet.
verbose = flag.Bool("v", false, "Verbose")
)

// File-name patterns used by tryMatch to classify a path.  Both allow an optional leading
// directory part ending in `/`; the single capture group is what tryMatch passes on as the
// `hostname` argument of the corresponding consume function.
var (
// sampleFile matches `<name>.csv`, capturing `<name>`.
sampleFile = regexp.MustCompile(`^(?:.*/)?(.*)\.csv$`)
// sysinfoFile matches `sysinfo-<name>.json`, capturing `<name>`.
sysinfoFile = regexp.MustCompile(`^(?:.*/)?sysinfo-(.*)\.json$`)
)

// main parses the command-line flags and ingests every trailing argument.
//
// An argument that tryMatch recognizes by name is consumed directly as a data file.  Any other
// argument is treated as a directory and walked recursively; every non-directory entry found is
// offered to tryMatch, which silently ignores names it does not recognize.  Any error encountered
// while walking (including the argument not being a readable directory) terminates the program.
func main() {
	// TODO: Usage message to indicate trailing args
	flag.Parse()
	for _, candidate := range flag.Args() {
		if tryMatch(candidate) {
			continue
		}
		err := fs.WalkDir(os.DirFS(candidate), ".", func(fpath string, d fs.DirEntry, err error) error {
			if err != nil {
				// Stop the walk; the error is reported once, below.
				return err
			}
			if d.IsDir() {
				// A directory can never be a data file, even if its name happens to look like
				// one.  Do not try to open it; just keep descending.
				return nil
			}
			// Paths delivered by fs.WalkDir are slash-separated and relative to the root, and
			// the file-name patterns also expect `/`, so join with package path.
			tryMatch(path.Join(candidate, fpath))
			return nil
		})
		if err != nil {
			log.Fatal(err)
		}
	}
}

// tryMatch classifies candidate by its file name and, if it is recognized, ingests it.
//
// A name matching sampleFile is handed to consumeOldSampleFile and a name matching sysinfoFile
// is handed to consumeOldSysinfoFile, in both cases together with the text captured by the
// pattern.  The result is true if the file was handed to one of the consumers and false if the
// name matched neither pattern.
func tryMatch(candidate string) bool {
	sample := sampleFile.FindStringSubmatch(candidate)
	if sample != nil {
		consumeOldSampleFile(candidate, sample[1])
		return true
	}

	sysinfo := sysinfoFile.FindStringSubmatch(candidate)
	if sysinfo == nil {
		return false
	}
	consumeOldSysinfoFile(candidate, sysinfo[1])
	return true
}

// consumeOldSampleFile reads the CSV file fn and passes each usable old-format sample record to
// consumeOldSample.
//
// The file contains zero or more records, one per CSV row, each with a variable number of
// `key=value` fields.  v0.6.0 and earlier records are ignored: a row whose first field starts
// with the digit `2` is skipped, as is a row that carries no `v` field.  Decoding is work in
// progress: all known field names are recognized but only `v` is stored in the record so far.
//
// Failure to open the file, or any read or parse error, terminates the program.  Reaching the
// end of the file ends the function normally.
//
// hostname is the name derived from the file name by the caller; it is not used yet.
func consumeOldSampleFile(fn, hostname string) {
	f, err := os.Open(fn)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	r := csv.NewReader(f)
	r.FieldsPerRecord = -1 // variable
	var lineno int
	for {
		lineno++
		fields, err := r.Read()
		if err == io.EOF {
			// End of input is the normal way out of the loop, not a failure.
			break
		}
		if err != nil {
			log.Fatalf("Error on line %d: %v", lineno, err)
		}
		if len(fields) == 0 {
			continue
		}
		// Old-format records start with the digit `2` (initial digit in timestamp)
		if strings.HasPrefix(fields[0], "2") {
			continue
		}
		record := new(oldfmt.SampleEnvelope)
		var foundVersion bool
		for _, fld := range fields {
			key, value, found := strings.Cut(fld, "=")
			if !found {
				// Not a `key=value` field; nothing to decode.
				continue
			}
			// Go cases do not fall through: every empty case below is a recognized field
			// that is not decoded yet.
			switch key {
			case "cores":
			case "cmd":
			case "cpu%":
			case "cpukib":
			case "cputime_sec":
			case "gpu%":
			case "gpufail":
			case "gpuinfo":
			case "gpukib":
			case "gpumem%":
			case "gpus":
			case "host":
			case "job":
			case "load":
			case "memtotalkib":
			case "pid":
			case "ppid":
			case "rssanonkib":
			case "rolledup":
			case "time":
			case "user":
			case "v":
				foundVersion = true
				record.Version = value
			}
		}
		if !foundVersion {
			// TODO: More conditions
			continue
		}
		consumeOldSample(record)
	}
}

// consumeOldSysinfoFile reads the JSON file fn and passes every old-format sysinfo record in it
// to consumeOldSysinfo.
//
// The records are simply concatenated in the file - they are neither comma-separated nor wrapped
// in an array - so they are decoded one value at a time from a streaming decoder until no more
// input remains.  Failure to open the file or to decode a record terminates the program.
//
// hostname is the name derived from the file name by the caller; it is not used here.
func consumeOldSysinfoFile(fn, hostname string) {
	input, openErr := os.Open(fn)
	if openErr != nil {
		log.Fatal(openErr)
	}
	defer input.Close()

	decoder := json.NewDecoder(input)
	for {
		if !decoder.More() {
			return
		}
		// A fresh record per iteration, so nothing carries over from the previous one.
		var record oldfmt.Sysinfo
		if decodeErr := decoder.Decode(&record); decodeErr != nil {
			log.Fatal(decodeErr)
		}
		consumeOldSysinfo(&record)
	}
}

// The following are the same as in the ingest-kafka code

// consumeOldSysinfo prints a short summary of one sysinfo record on stdout: a heading with the
// host name and timestamp, followed by the description, core count and memory size.
func consumeOldSysinfo(info *oldfmt.Sysinfo) {
	host, stamp := info.Hostname, info.Timestamp
	fmt.Printf("Sysinfo for %s %s\n", host, stamp)

	desc, cores, mem := info.Description, info.CpuCores, info.MemGB
	fmt.Printf(" Desc %s\n CpuCores %d\n MemGB %d\n", desc, cores, mem)
}

// consumeOldSample prints a short summary of one sample envelope on stdout: a heading with the
// host name and timestamp, followed by the command and CPU time of every sample whose CPU time
// exceeds 100 seconds.
func consumeOldSample(info *oldfmt.SampleEnvelope) {
	// Samples at or below this CPU time are left out of the listing.
	const cpuTimeFloor = 100

	fmt.Printf("Sample for %s %s\n", info.Hostname, info.Timestamp)
	for _, sample := range info.Samples {
		if sample.CpuTimeSec <= cpuTimeFloor {
			continue
		}
		fmt.Printf(" `%s` %d\n", sample.Cmd, sample.CpuTimeSec)
	}
}

38 changes: 26 additions & 12 deletions util/ingest-kafka/ingest-kafka.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// This will listen for `sample` and `sysinfo` traffic from a kafka broker, and "do something"
// with the data. See comments in ../../tests/daemon-local.cfg for an example of how to use this.
// `ingest-kafka` will listen for `sample` and `sysinfo` traffic from a local kafka broker, and "do
// something" with the data. See comments in ../../tests/daemon-local.cfg for an example of how to
// use this.
//
// Currently "do something" is print stuff on stdout, but eventually it may mean placing the data
// in a database.
Expand All @@ -9,6 +10,7 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"

"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -21,24 +23,36 @@ const (
clusterName = "akebakken.no"
)

var (
broker = flag.String("broker", "localhost:9092", "Broker `host:port`")
verbose = flag.Bool("v", false, "Verbose")
)

func main() {
flag.Parse()
cl, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.SeedBrokers(*broker),
kgo.ConsumerGroup("sonar-ingest"),
kgo.ConsumeTopics(clusterName+".sample", clusterName+".sysinfo"),
)
if err != nil {
panic(err)
}
defer cl.Close()
println("Connected")
if *verbose {
println("Connected")
}

ctx := context.Background()

for {
println("Fetching")
if *verbose {
println("Fetching")
}
fetches := cl.PollFetches(ctx)
println("Fetched")
if *verbose {
println("Fetched")
}
if errs := fetches.Errors(); len(errs) > 0 {
// All errors are retried internally when fetching, but non-retriable errors are
// returned from polls so that users can notice and take action.
Expand All @@ -50,15 +64,15 @@ func main() {
record := iter.Next()
switch record.Topic {
case clusterName + ".sample":
var info oldfmt.SampleEnvelope
err := json.Unmarshal(record.Value, &info)
info := new(oldfmt.SampleEnvelope)
err := json.Unmarshal(record.Value, info)
if err != nil {
panic(err)
}
consumeOldSample(info)
case clusterName + ".sysinfo":
var info oldfmt.Sysinfo
err := json.Unmarshal(record.Value, &info)
info := new(oldfmt.Sysinfo)
err := json.Unmarshal(record.Value, info)
if err != nil {
panic(err)
}
Expand All @@ -73,7 +87,7 @@ func main() {
}
}

func consumeOldSample(info oldfmt.SampleEnvelope) {
func consumeOldSample(info *oldfmt.SampleEnvelope) {
fmt.Printf("Sample for %s %s\n", info.Hostname, info.Timestamp)
for _, s := range info.Samples {
if s.CpuTimeSec > 100 {
Expand All @@ -82,7 +96,7 @@ func consumeOldSample(info oldfmt.SampleEnvelope) {
}
}

func consumeOldSysinfo(info oldfmt.Sysinfo) {
func consumeOldSysinfo(info *oldfmt.Sysinfo) {
fmt.Printf("Sysinfo for %s %s\n", info.Hostname, info.Timestamp)
fmt.Printf(" Desc %s\n CpuCores %d\n MemGB %d\n", info.Description, info.CpuCores, info.MemGB)
}

0 comments on commit 197ee93

Please sign in to comment.