diff options
-rw-r--r-- | args.go | 56 | ||||
-rw-r--r-- | exporter-multiplexer.go | 58 | ||||
-rw-r--r-- | metrics.go | 514 | ||||
-rw-r--r-- | part.go | 150 | ||||
-rw-r--r-- | source.go | 104 |
5 files changed, 882 insertions, 0 deletions
@@ -0,0 +1,56 @@ +package main + +import ( + "strings" + "encoding/json" + "gopkg.in/yaml.v2" + "gopkg.in/alecthomas/kingpin.v2" + "github.com/prometheus/prometheus/model/relabel" +) + +func jsonDump(v any) string{ + j, err := json.Marshal(v) + if err != nil{ + return "!!error" + } + return string(j) +} + +type srcListA []string +// kingpin.Value +func (self *srcListA) String() string{ + return jsonDump(self) +} +func (self *srcListA) Set(arg string) error{ + *self = append(*self, arg) + return nil +} +// kingpin.remainderArg +func (*srcListA) IsCumulative() bool{ + return true +} +func argSrcList(s kingpin.Settings) *[]string{ + ret := []string{} + s.SetValue((*srcListA)(&ret)) + return &ret +} + +type relabelA map[string]*[]*relabel.Config +// kingpin.Value +func (self relabelA) String() string{ + return jsonDump(self) +} +func (self relabelA) Set(val string) error{ + va := strings.SplitN(val, "=", 2) + self[va[0]] = &[]*relabel.Config{} + return yaml.UnmarshalStrict(([]byte)(va[1]), self[va[0]]) +} +// kingpin.repeatableFlag +func (relabelA) IsCumulative() bool{ + return true +} +func argRelabel(s kingpin.Settings) map[string]*[]*relabel.Config{ + ret := make(map[string]*[]*relabel.Config) + s.SetValue((relabelA)(ret)) + return ret +} diff --git a/exporter-multiplexer.go b/exporter-multiplexer.go new file mode 100644 index 0000000..c88e01b --- /dev/null +++ b/exporter-multiplexer.go @@ -0,0 +1,58 @@ +// r5f.jp/cgit/prometheus-exporter-multiplexer.git + +package main + +import ( + "fmt" + "log/slog" + "net/http" + logflag "github.com/prometheus/common/promslog/flag" + "gopkg.in/alecthomas/kingpin.v2" + "github.com/prometheus/common/promslog" + tkweb "github.com/prometheus/exporter-toolkit/web" + tkflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" +) + + +var ( + relabelMap = argRelabel(kingpin.Flag( + "relabel", "name=yaml.", + ).Short('r')) + srcList = argSrcList(kingpin.Arg( + "sources", "Metric source.", + ).Required()) + logger *slog.Logger +) + +func main() { + promslogConfig := &promslog.Config{} + logflag.AddFlags(kingpin.CommandLine, promslogConfig) + tkFlag := tkflag.AddFlags(kingpin.CommandLine, ":9199") + kingpin.CommandLine.HelpFlag.Short('h') + kingpin.Parse() + + logger = promslog.New(promslogConfig) + parts, err := PartsFromArgs(*srcList, relabelMap) + if err != nil{ + logger.Error(err.Error()) + return + } + + http.HandleFunc("/metrics", func( + w http.ResponseWriter, r *http.Request, + ){ + out, err := parts.Get() + if err != nil{ + logger.Error(err.Error()) + w.WriteHeader(500) + } + w.Header()["Content-Type"] = []string{"application/openmetrics-text"} + fmt.Fprint(w, out) + }) + + err = tkweb.ListenAndServe(&http.Server{}, tkFlag, logger) + if err != nil{ + logger.Error(err.Error()) + return + } +} diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..74aae15 --- /dev/null +++ b/metrics.go @@ -0,0 +1,514 @@ +package main + +import ( + "fmt" + "cmp" + "slices" + "maps" + "strings" + "strconv" + "errors" + "io" + "mime" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/textparse" +) + +func Escape(src string) string{ + return strings.NewReplacer("\n", `\n`, `"`, `\"`, `\`, `\\`).Replace(src) +} +func SortLabels(ls labels.Labels) labels.Labels{ + ls = ls.Copy() + slices.SortFunc(ls, func(a, b labels.Label) int{ + return cmp.Compare(a.Name, b.Name) + }) + return ls +} + +type Sample struct{ + labels labels.Labels + value float64 + suffix string +} +func (self Sample) cmpString() string{ + return SortLabels(self.labels).String() +} +func (self Sample) String() string{ + ret := "" + firstLabel := 0 + if self.labels.Len() != 0 && self.labels[0].Name == labels.MetricName{ + firstLabel = 1 + ret += self.labels[0].Value + } + if len(self.labels[firstLabel:]) > 0{ + ret += "{" + for i, l := range self.labels[firstLabel:]{ + if i != 0{ + ret += "," + } + ret += l.Name + "=" + `"` + Escape(l.Value) + `"` + } + ret += "}" + } + ret += fmt.Sprintf(" %g%s", self.value, self.suffix) + return ret +} + +type MetricFamily struct{ + name string + ty model.MetricType + unit string + help string +} +type MetricSampleLabelType int +const ( + MetricSampleLabelTypeUnspec= MetricSampleLabelType(0) + MetricSampleLabelTypeFloat = MetricSampleLabelType(1) + MetricSampleLabelTypeStr = MetricSampleLabelType(2) +) +type MetricSampleLabel struct{ + Name string + Type MetricSampleLabelType +} +type MetricSampleSet struct{ + Name string + Labels []MetricSampleLabel +} +func (self MetricFamily) Header() string{ + ret := "" + ret += fmt.Sprintf("# TYPE %s %s\n", self.name, self.ty) + if self.unit != ""{ + ret += fmt.Sprintf("# UNIT %s %s\n", self.name, self.unit) + } + if self.help != ""{ + ret += fmt.Sprintf("# HELP %s %s\n", self.name, Escape(self.help)) + } + return ret +} +func (self MetricFamily) MetricNamesLabels() []MetricSampleSet{ + switch self.ty{ + case model.MetricTypeCounter: + return []MetricSampleSet{ + {Name: self.name + "_total"}, + {Name: self.name + "_created"}} + case model.MetricTypeGauge: + return []MetricSampleSet{{Name: self.name}} + case model.MetricTypeHistogram: + return []MetricSampleSet{ + {Name: self.name + "_bucket", + Labels: []MetricSampleLabel{ + {Name: "le", Type: MetricSampleLabelTypeFloat}, + }}, + {Name: self.name + "_sum"}, + {Name: self.name + "_created"}, + {Name: self.name + "_count"}} + case model.MetricTypeGaugeHistogram: + return []MetricSampleSet{ + {Name: self.name + "_bucket", + Labels: []MetricSampleLabel{ + {Name: "le", Type: MetricSampleLabelTypeFloat}, + }}, + {Name: self.name + "_gsum"}, + {Name: self.name + "_gcount"}} + case model.MetricTypeSummary: + return []MetricSampleSet{ + {Name: self.name + "_sum"}, + {Name: self.name + "_count"}, + {Name: self.name + "_created"}, + {Name: self.name, Labels: []MetricSampleLabel{{Name: "quantile"}}}} + case model.MetricTypeInfo: + return []MetricSampleSet{{Name: self.name + "_info"}} + case model.MetricTypeStateset: + return []MetricSampleSet{{ + Name: self.name, + Labels: []MetricSampleLabel{ + {Name: self.name, Type: MetricSampleLabelTypeStr}, + }}} + default: + return []MetricSampleSet{{Name: self.name}} + } +} +func (self MetricFamily) MetricNames() []string{ + nl := self.MetricNamesLabels() + ret := make([]string, 0, len(nl)) + for _, nle := range nl{ + ret = append(ret, nle.Name) + } + return ret +} +func (self MetricFamily) CheckMetricName(name string) bool{ + return slices.Contains(self.MetricNames(), name) +} +func (self *MetricFamily) Merge(other MetricFamily){ + // assert(self.name == other.name) + if self.unit == ""{ + self.unit = other.unit + }else if other.unit != "" && self.unit != other.unit{ + self.unit = "" + logger.Warn(fmt.Sprintf( + "Conflicting metric unit of %q", self.name)) + } + if self.ty == "" || self.ty == model.MetricTypeUnknown{ + self.ty = other.ty + }else if !(self.ty == "" || self.ty == model.MetricTypeUnknown) && + self.ty != other.ty{ + self.ty = model.MetricTypeUnknown + logger.Warn(fmt.Sprintf( + "Conflicting metric types of %q", self.name)) + } + if self.help == ""{ + self.help = other.help + }else if len(self.help) < len(other.help){ + self.help = other.help + } +} + +type MetricFamilies struct{ + metas map[string]*MetricFamily + metamap map[string]*MetricFamily + samples map[string]map[string]Sample + meta_conflicts map[string]bool + metamap_conflicts map[string]bool + sample_conflicts map[string]bool +} +func EmptyMetricFamilies() MetricFamilies{ + return MetricFamilies{ + metas: map[string]*MetricFamily{}, + metamap: map[string]*MetricFamily{}, + samples: map[string]map[string]Sample{}, + meta_conflicts: map[string]bool{}, + metamap_conflicts: map[string]bool{}, + sample_conflicts: map[string]bool{}, + } +} +func (self MetricFamilies) Clone() MetricFamilies{ + ret := EmptyMetricFamilies() + ret.meta_conflicts = maps.Clone(self.meta_conflicts) + ret.metamap_conflicts = maps.Clone(self.metamap_conflicts) + ret.sample_conflicts = maps.Clone(self.sample_conflicts) + for _, v := range self.metas{ + ret.AddMeta(*v) + } + for n, m := range self.samples{ + dm := ret.samples[n] + if dm == nil{ + dm = map[string]Sample{} + ret.samples[n] = dm + } + for l, s := range m{ + dm[l] = s + } + } + return ret +} +func (self MetricFamilies) String() string{ + samples := maps.Clone(self.samples) + ret := "" + for _, mf := range self.metas{ + // labels, __name__ + ms := map[string]map[string]*[]Sample{} + for _, mss := range mf.MetricNamesLabels(){ + for _, s := range samples[mss.Name]{ + ml := labels.Labels{} + for _, l := range s.labels{ + if l.Name == "__name__" || + slices.ContainsFunc(mss.Labels, + func(ld MetricSampleLabel)bool{ + return ld.Name == l.Name + }){ + continue + } + ml = append(ml, l) + } + mls := SortLabels(ml).String() + ss := ms[mls][mss.Name] + if ss == nil{ + if ms[mls] == nil{ + ms[mls] = map[string]*[]Sample{} + } + ss = &[]Sample{} + ms[mls][mss.Name] = ss + } + *ss = append(*ss, s) + } + delete(samples, mss.Name) + } + if len(ms) == 0{ + continue + } + ret += mf.Header() + for _, s := range ms{ + for _, mss := range mf.MetricNamesLabels(){ + if s[mss.Name] == nil{ + continue + } + slices.SortFunc(*s[mss.Name], func(a,b Sample)int{ + for _, l := range mss.Labels{ + as := a.labels.Get(l.Name) + bs := b.labels.Get(l.Name) + switch l.Type{ + case MetricSampleLabelTypeUnspec: + continue + case MetricSampleLabelTypeFloat: + an, ae := strconv.ParseFloat(as, 64) + bn, be := strconv.ParseFloat(bs, 64) + if ae == nil && be == nil{ + if an < bn{ return -1 + }else if bn < an{ return 1 + }else{ continue } + }else{ + if ae == nil{ return 1 + }else if be == nil{ return -1 + }else{ + if as < bs{ return -1 + }else if bs < as{ return 1 + }else{ continue } + } + } + case MetricSampleLabelTypeStr: + if as < bs{ return -1 + }else if bs < as{ return 1 + }else{ continue } + } + } + return 0 + }) + for _, m := range *s[mss.Name]{ + ret += m.String() + "\n" + } + } + } + } + for _, mn := range samples{ + for _, m := range mn{ + ret += m.String() + "\n" + } + } + return ret + "# EOF\n" +} +func (self *MetricFamilies) addSampleWithKeys(s Sample, mn string, sk string){ + if self.samples[mn] == nil{ + self.samples[mn] = map[string]Sample{} + } + if !self.sample_conflicts[sk]{ + _, found := self.samples[mn][sk] + if found{ + logger.Error(fmt.Sprintf("Conflicting labelset %s", sk)) + self.sample_conflicts[sk] = true + delete(self.samples[mn], sk) + if len(self.samples[mn]) == 0{ + delete(self.samples, mn) + } + }else{ + if len(s.labels) != 0 && s.labels[0].Name != labels.MetricName && + s.labels.Has(labels.MetricName){ + s.labels = append(labels.New( + labels.Label{ + Name: labels.MetricName, + Value: mn, + }, + ), + s.labels.DropMetricName()...) + } + self.samples[mn][sk] = s + } + } +} +func (self *MetricFamilies) AddSample(s Sample){ + self.addSampleWithKeys(s, s.labels.Get(labels.MetricName), s.cmpString()) +} +func (self *MetricFamilies) AddMeta(m MetricFamily){ + mn := m.MetricNames() + if self.meta_conflicts[m.name]{ + return + } + for _, mni := range mn{ + if self.metamap_conflicts[mni]{ + return + } + } + + c := make([]*MetricFamily, 0, 2) + if ci := self.metas[m.name]; ci != nil{ + if m.ty == ci.ty{ + ci.Merge(m) + return + }else{ + c = append(c, ci) + } + } + for _, mni := range mn{ + if ci := self.metamap[mni]; ci != nil{ + c = append(c, ci) + } + } + if len(c) == 0{ + self.metas[m.name] = &m + for _, mni := range mn{ + self.metamap[mni] = &m + } + }else{ + logger.Warn(fmt.Sprintf( + "Conflicting metric families %q (%s) and %q (%s)", + m.name, m.ty, c[0].name, c[0].ty)) + c = append(c, &m) + for _, ci := range c{ + delete(self.metas, ci.name) + self.meta_conflicts[ci.name] = true + for _, mni := range ci.MetricNames(){ + delete(self.metamap, mni) + self.metamap_conflicts[mni] = true + } + } + } +} +func (self *MetricFamilies) Merge(other MetricFamilies){ + for _, f := range other.metas{ + self.AddMeta(*f) + } + for n, m := range other.samples{ + for l, s := range m{ + self.addSampleWithKeys(s, n, l) + } + } +} +func (self MetricFamilies) Relabel( + f func(labels.Labels) (labels.Labels, bool), +) MetricFamilies{ + ret := EmptyMetricFamilies() + for on, m := range self.metas{ + l := labels.New( + labels.Label{ + Name: labels.MetricName, + Value: on, + }, + ) + l, keep := f(l) + if keep{ + nm := *m + nm.name = l.Get(labels.MetricName) + ret.AddMeta(nm) + } + } + + for _, m := range self.samples{ + for _, s := range m{ + l, keep := f(s.labels) + if keep{ + ns := s + ns.labels = l + ret.AddSample(ns) + } + } + } + + return ret +} + +func ParseMetrics(text []byte, fty string) (MetricFamilies, error){ + ty, _, err := mime.ParseMediaType(fty) + if err != nil{ + ty = "" + } + + // ignore errors + p, _ := textparse.New(text, fty, false, labels.NewSymbolTable()) + + ret := EmptyMetricFamilies() + metas := map[string]*MetricFamily{} + curf := (*MetricFamily)(nil) + byname := func(name string) *MetricFamily{ + if curf == nil || curf.name != name{ + curf = metas[name] + if curf == nil{ + curf = new(MetricFamily) + metas[name] = curf + curf.name = name + } + } + return curf + } + + for{ + rt, err := p.Next() + if errors.Is(err, io.EOF){ + break + }else if err != nil{ + return MetricFamilies{}, err + } + + switch rt{ + case textparse.EntryType: + name, ty := p.Type() + meta := byname(string(name)) + if meta.ty != ""{ + logger.Warn(fmt.Sprintf( + "Multiple type header for metric %q", name)) + } + meta.ty = ty + case textparse.EntryUnit: + name, unit := p.Unit() + meta := byname(string(name)) + if meta.unit != ""{ + logger.Warn(fmt.Sprintf( + "Multiple type header for metric %q", name)) + } + meta.unit = string(unit) + case textparse.EntryHelp: + name, help := p.Help() + meta := byname(string(name)) + if meta.unit != ""{ + logger.Warn(fmt.Sprintf( + "Multiple help header for metric %q", name)) + meta.unit += "\n" + } + meta.help += string(help) + case textparse.EntrySeries: + _, time, val := p.Series() + if time != nil{ + return MetricFamilies{}, fmt.Errorf("Metric with timestamp") + } + l := labels.New() + p.Metric(&l) + s := Sample{ + labels: l, + value: val, + } + ex := exemplar.Exemplar{} + if p.Exemplar(&ex){ + s.suffix = fmt.Sprintf(" # %s %g", ex.Labels.String(), ex.Value) + if ex.HasTs{ + s.suffix += fmt.Sprintf(" %.3f", float64(ex.Ts) / 1000.0) + } + if p.Exemplar(&ex){ + logger.Warn("Discarding exemplar") + } + } + ret.AddSample(s) + case textparse.EntryHistogram: + // TODO p.Histogram() + logger.Error("Histogram is not supported") + case textparse.EntryComment: + // ignore comments + //case textparse.EntryInvalid: + default: + logger.Error("Invalid entry found") + } + } + + for _, m := range metas{ + if m.ty == ""{ + m.ty = model.MetricTypeUnknown + } + if m.ty == "counter" && ty != "application/openmetrics-text"{ + if m.name == "go_memstats_alloc_bytes_total"{ + continue // to avoid warning + } + m.name = strings.TrimSuffix(m.name, "_total") + } + ret.AddMeta(*m) + } + return ret, nil +} @@ -0,0 +1,150 @@ +package main + +import ( + "fmt" + "strings" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" +) + +type Part struct{ + Name string + Transforms []Transform + Source Source +} +func (self Part) Get() (MetricFamilies, error){ + mf, err := self.Source.Get() + if err != nil{ + return MetricFamilies{}, fmt.Errorf("while reading source: %w", err) + } + for _, t := range self.Transforms{ + mf, err = t.Process(mf) + if err != nil{ + return MetricFamilies{}, + fmt.Errorf("while transforming input: %w", err) + } + } + return mf, nil +} + +func PartFromArg(arg string, re map[string]*[]*relabel.Config) (Part, error){ + tf := []Transform{} + namef := func(n string)string{ return n } + at := strings.Index(arg, "@") + if at >= 0 && strings.IndexAny(arg[0:at], "=,:") < 0{ + name := arg[0:at] + namef = func(string)string{ return name } + arg = arg[at+1:] + } + for true{ + sp := strings.IndexAny(arg, "=,:") + if sp < 0{ + return Part{}, fmt.Errorf("Unexpected end of part spec") + }else if arg[sp] == ':'{ + if arg[0:sp] == "http" || arg[0:sp] == "https"{ + s, err := NewHttpSource(arg) + if err != nil{ + return Part{}, err + } + return Part{ + Name: namef(arg), + Transforms: tf, + Source: s, + }, nil + }else if arg[0:sp] == "file"{ + fn := arg[sp+1:] + if strings.HasPrefix(fn, "//"){ + fn = fn[2:] + } + return Part{ + Name: namef(fn), + Transforms: tf, + Source: FileSource(fn), + }, nil + }else{ + return Part{}, fmt.Errorf("Unknown protocol %q", arg[0:sp]) + } + }else if arg[sp] == '='{ + vstop := strings.Index(arg, ",") + if vstop < 0{ + return Part{}, fmt.Errorf("Unexpected end of part spec") + } + tf = append(tf, SetLabel{ + Name: arg[0:sp], + Val: arg[sp+1:vstop], + }) + arg = arg[vstop+1:] + }else{ // arg[sp] == ',' + r := re[arg[0:sp]] + if r == nil{ + return Part{}, fmt.Errorf("Undefined relabel %q", arg[0:sp]) + } + tf = append(tf, Relabel(*r)) + arg = arg[sp+1:] + } + } + return Part{}, fmt.Errorf("impossible branch") +} + +type Parts []Part +func (self Parts) Get() (MetricFamilies, error){ + ret := EmptyMetricFamilies() + for _, part := range self{ + p, err := part.Get() + if err != nil{ + logger.Error(fmt.Sprintf("In %q: %s", part.Name, err.Error())) + }else{ + ret.Merge(p) + } + } + return ret, nil +} +func PartsFromArgs( + args []string, + re map[string]*[]*relabel.Config, +) (Parts, error){ + ret := Parts([]Part{}) + for _, arg := range args{ + p, err := PartFromArg(arg, re) + if err != nil{ + return []Part{}, err + } + ret = append(ret, p) + } + return ret, nil +} + +type Transform interface{ + Process(target MetricFamilies) (MetricFamilies, error) +} + +type Relabel []*relabel.Config +func (self Relabel) Process(target MetricFamilies) (MetricFamilies, error){ + f := func(l labels.Labels) (labels.Labels, bool){ + return relabel.Process(l, self...) + } + return target.Relabel(f), nil +} + +type SetLabel struct{ + Name string + Val string +} +func (self SetLabel) Process(target MetricFamilies) (MetricFamilies, error){ + f := func(l labels.Labels) (labels.Labels, bool){ + l = l.Copy() + for _, ll := range l{ + if ll.Name == self.Name{ + ll.Value = self.Val + goto FIN + } + } + l = append(l, labels.Label{ + Name: self.Name, + Value: self.Val, + }) +FIN: + return l, true + } + return target.Relabel(f), nil +} diff --git a/source.go b/source.go new file mode 100644 index 0000000..353a225 --- /dev/null +++ b/source.go @@ -0,0 +1,104 @@ +package main + +import ( + "strings" + "io" + "fmt" + "os" + "context" + "net" + "net/http" +) + +type Source interface{ + Get() (MetricFamilies, error) +} + +type textSource interface{ + textGet() (io.ReadCloser, string, error) +} +func sourceGetHelper(self textSource) (MetricFamilies, error){ + rc, ty, err := self.textGet() + if err != nil{ + return MetricFamilies{}, + fmt.Errorf("while opening: %w", err) + } + text, err := io.ReadAll(rc) + rc.Close() + if err != nil{ + return MetricFamilies{}, + fmt.Errorf("while reading: %w", err) + } + ret, err := ParseMetrics(text, ty) + if err != nil{ + return MetricFamilies{}, + fmt.Errorf("while parsing: %w", err) + } + return ret, nil +} + +type FileSource string +func (self FileSource) Get() (MetricFamilies, error){ + return sourceGetHelper(self) +} +func (self FileSource) textGet() (io.ReadCloser, string, error){ + f, err := os.Open(string(self)) + if err != nil{ + return nil, "", err + } + return f, "application/openmetrics-text", nil +} + +type HttpSource struct{ + vurl string + client *http.Client +} +func NewHttpSource(loc string) (HttpSource, error){ + return HttpSource{ + vurl: loc, + client: &http.Client{}, + }, nil +} +func NewUnixHttpSource(loc string) (HttpSource, error){ + colon := strings.Index(loc, ":") + if colon < 0 { return HttpSource{}, fmt.Errorf("No protocol part") } + if loc[0:colon] != "unix+http"{ + return HttpSource{}, fmt.Errorf("Invalid protocol") + } + loc = loc[colon+1:] + path := loc + domstart := strings.Index(loc, "//") + if 0 < domstart{ + path = loc[0:domstart] + if strings.Index(loc[domstart+2:], "/") < 0{ + loc = "http:" + loc[domstart:] + "/metrics" + }else{ + loc = "http:" + loc[domstart:] + } + }else{ + loc = "http://0.0.0.0/metrics" + } + tp := http.Transport{ + DialContext: func(c context.Context, _, _ string)(net.Conn, error){ + return (&net.Dialer{}).DialContext(c, "unix", path) + }, + } + return HttpSource{ + vurl: loc, + client: &http.Client{Transport: &tp}, + }, nil +} +func (self HttpSource) Get() (MetricFamilies, error){ + return sourceGetHelper(self) +} +func (self HttpSource) textGet() (io.ReadCloser, string, error){ + r, err := self.client.Get(self.vurl) + if err != nil{ + return nil, "", err + } + if r.StatusCode != 200{ + r.Body.Close() + return nil, "", fmt.Errorf("unexpected response: %s", r.Status) + } + return r.Body, r.Header.Get("Content-Type"), nil +} |