Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions PROJECT_STATUS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Machinebeat — Project Status

**Last Updated:** 2026-02-02

## Overview
Elastic Machinebeat is a custom beat for industrial protocols (OPC UA, MQTT, PLC4X). Goal: modernize dependencies and target **Beats v9.x**.

## Current State

### Git Status
- **Branch:** master
- **Target:** Beats v9.2.4 (aligned with elastic/beats v9 branch)
- **Changes:**
- `go.mod` adjusted to target Beats v9 (using pseudo-version `v7.0.0-alpha2...` mapped to v9.2.4 commit `fd909e2bd441`).
- `replace` directives aligned with Beats v9.
- Source code imports updated: `github.com/elastic/beats/v7/libbeat/logp` -> `github.com/elastic/elastic-agent-libs/logp` (moved in v9).
- `go.sum` updated via `go mod tidy`.

### Versions Detected (Local Files)
| Component | go.mod Version |
|-----------|----------------|
| Go | 1.24.11 (Toolchain: 1.24.12) |
| libbeat | v7.0.0-alpha2... (v9.2.4 commit fd909e2) |
| elastic-agent-libs | v0.26.2 |
| paho.mqtt | v1.5.1 |
| golang.org/x/tools | v0.40.0 |

### Build Status
- ✅ **SUCCESS:** `go mod tidy` completed successfully. Dependencies are resolved.
- ❌ **Build:** `go build ./...` fails: `common.MapStr` undefined in `module/mqtt/topic/client.go` and `module/opcua/nodevalue/nodevalue.go`.
- ⚠️ `mage` not installed (optional).

## Upgrade Plan

### Phase 1: Dependency Update (Complete)
- [x] Adjust `go.mod` for Beats v9.x target.
- [x] Run `go mod tidy` successfully.
- [x] Fix `libbeat/logp` import paths.

### Phase 2: Build & Test
- [ ] Fix `common.MapStr` usage (replace with v9 equivalent, e.g. `mapstr.M` from `elastic-agent-libs/mapstr`).
- [ ] Re-run `go build` / `mage build`.

## Notes
- `go.mod` uses a pseudo-version for `beats/v7` and a `replace` directive to point to the v9 commit, because `beats` v9 still declares `v7` module path but tags are `v9.x.x`.
- `libbeat/logp` moved to `elastic-agent-libs/logp`, requiring code changes.
3 changes: 2 additions & 1 deletion cmd/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/elastic-agent-libs/logp"
)

// BuildModulesManager adds support for modules management to a beat
Expand All @@ -40,7 +41,7 @@ func BuildModulesManager(beat *beat.Beat) (cmd.ModulesManager, error) {
return nil, errors.Errorf("wrong settings for config.modules.path, it is expected to end with *.yml. Got: %s", glob)
}

modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled")
modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled", logp.NewLogger("cfgfile"))
if err != nil {
return nil, errors.Wrap(err, "initialization error")
}
Expand Down
315 changes: 178 additions & 137 deletions go.mod

Large diffs are not rendered by default.

908 changes: 496 additions & 412 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions module/mqtt/topic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/metricbeat/mb"

MQTT "github.com/eclipse/paho.mqtt.golang"
Expand Down Expand Up @@ -130,11 +130,11 @@ func subscribeOnConnect(client MQTT.Client) {
func onMessage(client MQTT.Client, msg MQTT.Message) {
logp.Debug("MQTT", "MQTT message received: %s", string(msg.Payload()))
var mbEvent mb.Event
event := make(common.MapStr)
root := make(common.MapStr)
event := make(mapstr.M)
root := make(mapstr.M)

if config.LegacyFields {
var message = make(common.MapStr)
var message = make(mapstr.M)
message["content"] = string(msg.Payload())

if strings.HasPrefix(msg.Topic(), "$") {
Expand All @@ -159,7 +159,7 @@ func onMessage(client MQTT.Client, msg MQTT.Message) {
mbEvent.ModuleFields = event
events <- mbEvent

logp.Debug("MQTT", "Event sent: %t")
logp.Debug("MQTT", "Event sent: %t", true)
}

// DefaultConnectionLostHandler does nothing
Expand Down
2 changes: 1 addition & 1 deletion module/mqtt/topic/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package topic

import (
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
_ "github.com/elastic/beats/v7/libbeat/logp"
_ "github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
)

Expand Down
56 changes: 31 additions & 25 deletions module/opcua/nodevalue/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package nodevalue

import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
Expand Down Expand Up @@ -77,16 +77,19 @@ func (client *Client) connect() (bool, error) {
endpoints, err := opcua.GetEndpoints(client.ctx, config.Endpoint)
if err != nil {
logp.Error(err)
logp.Debug("Connect", err.Error())
logp.Debug("Connect", "%s", err.Error())
}

var policy = ua.FormatSecurityPolicyURI(config.Policy)
var mode = ua.MessageSecurityModeFromString(config.Mode)
logp.Info("[OPCUA] Your selected policy: %v and security mode: %v", policy, mode)

ep := opcua.SelectEndpoint(endpoints, config.Policy, ua.MessageSecurityModeFromString(config.Mode))
if ep == nil {
ep, err := opcua.SelectEndpoint(endpoints, config.Policy, ua.MessageSecurityModeFromString(config.Mode))
if err != nil {
logp.Err("[OPCUA] Failed to find suitable endpoint: %v", err)
}

if ep == nil {
logp.Err("[OPCUA] Failed to find suitable endpoint. Will try to switch to default [No security settings]. The following configurations are available for security:")
printEndpoints(endpoints)
client.endpoint = config.Endpoint
Expand Down Expand Up @@ -121,7 +124,10 @@ func (client *Client) connect() (bool, error) {
}

ctx := context.Background()
client.opcua = opcua.NewClient(client.endpoint, opts...)
client.opcua, err = opcua.NewClient(client.endpoint, opts...)
if err != nil {
return false, err
}
if err := client.opcua.Connect(ctx); err != nil {
return false, err
}
Expand All @@ -148,19 +154,19 @@ func (client *Client) appendNodeInformation() error {

if nodeCfg.Name == "" {
logp.Debug("Append Information", "Collect display name")
name, err := node.DisplayName()
name, err := node.DisplayName(context.Background())
if err == nil {
nodeCfg.Name = name.Text
} else {
logp.Debug("Collect", err.Error())
logp.Debug("Collect", "%s", err.Error())
}
}
if nodeCfg.DataType == "" {
logp.Debug("Append Information", "Collect data type")
attrs, err := node.Attributes(ua.AttributeIDDataType)
attrs, err := node.Attributes(context.Background(), ua.AttributeIDDataType)
if err != nil {
logp.Error(err)
logp.Debug("Collect", err.Error())
logp.Debug("Collect", "%s", err.Error())
} else {
nodeCfg.DataType = getDataType(attrs[0])
}
Expand Down Expand Up @@ -189,7 +195,7 @@ func (client *Client) collectData() ([]*ResponseObject, error) {
}

logp.Debug("Collect", "Sending request")
m, err := opcuaClient.ReadWithContext(client.ctx, req)
m, err := opcuaClient.Read(client.ctx, req)
if err != nil {
return retVal, err
}
Expand Down Expand Up @@ -228,13 +234,13 @@ func (client *Client) subscribeTo() {
subInterval, err := time.ParseDuration(strconv.Itoa(client.config.Subscription.PublishInterval) + "ms")
if err != nil {
logp.Error(err)
logp.Debug("Subscribe", err.Error())
logp.Debug("Subscribe", "%s", err.Error())
}

// start channel-based subscription
ch := make(chan *opcua.PublishNotificationData)

sub, err := opcuaClient.Subscribe(&opcua.SubscriptionParameters{
sub, err := opcuaClient.Subscribe(context.Background(), &opcua.SubscriptionParameters{
Interval: subInterval,
LifetimeCount: client.config.Subscription.LifeTimeCount,
MaxKeepAliveCount: client.config.Subscription.MaxKeepAliveCount,
Expand All @@ -244,7 +250,7 @@ func (client *Client) subscribeTo() {
if err != nil {
logp.Info("Error occured")
logp.Error(err)
logp.Debug("Subscribe", err.Error())
logp.Debug("Subscribe", "%s", err.Error())
return
}

Expand All @@ -258,7 +264,7 @@ func (client *Client) subscribeTo() {
if err != nil {
logp.Info("Error occured, will skip node: %v", nodeCfg.ID)
logp.Error(err)
logp.Debug("Subscribe", err.Error())
logp.Debug("Subscribe", "%s", err.Error())
continue
}

Expand All @@ -273,12 +279,12 @@ func (client *Client) subscribeTo() {
logp.Debug("Subscribe", "[OPCUA] Monitoring using data change filter")
miCreateRequest = client.dataChangeRequest(nodeId, handle)
}
res, err := sub.Monitor(ua.TimestampsToReturnBoth, miCreateRequest)
res, err := sub.Monitor(context.Background(), ua.TimestampsToReturnBoth, miCreateRequest)
if err != nil || res.Results[0].StatusCode != ua.StatusOK {
logp.Info("Error occured, will skip node: %v", nodeCfg.ID)
if err != nil {
logp.Error(err)
logp.Debug("Subscribe", err.Error())
logp.Debug("Subscribe", "%s", err.Error())
}
continue
}
Expand Down Expand Up @@ -378,7 +384,7 @@ func (client *Client) startBrowse() {
if err != nil {
logp.Info("Error occured, will skip node: %v", nodeCfg.ID)
logp.Error(err)
logp.Debug("Subscribe", err.Error())
logp.Debug("Subscribe", "%s", err.Error())
continue
}
nodeObj := opcuaClient.Node(nodeId)
Expand All @@ -401,7 +407,7 @@ func (client *Client) startBrowse() {
if err != nil {
logp.Info("Error occured")
logp.Error(err)
logp.Debug("Browse", err.Error())
logp.Debug("Browse", "%s", err.Error())
}

logp.Debug("Browse", "Found %v nodes to collect data from so far", len(client.nodesToCollect))
Expand All @@ -426,10 +432,10 @@ func (client *Client) browse(node *opcua.Node, level int, path string) error {
logp.Info("Analyse node id %v", node.ID.String())

//Collect attributes of the current node
attrs, err := node.Attributes(ua.AttributeIDDataType, ua.AttributeIDDisplayName, ua.AttributeIDBrowseName)
attrs, err := node.Attributes(context.Background(), ua.AttributeIDDataType, ua.AttributeIDDisplayName, ua.AttributeIDBrowseName)
if err != nil {
logp.Error(err)
logp.Debug("Browse", err.Error())
logp.Debug("Browse", "%s", err.Error())
}
if len(attrs) > 0 {
switch err := attrs[1].Status; err {
Expand Down Expand Up @@ -465,7 +471,7 @@ func (client *Client) browse(node *opcua.Node, level int, path string) error {
err := client.browse(child, level+1, path)
if err != nil {
logp.Error(err)
logp.Debug("Browse", err.Error())
logp.Debug("Browse", "%s", err.Error())
}

if config.Browse.MaxNodePerParent > 0 && i > config.Browse.MaxNodePerParent {
Expand All @@ -477,10 +483,10 @@ func (client *Client) browse(node *opcua.Node, level int, path string) error {
}

func findChildren(node *opcua.Node, refs uint32) []*opcua.Node {
children, err := node.Children(refs, ua.NodeClassAll)
children, err := node.Children(context.Background(), refs, ua.NodeClassAll)
if err != nil {
logp.Error(err)
logp.Debug("Browse", err.Error())
logp.Debug("Browse", "%s", err.Error())
return nil
}
logp.Debug("Browse", "Found %v new nodes for browsing with ref id %v", len(children), refs)
Expand Down Expand Up @@ -540,7 +546,7 @@ func (client *Client) closeConnection() {

client.openSubscription.Cancel(client.ctx)
client.openSubscription = nil
client.opcua.CloseSession()
client.opcua.Close()
client.opcua.CloseSession(client.ctx)
client.opcua.Close(client.ctx)
logp.Debug("Shutdown", "Shutdown successfully")
}
10 changes: 5 additions & 5 deletions module/opcua/nodevalue/nodevalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package nodevalue
import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"

Expand Down Expand Up @@ -261,9 +261,9 @@ func publishResponses(data []*ResponseObject, report mb.ReporterV2, config *Metr
handleCounter(len(data), config.MaxTriesToReconnect, config)
for _, response := range data {
var mbEvent mb.Event
event := make(common.MapStr)
module := make(common.MapStr)
root := make(common.MapStr)
event := make(mapstr.M)
module := make(mapstr.M)
root := make(mapstr.M)

//Publish the event with the legacy field schema
if config.LegacyFields {
Expand Down
2 changes: 1 addition & 1 deletion module/plc4x/value/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package plc4xvalue

import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/apache/plc4x/plc4go/pkg/api"
"github.com/apache/plc4x/plc4go/pkg/api/drivers"
Expand Down
9 changes: 4 additions & 5 deletions module/plc4x/value/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package plc4xvalue
import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"

"errors"
_ "fmt"

"github.com/elastic/beats/v7/libbeat/common"
)

// init registers the MetricSet with the central registry as soon as the program
Expand Down Expand Up @@ -94,7 +93,7 @@ func publishResponses(data []*ResponseObject, report mb.ReporterV2, config *Metr
logp.Info("[PLC4X] Publishing %v new events", len(data))
for _, response := range data {
var mbEvent mb.Event
root := make(common.MapStr)
root := make(mapstr.M)

root.Put("event.provider", "plc4x")
root.Put("event.url", config.Endpoint)
Expand All @@ -103,7 +102,7 @@ func publishResponses(data []*ResponseObject, report mb.ReporterV2, config *Metr

root.Put("sensor.id", response.node.ID)

event := make(common.MapStr)
event := make(mapstr.M)
event.Put("type", response.value.GetPlcValueType().String())
event.Put("value", response.value.GetString())

Expand Down