@@ -95,20 +95,22 @@ type OdcStatus struct {
9595}
9696
9797type OdcDeviceId uint64
98+ type OdcCollectionId uint64
9899
99100func (o OdcDeviceId ) MarshalJSON () ([]byte , error ) {
100101 return json .Marshal (strconv .FormatUint (uint64 (o ), 10 ))
101102}
102103
103104type OdcPartitionInfo struct {
104- PartitionId uid.ID `json:"-"`
105- RunNumber uint32 `json:"runNumber"`
106- State string `json:"state"`
107- EcsState sm.State `json:"ecsState"`
108- DdsSessionId string `json:"ddsSessionId"`
109- DdsSessionStatus string `json:"ddsSessionStatus"`
110- Devices map [OdcDeviceId ]* OdcDevice `json:"devices"`
111- Hosts []string `json:"hosts"`
105+ PartitionId uid.ID `json:"-"`
106+ RunNumber uint32 `json:"runNumber"`
107+ State string `json:"state"`
108+ EcsState sm.State `json:"ecsState"`
109+ DdsSessionId string `json:"ddsSessionId"`
110+ DdsSessionStatus string `json:"ddsSessionStatus"`
111+ Devices map [OdcDeviceId ]* OdcDevice `json:"devices"`
112+ Hosts []string `json:"hosts"`
113+ Collections map [OdcCollectionId ]* OdcCollection `json:"collections"`
112114}
113115
114116type OdcDevice struct {
@@ -122,6 +124,14 @@ type OdcDevice struct {
122124 Rmsjobid string `json:"rmsjobid"`
123125}
124126
127+ type OdcCollection struct {
128+ CollectionId OdcCollectionId `json:"collectionId"`
129+ State string `json:"state"`
130+ EcsState sm.State `json:"ecsState"`
131+ Path string `json:"path"`
132+ Host string `json:"host"`
133+ }
134+
125135type partitionStateChangedEventPayload struct {
126136 PartitionId uid.ID `json:"partitionId"`
127137 DdsSessionId string `json:"ddsSessionId"`
@@ -144,6 +154,17 @@ type deviceStateChangedEventPayload struct {
144154 Rmsjobid string `json:"rmsjobid"`
145155}
146156
157+ type collectionStateChangedEventPayload struct {
158+ PartitionId uid.ID `json:"partitionId"`
159+ DdsSessionId string `json:"ddsSessionId"`
160+ DdsSessionStatus string `json:"ddsSessionStatus"`
161+ State string `json:"state"`
162+ EcsState sm.State `json:"ecsState"`
163+ CollectionId OdcCollectionId `json:"collectionId"`
164+ Path string `json:"path"`
165+ Host string `json:"host"`
166+ }
167+
147168func NewPlugin (endpoint string ) integration.Plugin {
148169 u , err := url .Parse (endpoint )
149170 if err != nil {
@@ -273,6 +294,16 @@ func (p *Plugin) queryPartitionStatus() {
273294 Rmsjobid : device .Rmsjobid ,
274295 }
275296 }
297+ odcPartInfoSlice [idx ].Collections = make (map [OdcCollectionId ]* OdcCollection , len (odcPartStateRep .Collections ))
298+ for _ , collection := range odcPartStateRep .Collections {
299+ odcPartInfoSlice [idx ].Collections [OdcCollectionId (collection .Id )] = & OdcCollection {
300+ CollectionId : OdcCollectionId (collection .Id ),
301+ State : collection .State ,
302+ EcsState : fairmq .ToEcsState (collection .State , sm .UNKNOWN ),
303+ Path : collection .Path ,
304+ Host : collection .Host ,
305+ }
306+ }
276307 }(i , id )
277308 }
278309 wg .Wait ()
@@ -337,6 +368,45 @@ func (p *Plugin) queryPartitionStatus() {
337368 })
338369 }
339370
371+ // detection of collection state change + event publication
372+ for collectionId , collection := range partitionInfo .Collections {
373+ existingCollection , hasCollection := existingPartition .Collections [collectionId ]
374+
375+ oldEcsState := sm .UNKNOWN // we presume the collection didn't exist before
376+
377+ // if a collection with this ID is already known to us from before
378+ if hasCollection {
379+ // if collection state has changed
380+ if existingCollection .State != collection .State {
381+ // if the state has changed, we take note of the previous state
382+ oldEcsState = existingCollection .EcsState
383+ } else {
384+ // if the state hasn't changed, we set the old ECS state and bail
385+ collection .EcsState = existingCollection .EcsState
386+ continue
387+ }
388+ }
389+
390+ collection .EcsState = fairmq .ToEcsState (collection .State , oldEcsState )
391+
392+ payload := collectionStateChangedEventPayload {
393+ PartitionId : partitionInfo .PartitionId ,
394+ DdsSessionId : partitionInfo .DdsSessionId ,
395+ DdsSessionStatus : partitionInfo .DdsSessionStatus ,
396+ State : collection .State ,
397+ EcsState : collection .EcsState ,
398+ CollectionId : collection .CollectionId ,
399+ Path : collection .Path ,
400+ Host : collection .Host ,
401+ }
402+ payloadJson , _ := json .Marshal (payload )
403+ the .EventWriterWithTopic (TOPIC ).WriteEvent (& pb.Ev_IntegratedServiceEvent {
404+ Name : "odc.collectionStateChanged" ,
405+ EnvironmentId : id .String (),
406+ Payload : string (payloadJson [:]),
407+ })
408+ }
409+
340410 // detection of env (ODC partition) state change + event publication
341411 if existingPartition .State != partitionInfo .State {
342412 partitionInfo .EcsState = fairmq .ToEcsState (partitionInfo .State , existingPartition .EcsState )
0 commit comments