move parsers to internal library
many fixes related to previous changes added import command
This commit is contained in:
parent
c4bdf122a8
commit
0c8a25a2f4
|
@ -1,21 +1,148 @@
|
||||||
package cli
|
package cli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem"
|
"github.com/kschamplin/gotelem/internal/db"
|
||||||
|
"github.com/kschamplin/gotelem/internal/logparsers"
|
||||||
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var parsersString string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
subCmds = append(subCmds, clientCmd)
|
subCmds = append(subCmds, clientCmd)
|
||||||
|
parsersString = func() string {
|
||||||
|
// create a string like "'telem', 'candump', 'anotherparser'"
|
||||||
|
keys := make([]string, len(logparsers.ParsersMap))
|
||||||
|
i := 0
|
||||||
|
for k := range logparsers.ParsersMap {
|
||||||
|
keys[i] = k
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
s := strings.Join(keys, "', '")
|
||||||
|
return "'" + s + "'"
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var importCmd = &cli.Command{
|
||||||
|
Name: "import",
|
||||||
|
Aliases: []string{"i"},
|
||||||
|
Usage: "import a log file into a database",
|
||||||
|
ArgsUsage: "[log file]",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "format",
|
||||||
|
Aliases: []string{"f"},
|
||||||
|
Usage: "the format of the log file. One of " + parsersString,
|
||||||
|
Value: "telem",
|
||||||
|
},
|
||||||
|
&cli.PathFlag{
|
||||||
|
Name: "database",
|
||||||
|
Aliases: []string{"d", "db"},
|
||||||
|
Usage: "the path of the database",
|
||||||
|
Value: "./gotelem.db",
|
||||||
|
},
|
||||||
|
&cli.UintFlag{
|
||||||
|
Name: "batch-size",
|
||||||
|
Usage: "the maximum size of each SQL transaction",
|
||||||
|
Value: 50,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: importAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
func importAction(ctx *cli.Context) error {
|
||||||
|
path := ctx.Args().Get(0)
|
||||||
|
if path == "" {
|
||||||
|
fmt.Println("missing log file!")
|
||||||
|
cli.ShowAppHelpAndExit(ctx, -1)
|
||||||
|
}
|
||||||
|
fstream, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fReader := bufio.NewReader(fstream)
|
||||||
|
|
||||||
|
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
|
||||||
|
if !ok {
|
||||||
|
fmt.Println("invalid format provided: must be one of " + parsersString)
|
||||||
|
cli.ShowAppHelpAndExit(ctx, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
dbPath := ctx.Path("database")
|
||||||
|
db, err := db.OpenTelemDb(dbPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error opening database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// we should batch data, avoiding individual transactions to the database.
|
||||||
|
bSize := ctx.Uint("batch-size")
|
||||||
|
eventsBatch := make([]*skylab.BusEvent, bSize)
|
||||||
|
|
||||||
|
batchIdx := 0
|
||||||
|
|
||||||
|
// stats for imports
|
||||||
|
n_packets := 0
|
||||||
|
n_unknown := 0
|
||||||
|
n_error := 0
|
||||||
|
for {
|
||||||
|
line, err := fReader.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break // end of file, go to the flush sequence
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f, err := pfun(line)
|
||||||
|
var idErr *skylab.UnknownIdError
|
||||||
|
if errors.As(err, &idErr) {
|
||||||
|
// unknown id
|
||||||
|
fmt.Printf("unknown id %v\n", idErr.Error())
|
||||||
|
n_unknown++
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
// TODO: we should consider absorbing all errors.
|
||||||
|
fmt.Printf("got an error %v\n", err)
|
||||||
|
n_error++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
n_packets++
|
||||||
|
eventsBatch[batchIdx] = f
|
||||||
|
batchIdx++
|
||||||
|
if batchIdx >= int(bSize) {
|
||||||
|
// flush it!!!!
|
||||||
|
err = db.AddEventsCtx(ctx.Context, eventsBatch...)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error adding to database %v\n", err)
|
||||||
|
}
|
||||||
|
batchIdx = 0 // reset the batch
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// check if we have remaining packets and flush them
|
||||||
|
if batchIdx > 0 {
|
||||||
|
err = db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here!
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error adding to database %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error)
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var clientCmd = &cli.Command{
|
var clientCmd = &cli.Command{
|
||||||
Name: "client",
|
Name: "client",
|
||||||
Aliases: []string{"c"},
|
Aliases: []string{"c"},
|
||||||
Usage: "interact with a gotelem server",
|
Subcommands: []*cli.Command{importCmd},
|
||||||
ArgsUsage: "[server url]",
|
Usage: "Client utilities and tools",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
Name: "gui",
|
Name: "gui",
|
||||||
|
@ -24,13 +151,11 @@ var clientCmd = &cli.Command{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Description: `
|
Description: `
|
||||||
Connects to a gotelem server or relay. Can be used to
|
Connects to a gotelem server or relay. Also acts as a helper command line tool.
|
||||||
`,
|
`,
|
||||||
Action: client,
|
Action: client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func client(ctx *cli.Context) error {
|
func client(ctx *cli.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,9 @@ var serveFlags = []cli.Flag{
|
||||||
},
|
},
|
||||||
&cli.PathFlag{
|
&cli.PathFlag{
|
||||||
Name: "db",
|
Name: "db",
|
||||||
Value: "gotelem.db",
|
Aliases: []string{"d"},
|
||||||
Usage: "database to serve",
|
DefaultText: "gotelem.db",
|
||||||
|
Usage: "database to serve, if not specified will use memory",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,7 +162,6 @@ func (r *rpcService) Start(ctx *cli.Context, deps svcDeps) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) {
|
func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) {
|
||||||
// reader := msgp.NewReader(conn)
|
|
||||||
|
|
||||||
subname := fmt.Sprint("tcp", conn.RemoteAddr().String())
|
subname := fmt.Sprint("tcp", conn.RemoteAddr().String())
|
||||||
|
|
||||||
|
@ -353,7 +353,7 @@ func (d *dbLoggingService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-rxCh:
|
case msg := <-rxCh:
|
||||||
tdb.AddEventsCtx(cCtx.Context, msg)
|
tdb.AddEventsCtx(cCtx.Context, &msg)
|
||||||
case <-cCtx.Done():
|
case <-cCtx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ package cli
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem"
|
"github.com/kschamplin/gotelem/internal/can"
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
"github.com/kschamplin/gotelem/socketcan"
|
"github.com/kschamplin/gotelem/socketcan"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
@ -77,7 +77,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
defer broker.Unsubscribe("socketCAN")
|
defer broker.Unsubscribe("socketCAN")
|
||||||
|
|
||||||
// make a channel to receive socketCAN frames.
|
// make a channel to receive socketCAN frames.
|
||||||
rxCan := make(chan gotelem.Frame)
|
rxCan := make(chan can.Frame)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
@ -89,27 +89,25 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var frame gotelem.Frame
|
var frame can.Frame
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-rxCh:
|
case msg := <-rxCh:
|
||||||
|
|
||||||
id, d, _ := skylab.ToCanFrame(msg.Data)
|
frame, err = skylab.ToCanFrame(msg.Data)
|
||||||
|
|
||||||
frame.Id = id
|
|
||||||
frame.Data = d
|
|
||||||
|
|
||||||
s.sock.Send(&frame)
|
s.sock.Send(&frame)
|
||||||
|
|
||||||
case msg := <-rxCan:
|
case msg := <-rxCan:
|
||||||
p, err := skylab.FromCanFrame(msg.Id, msg.Data)
|
p, err := skylab.FromCanFrame(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("error parsing can packet", "id", msg.Id)
|
logger.Warn("error parsing can packet", "id", msg.Id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cde := skylab.BusEvent{
|
cde := skylab.BusEvent{
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Id: msg.Id,
|
Name: p.String(),
|
||||||
Data: p,
|
Data: p,
|
||||||
}
|
}
|
||||||
broker.Publish("socketCAN", cde)
|
broker.Publish("socketCAN", cde)
|
||||||
|
|
|
@ -2,19 +2,15 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/hex"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem/internal/can"
|
"github.com/kschamplin/gotelem/internal/logparsers"
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
"golang.org/x/exp/slog"
|
"golang.org/x/exp/slog"
|
||||||
|
@ -46,6 +42,17 @@ required for piping candump into skylabify. Likewise, data should be stored with
|
||||||
-l.
|
-l.
|
||||||
|
|
||||||
`
|
`
|
||||||
|
parsersString := func() string {
|
||||||
|
// create a string like "'telem', 'candump', 'anotherparser'"
|
||||||
|
keys := make([]string, len(logparsers.ParsersMap))
|
||||||
|
i := 0
|
||||||
|
for k := range logparsers.ParsersMap {
|
||||||
|
keys[i] = k
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
s := strings.Join(keys, "', '")
|
||||||
|
return "'" + s + "'"
|
||||||
|
}()
|
||||||
|
|
||||||
app.Flags = []cli.Flag{
|
app.Flags = []cli.Flag{
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
|
@ -55,7 +62,7 @@ required for piping candump into skylabify. Likewise, data should be stored with
|
||||||
&cli.StringFlag{
|
&cli.StringFlag{
|
||||||
Name: "format",
|
Name: "format",
|
||||||
Aliases: []string{"f"},
|
Aliases: []string{"f"},
|
||||||
Usage: "the format of the incoming data. One of 'telem', 'candump'",
|
Usage: "the format of the incoming data. One of " + parsersString,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,150 +72,6 @@ required for piping candump into skylabify. Likewise, data should be stored with
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A FormatError is an error when parsing a format. Typically we simply ignore
|
|
||||||
// these and move on, but they can optionally wrap another error that is fatal.
|
|
||||||
type FormatError struct {
|
|
||||||
msg string
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *FormatError) Error() string {
|
|
||||||
return fmt.Sprintf("%s:%s", e.msg, e.err.Error())
|
|
||||||
}
|
|
||||||
func (e *FormatError) Unwrap() error {
|
|
||||||
return e.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFormatError(msg string, err error) error {
|
|
||||||
return &FormatError{msg: msg, err: err}
|
|
||||||
}
|
|
||||||
|
|
||||||
// A Parser takes a string containing one line of a particular log file
|
|
||||||
// and returns an associated skylab.BusEvent representing the packet.
|
|
||||||
// if no packet is found, an error is returned instead.
|
|
||||||
type ParserFunc func(string) (skylab.BusEvent, error)
|
|
||||||
|
|
||||||
func parseCanDumpLine(dumpLine string) (b skylab.BusEvent, err error) {
|
|
||||||
// dumpline looks like this:
|
|
||||||
// (1684538768.521889) can0 200#8D643546
|
|
||||||
// remove trailing newline
|
|
||||||
dumpLine = strings.TrimSpace(dumpLine)
|
|
||||||
segments := strings.Split(dumpLine, " ")
|
|
||||||
|
|
||||||
var unixSeconds, unixMicros int64
|
|
||||||
fmt.Sscanf(segments[0], "(%d.%d)", &unixSeconds, &unixMicros)
|
|
||||||
b.Timestamp = time.Unix(unixSeconds, unixMicros)
|
|
||||||
|
|
||||||
// now we extract the remaining data:
|
|
||||||
hexes := strings.Split(segments[2], "#") // first portion is id, second is data
|
|
||||||
|
|
||||||
id, err := strconv.ParseUint(hexes[0], 16, 64)
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse id", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if (len(hexes[1]) % 2) != 0 {
|
|
||||||
err = NewFormatError("odd number of hex characters", nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
rawData, err := hex.DecodeString(hexes[1])
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to decode hex data", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
frame := can.Frame{
|
|
||||||
// TODO: fix extended ids. we assume not extended for now.
|
|
||||||
Id: can.CanID{Id: uint32(id), Extended: false},
|
|
||||||
Data: rawData,
|
|
||||||
Kind: can.CanDataFrame,
|
|
||||||
}
|
|
||||||
|
|
||||||
b.Data, err = skylab.FromCanFrame(frame)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse can frame", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the name
|
|
||||||
b.Name = b.Data.String()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseTelemLogLine(line string) (b skylab.BusEvent, err error) {
|
|
||||||
// strip trailng newline since we rely on it being gone
|
|
||||||
line = strings.TrimSpace(line)
|
|
||||||
// data is of the form
|
|
||||||
// 1698180835.318 0619D80564080EBE241
|
|
||||||
// the second part there is 3 nibbles (12 bits, 3 hex chars) for can ID,
|
|
||||||
// the rest is data.
|
|
||||||
// this regex does the processing.
|
|
||||||
r := regexp.MustCompile(`^(\d+).(\d{3}) (\w{3})(\w+)$`)
|
|
||||||
|
|
||||||
// these files tend to get corrupted. there are all kinds of nasties that can happen.
|
|
||||||
// defense against random panics
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
err = NewFormatError("caught panic", nil)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
a := r.FindStringSubmatch(line)
|
|
||||||
if a == nil {
|
|
||||||
err = NewFormatError("no regex match", nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var unixSeconds, unixMillis int64
|
|
||||||
// note that a contains 5 elements, the first being the full match.
|
|
||||||
// so we start from the second element
|
|
||||||
unixSeconds, err = strconv.ParseInt(a[1], 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse unix seconds", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
unixMillis, err = strconv.ParseInt(a[2], 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse unix millis", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ts := time.Unix(unixSeconds, unixMillis*1e6)
|
|
||||||
|
|
||||||
id, err := strconv.ParseUint(a[3], 16, 16)
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse id", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(a[4])%2 != 0 {
|
|
||||||
// odd hex chars, protect against a panic
|
|
||||||
err = NewFormatError("wrong amount of hex chars", nil)
|
|
||||||
}
|
|
||||||
rawData, err := hex.DecodeString(a[4])
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse hex data", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
frame := can.Frame{
|
|
||||||
Id: can.CanID{Id: uint32(id), Extended: false},
|
|
||||||
Data: rawData,
|
|
||||||
Kind: can.CanDataFrame,
|
|
||||||
}
|
|
||||||
b.Timestamp = ts
|
|
||||||
b.Data, err = skylab.FromCanFrame(frame)
|
|
||||||
if err != nil {
|
|
||||||
err = NewFormatError("failed to parse can frame", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b.Name = b.Data.String()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var parseMap = map[string]ParserFunc{
|
|
||||||
"telem": parseTelemLogLine,
|
|
||||||
"candump": parseCanDumpLine,
|
|
||||||
}
|
|
||||||
|
|
||||||
func run(ctx *cli.Context) (err error) {
|
func run(ctx *cli.Context) (err error) {
|
||||||
path := ctx.Args().Get(0)
|
path := ctx.Args().Get(0)
|
||||||
|
@ -229,9 +92,9 @@ func run(ctx *cli.Context) (err error) {
|
||||||
|
|
||||||
fileReader := bufio.NewReader(istream)
|
fileReader := bufio.NewReader(istream)
|
||||||
|
|
||||||
var pfun ParserFunc
|
var pfun logparsers.ParserFunc
|
||||||
|
|
||||||
pfun, ok := parseMap[ctx.String("format")]
|
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
|
||||||
if !ok {
|
if !ok {
|
||||||
fmt.Println("invalid format!")
|
fmt.Println("invalid format!")
|
||||||
cli.ShowAppHelpAndExit(ctx, int(syscall.EINVAL))
|
cli.ShowAppHelpAndExit(ctx, int(syscall.EINVAL))
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
// Code generated by "stringer -output=frame_kind.go -type Kind"; DO NOT EDIT.
|
|
||||||
|
|
||||||
package gotelem
|
|
||||||
|
|
||||||
import "strconv"
|
|
||||||
|
|
||||||
func _() {
|
|
||||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
|
||||||
// Re-run the stringer command to generate them again.
|
|
||||||
var x [1]struct{}
|
|
||||||
_ = x[CanSFFFrame-0]
|
|
||||||
_ = x[CanEFFFrame-1]
|
|
||||||
_ = x[CanRTRFrame-2]
|
|
||||||
_ = x[CanErrFrame-3]
|
|
||||||
}
|
|
||||||
|
|
||||||
const _Kind_name = "CanSFFFrameCanEFFFrameCanRTRFrameCanErrFrame"
|
|
||||||
|
|
||||||
var _Kind_index = [...]uint8{0, 11, 22, 33, 44}
|
|
||||||
|
|
||||||
func (i Kind) String() string {
|
|
||||||
if i >= Kind(len(_Kind_index)-1) {
|
|
||||||
return "Kind(" + strconv.FormatInt(int64(i), 10) + ")"
|
|
||||||
}
|
|
||||||
return _Kind_name[_Kind_index[i]:_Kind_index[i+1]]
|
|
||||||
}
|
|
13
http.go
13
http.go
|
@ -46,8 +46,6 @@ func TelemRouter(log *slog.Logger, broker *Broker, db *db.TelemDb) http.Handler
|
||||||
// To future residents - you can add new API calls/systems in /api/v2
|
// To future residents - you can add new API calls/systems in /api/v2
|
||||||
// Don't break anything in api v1! keep legacy code working!
|
// Don't break anything in api v1! keep legacy code working!
|
||||||
|
|
||||||
// serve up a local status page.
|
|
||||||
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,7 +67,7 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router {
|
||||||
r.Route("/packets", func(r chi.Router) {
|
r.Route("/packets", func(r chi.Router) {
|
||||||
r.Get("/subscribe", apiV1PacketSubscribe(broker, db))
|
r.Get("/subscribe", apiV1PacketSubscribe(broker, db))
|
||||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
|
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
var pkgs []skylab.BusEvent
|
var pkgs []*skylab.BusEvent
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(r.Body)
|
||||||
if err := decoder.Decode(&pkgs); err != nil {
|
if err := decoder.Decode(&pkgs); err != nil {
|
||||||
w.WriteHeader(http.StatusTeapot)
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
@ -111,7 +109,7 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router {
|
||||||
|
|
||||||
// apiV1Subscriber is a websocket session for the v1 api.
|
// apiV1Subscriber is a websocket session for the v1 api.
|
||||||
type apiV1Subscriber struct {
|
type apiV1Subscriber struct {
|
||||||
idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all.
|
nameFilter []string // names of packets we care about.
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a websocket stream.
|
// this is a websocket stream.
|
||||||
|
@ -143,13 +141,14 @@ func apiV1PacketSubscribe(broker *Broker, db *db.TelemDb) http.HandlerFunc {
|
||||||
case <-r.Context().Done():
|
case <-r.Context().Done():
|
||||||
return
|
return
|
||||||
case msgIn := <-sub:
|
case msgIn := <-sub:
|
||||||
if len(sess.idFilter) == 0 {
|
if len(sess.nameFilter) == 0 {
|
||||||
// send it.
|
// send it.
|
||||||
wsjson.Write(r.Context(), c, msgIn)
|
wsjson.Write(r.Context(), c, msgIn)
|
||||||
}
|
}
|
||||||
for _, id := range sess.idFilter {
|
for _, name := range sess.nameFilter {
|
||||||
if id == msgIn.Id {
|
if name == msgIn.Name {
|
||||||
// send it
|
// send it
|
||||||
|
wsjson.Write(r.Context(), c, msgIn)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -72,11 +71,11 @@ func (tdb *TelemDb) SetVersion(version int) error {
|
||||||
|
|
||||||
// sql expression to insert a bus event into the packets database.1
|
// sql expression to insert a bus event into the packets database.1
|
||||||
const sqlInsertEvent = `
|
const sqlInsertEvent = `
|
||||||
INSERT INTO "bus_events" (ts, name, data) VALUES ($1, $2, json($3));
|
INSERT INTO "bus_events" (ts, name, data) VALUES
|
||||||
`
|
`
|
||||||
|
|
||||||
// AddEvent adds the bus event to the database.
|
// AddEvent adds the bus event to the database.
|
||||||
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (err error) {
|
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...*skylab.BusEvent) (err error) {
|
||||||
//
|
//
|
||||||
tx, err := tdb.db.BeginTx(ctx, nil)
|
tx, err := tdb.db.BeginTx(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -118,12 +117,11 @@ func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (err error) {
|
func (tdb *TelemDb) AddEvents(events ...*skylab.BusEvent) (err error) {
|
||||||
|
|
||||||
return tdb.AddEventsCtx(context.Background(), events...)
|
return tdb.AddEventsCtx(context.Background(), events...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Query fragment guide:
|
/// Query fragment guide:
|
||||||
/// We need to be able to easily construct safe(!) and meaningful queries programatically
|
/// We need to be able to easily construct safe(!) and meaningful queries programatically
|
||||||
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
|
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
|
||||||
|
|
168
internal/logparsers/parsers.go
Normal file
168
internal/logparsers/parsers.go
Normal file
|
@ -0,0 +1,168 @@
|
||||||
|
package logparsers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/kschamplin/gotelem/internal/can"
|
||||||
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A FormatError is an error when parsing a format. Typically we simply ignore
|
||||||
|
// these and move on, but they can optionally wrap another error that is fatal.
|
||||||
|
type FormatError struct {
|
||||||
|
msg string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *FormatError) Error() string {
|
||||||
|
if e.err != nil {
|
||||||
|
return fmt.Sprintf("%s:%s", e.msg, e.err.Error())
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s", e.msg)
|
||||||
|
|
||||||
|
}
|
||||||
|
func (e *FormatError) Unwrap() error {
|
||||||
|
return e.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFormatError(msg string, err error) error {
|
||||||
|
return &FormatError{msg: msg, err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A Parser takes a string containing one line of a particular log file
|
||||||
|
// and returns an associated skylab.BusEvent representing the packet.
|
||||||
|
// if no packet is found, an error is returned instead.
|
||||||
|
type ParserFunc func(string) (*skylab.BusEvent, error)
|
||||||
|
|
||||||
|
func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) {
|
||||||
|
// dumpline looks like this:
|
||||||
|
// (1684538768.521889) can0 200#8D643546
|
||||||
|
// remove trailing newline
|
||||||
|
dumpLine = strings.TrimSpace(dumpLine)
|
||||||
|
segments := strings.Split(dumpLine, " ")
|
||||||
|
|
||||||
|
var unixSeconds, unixMicros int64
|
||||||
|
fmt.Sscanf(segments[0], "(%d.%d)", &unixSeconds, &unixMicros)
|
||||||
|
|
||||||
|
// now we extract the remaining data:
|
||||||
|
hexes := strings.Split(segments[2], "#") // first portion is id, second is data
|
||||||
|
|
||||||
|
id, err := strconv.ParseUint(hexes[0], 16, 64)
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse id", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (len(hexes[1]) % 2) != 0 {
|
||||||
|
err = NewFormatError("odd number of hex characters", nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rawData, err := hex.DecodeString(hexes[1])
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to decode hex data", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
frame := can.Frame{
|
||||||
|
// TODO: fix extended ids. we assume not extended for now.
|
||||||
|
Id: can.CanID{Id: uint32(id), Extended: false},
|
||||||
|
Data: rawData,
|
||||||
|
Kind: can.CanDataFrame,
|
||||||
|
}
|
||||||
|
|
||||||
|
b = &skylab.BusEvent{
|
||||||
|
Timestamp: time.Unix(unixSeconds, unixMicros),
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Data, err = skylab.FromCanFrame(frame)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse can frame", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the name
|
||||||
|
b.Name = b.Data.String()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTelemLogLine(line string) (b *skylab.BusEvent, err error) {
|
||||||
|
// strip trailng newline since we rely on it being gone
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
// data is of the form
|
||||||
|
// 1698180835.318 0619D80564080EBE241
|
||||||
|
// the second part there is 3 nibbles (12 bits, 3 hex chars) for can ID,
|
||||||
|
// the rest is data.
|
||||||
|
// this regex does the processing.
|
||||||
|
r := regexp.MustCompile(`^(\d+).(\d{3}) (\w{3})(\w+)$`)
|
||||||
|
|
||||||
|
// these files tend to get corrupted. there are all kinds of nasties that can happen.
|
||||||
|
// defense against random panics
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
err = NewFormatError("caught panic", nil)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
a := r.FindStringSubmatch(line)
|
||||||
|
if a == nil || len(a) != 5 {
|
||||||
|
err = NewFormatError("no regex match", nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var unixSeconds, unixMillis int64
|
||||||
|
// note that a contains 5 elements, the first being the full match.
|
||||||
|
// so we start from the second element
|
||||||
|
unixSeconds, err = strconv.ParseInt(a[1], 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse unix seconds", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
unixMillis, err = strconv.ParseInt(a[2], 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse unix millis", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ts := time.Unix(unixSeconds, unixMillis*1e6)
|
||||||
|
|
||||||
|
id, err := strconv.ParseUint(a[3], 16, 16)
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse id", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(a[4])%2 != 0 {
|
||||||
|
// odd hex chars, protect against a panic
|
||||||
|
err = NewFormatError("wrong amount of hex chars", nil)
|
||||||
|
}
|
||||||
|
rawData, err := hex.DecodeString(a[4])
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse hex data", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
frame := can.Frame{
|
||||||
|
Id: can.CanID{Id: uint32(id), Extended: false},
|
||||||
|
Data: rawData,
|
||||||
|
Kind: can.CanDataFrame,
|
||||||
|
}
|
||||||
|
|
||||||
|
b = &skylab.BusEvent{
|
||||||
|
Timestamp: ts,
|
||||||
|
}
|
||||||
|
b.Data, err = skylab.FromCanFrame(frame)
|
||||||
|
if err != nil {
|
||||||
|
err = NewFormatError("failed to parse can frame", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.Name = b.Data.String()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var ParsersMap = map[string]ParserFunc{
|
||||||
|
"telem": parseTelemLogLine,
|
||||||
|
"candump": parseCanDumpLine,
|
||||||
|
}
|
|
@ -3,6 +3,9 @@ from typing import Dict, Optional, List, Union
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from attrs import define, field, validators
|
from attrs import define, field, validators
|
||||||
|
|
||||||
|
# we define a validator for our names - alphanumeric and underscores
|
||||||
|
# most things can't support numbers as the first character, so we don't either.
|
||||||
|
name_validator = validators.matches_re(r"^[A-Za-z_][A-Za-z0-9_]?$")
|
||||||
|
|
||||||
@define
|
@define
|
||||||
class Bus():
|
class Bus():
|
||||||
|
@ -58,7 +61,7 @@ class FieldType(str, Enum):
|
||||||
|
|
||||||
@define
|
@define
|
||||||
class CustomTypeDef():
|
class CustomTypeDef():
|
||||||
name: str
|
name: str = field(validator=[name_validator])
|
||||||
base_type: FieldType # should be a strict size
|
base_type: FieldType # should be a strict size
|
||||||
values: Union[List[str], Dict[str, int]]
|
values: Union[List[str], Dict[str, int]]
|
||||||
|
|
||||||
|
@ -66,11 +69,11 @@ class CustomTypeDef():
|
||||||
@define
|
@define
|
||||||
class BitfieldBit():
|
class BitfieldBit():
|
||||||
"micro class to represent one bit in bitfields"
|
"micro class to represent one bit in bitfields"
|
||||||
name: str
|
name: str = field(validator=[name_validator])
|
||||||
|
|
||||||
@define
|
@define
|
||||||
class Field():
|
class Field():
|
||||||
name: str = field(validator=[validators.matches_re(r"^[A-Za-z0-9_]+$")])
|
name: str = field(validator=[name_validator])
|
||||||
type: FieldType
|
type: FieldType
|
||||||
|
|
||||||
#metadata
|
#metadata
|
||||||
|
@ -80,7 +83,7 @@ class Field():
|
||||||
|
|
||||||
@define
|
@define
|
||||||
class BitField():
|
class BitField():
|
||||||
name: str = field(validator=[validators.matches_re(r"^[A-Za-z0-9_]+$")])
|
name: str = field(validator=[name_validator])
|
||||||
type: str = field(default="bitfield", init=False) # it's a constant value
|
type: str = field(default="bitfield", init=False) # it's a constant value
|
||||||
bits: List[BitfieldBit]
|
bits: List[BitfieldBit]
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
// this is needed so that we can run make_skylab.go
|
// this is needed so that we can run make_skylab.go
|
||||||
// without this, the yaml library will be removed
|
// without this, the yaml library will be removed
|
||||||
// when we run `go mod tidy`
|
// when we run `go mod tidy`
|
||||||
|
"github.com/kschamplin/gotelem/internal/can"
|
||||||
_ "gopkg.in/yaml.v3"
|
_ "gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,7 +63,7 @@ type Unmarshaler interface {
|
||||||
|
|
||||||
// Ider is a packet that can get its ID, based on the index of the packet, if any.
|
// Ider is a packet that can get its ID, based on the index of the packet, if any.
|
||||||
type Ider interface {
|
type Ider interface {
|
||||||
CANId() (uint32, error)
|
CanId() (can.CanID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sizer allows for fast allocation.
|
// Sizer allows for fast allocation.
|
||||||
|
@ -71,13 +72,15 @@ type Sizer interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CanSend takes a packet and makes CAN framing data.
|
// CanSend takes a packet and makes CAN framing data.
|
||||||
func ToCanFrame(p Packet) (id uint32, data []byte, err error) {
|
func ToCanFrame(p Packet) (f can.Frame, err error) {
|
||||||
|
|
||||||
id, err = p.CANId()
|
|
||||||
|
f.Id, err = p.CanId()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
data, err = p.MarshalPacket()
|
f.Data, err = p.MarshalPacket()
|
||||||
|
f.Kind = can.CanDataFrame
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -45,15 +45,17 @@ type {{$structName}} struct {
|
||||||
{{- end }}
|
{{- end }}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *{{$structName}}) CANId() (uint32, error) {
|
func (p *{{$structName}}) CanId() (can.CanID, error) {
|
||||||
|
c := can.CanID{Extended: {{.Extended}}}
|
||||||
{{- if .Repeat }}
|
{{- if .Repeat }}
|
||||||
if p.Idx >= {{.Repeat}} {
|
if p.Idx >= {{.Repeat}} {
|
||||||
return 0, &UnknownIdError{ {{ printf "0x%X" .Id }} }
|
return c, &UnknownIdError{ {{ printf "0x%X" .Id }} }
|
||||||
}
|
}
|
||||||
return {{ printf "0x%X" .Id }} + p.Idx, nil
|
c.Id = {{ printf "0x%X" .Id }} + p.Idx
|
||||||
{{- else }}
|
{{- else }}
|
||||||
return {{ printf "0x%X" .Id }}, nil
|
c.Id = {{ printf "0x%X" .Id }}
|
||||||
{{- end }}
|
{{- end }}
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *{{$structName}}) Size() uint {
|
func (p *{{$structName}}) Size() uint {
|
||||||
|
@ -107,10 +109,10 @@ var idMap = map[can.CanID]bool{
|
||||||
{{ range $p := .Packets -}}
|
{{ range $p := .Packets -}}
|
||||||
{{ if $p.Repeat }}
|
{{ if $p.Repeat }}
|
||||||
{{ range $idx := Nx (int $p.Id) $p.Repeat $p.Offset -}}
|
{{ range $idx := Nx (int $p.Id) $p.Repeat $p.Offset -}}
|
||||||
can.CanID{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.Extended}} }: true,
|
{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.Extended}} }: true,
|
||||||
{{ end }}
|
{{ end }}
|
||||||
{{- else }}
|
{{- else }}
|
||||||
can.CanID{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.Extended}} }: true,
|
{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.Extended}} }: true,
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue