Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 78 additions & 8 deletions core/integration/odc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,22 @@ type OdcStatus struct {
}

type OdcDeviceId uint64
type OdcCollectionId uint64

func (o OdcDeviceId) MarshalJSON() ([]byte, error) {
return json.Marshal(strconv.FormatUint(uint64(o), 10))
}

type OdcPartitionInfo struct {
PartitionId uid.ID `json:"-"`
RunNumber uint32 `json:"runNumber"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
DdsSessionId string `json:"ddsSessionId"`
DdsSessionStatus string `json:"ddsSessionStatus"`
Devices map[OdcDeviceId]*OdcDevice `json:"devices"`
Hosts []string `json:"hosts"`
PartitionId uid.ID `json:"-"`
RunNumber uint32 `json:"runNumber"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
DdsSessionId string `json:"ddsSessionId"`
DdsSessionStatus string `json:"ddsSessionStatus"`
Devices map[OdcDeviceId]*OdcDevice `json:"devices"`
Hosts []string `json:"hosts"`
Collections map[OdcCollectionId]*OdcCollection `json:"collections"`
}

type OdcDevice struct {
Expand All @@ -122,6 +124,14 @@ type OdcDevice struct {
Rmsjobid string `json:"rmsjobid"`
}

type OdcCollection struct {
CollectionId OdcCollectionId `json:"collectionId"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
Path string `json:"path"`
Host string `json:"host"`
}

type partitionStateChangedEventPayload struct {
PartitionId uid.ID `json:"partitionId"`
DdsSessionId string `json:"ddsSessionId"`
Expand All @@ -144,6 +154,17 @@ type deviceStateChangedEventPayload struct {
Rmsjobid string `json:"rmsjobid"`
}

type collectionStateChangedEventPayload struct {
PartitionId uid.ID `json:"partitionId"`
DdsSessionId string `json:"ddsSessionId"`
DdsSessionStatus string `json:"ddsSessionStatus"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
CollectionId OdcCollectionId `json:"collectionId"`
Path string `json:"path"`
Host string `json:"host"`
}

func NewPlugin(endpoint string) integration.Plugin {
u, err := url.Parse(endpoint)
if err != nil {
Expand Down Expand Up @@ -273,6 +294,16 @@ func (p *Plugin) queryPartitionStatus() {
Rmsjobid: device.Rmsjobid,
}
}
odcPartInfoSlice[idx].Collections = make(map[OdcCollectionId]*OdcCollection, len(odcPartStateRep.Collections))
for _, collection := range odcPartStateRep.Collections {
odcPartInfoSlice[idx].Collections[OdcCollectionId(collection.Id)] = &OdcCollection{
CollectionId: OdcCollectionId(collection.Id),
State: collection.State,
EcsState: fairmq.ToEcsState(collection.State, sm.UNKNOWN),
Path: collection.Path,
Host: collection.Host,
}
}
}(i, id)
}
wg.Wait()
Expand Down Expand Up @@ -337,6 +368,45 @@ func (p *Plugin) queryPartitionStatus() {
})
}

// detection of collection state change + event publication
for collectionId, collection := range partitionInfo.Collections {
existingCollection, hasCollection := existingPartition.Collections[collectionId]

oldEcsState := sm.UNKNOWN // we presume the collection didn't exist before

// if a collection with this ID is already known to us from before
if hasCollection {
// if collection state has changed
if existingCollection.State != collection.State {
// if the state has changed, we take note of the previous state
oldEcsState = existingCollection.EcsState
} else {
// if the state hasn't changed, we set the old ECS state and bail
collection.EcsState = existingCollection.EcsState
continue
}
}

collection.EcsState = fairmq.ToEcsState(collection.State, oldEcsState)

payload := collectionStateChangedEventPayload{
PartitionId: partitionInfo.PartitionId,
DdsSessionId: partitionInfo.DdsSessionId,
DdsSessionStatus: partitionInfo.DdsSessionStatus,
State: collection.State,
EcsState: collection.EcsState,
CollectionId: collection.CollectionId,
Path: collection.Path,
Host: collection.Host,
}
payloadJson, _ := json.Marshal(payload)
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: "odc.collectionStateChanged",
EnvironmentId: id.String(),
Payload: string(payloadJson[:]),
})
}

// detection of env (ODC partition) state change + event publication
if existingPartition.State != partitionInfo.State {
partitionInfo.EcsState = fairmq.ToEcsState(partitionInfo.State, existingPartition.EcsState)
Expand Down
Loading