@@ -93,20 +93,22 @@ type OdcStatus struct {
9393}
9494
9595type OdcDeviceId uint64
96+ type OdcCollectionId uint64
9697
9798func (o OdcDeviceId ) MarshalJSON () ([]byte , error ) {
9899 return json .Marshal (strconv .FormatUint (uint64 (o ), 10 ))
99100}
100101
101102type OdcPartitionInfo struct {
102- PartitionId uid.ID `json:"-"`
103- RunNumber uint32 `json:"runNumber"`
104- State string `json:"state"`
105- EcsState sm.State `json:"ecsState"`
106- DdsSessionId string `json:"ddsSessionId"`
107- DdsSessionStatus string `json:"ddsSessionStatus"`
108- Devices map [OdcDeviceId ]* OdcDevice `json:"devices"`
109- Hosts []string `json:"hosts"`
103+ PartitionId uid.ID `json:"-"`
104+ RunNumber uint32 `json:"runNumber"`
105+ State string `json:"state"`
106+ EcsState sm.State `json:"ecsState"`
107+ DdsSessionId string `json:"ddsSessionId"`
108+ DdsSessionStatus string `json:"ddsSessionStatus"`
109+ Devices map [OdcDeviceId ]* OdcDevice `json:"devices"`
110+ Hosts []string `json:"hosts"`
111+ Collections map [OdcCollectionId ]* OdcCollection `json:"collections"`
110112}
111113
112114type OdcDevice struct {
@@ -120,6 +122,14 @@ type OdcDevice struct {
120122 Rmsjobid string `json:"rmsjobid"`
121123}
122124
125+ type OdcCollection struct {
126+ CollectionId OdcCollectionId `json:"collectionId"`
127+ State string `json:"state"`
128+ EcsState sm.State `json:"ecsState"`
129+ Path string `json:"path"`
130+ Host string `json:"host"`
131+ }
132+
123133type partitionStateChangedEventPayload struct {
124134 PartitionId uid.ID `json:"partitionId"`
125135 DdsSessionId string `json:"ddsSessionId"`
@@ -142,6 +152,17 @@ type deviceStateChangedEventPayload struct {
142152 Rmsjobid string `json:"rmsjobid"`
143153}
144154
155+ type collectionStateChangedEventPayload struct {
156+ PartitionId uid.ID `json:"partitionId"`
157+ DdsSessionId string `json:"ddsSessionId"`
158+ DdsSessionStatus string `json:"ddsSessionStatus"`
159+ State string `json:"state"`
160+ EcsState sm.State `json:"ecsState"`
161+ CollectionId OdcCollectionId `json:"collectionId"`
162+ Path string `json:"path"`
163+ Host string `json:"host"`
164+ }
165+
145166func NewPlugin (endpoint string ) integration.Plugin {
146167 u , err := url .Parse (endpoint )
147168 if err != nil {
@@ -271,6 +292,16 @@ func (p *Plugin) queryPartitionStatus() {
271292 Rmsjobid : device .Rmsjobid ,
272293 }
273294 }
295+ odcPartInfoSlice [idx ].Collections = make (map [OdcCollectionId ]* OdcCollection , len (odcPartStateRep .Collections ))
296+ for _ , collection := range odcPartStateRep .Collections {
297+ odcPartInfoSlice [idx ].Collections [OdcCollectionId (collection .Id )] = & OdcCollection {
298+ CollectionId : OdcCollectionId (collection .Id ),
299+ State : collection .State ,
300+ EcsState : fairmq .ToEcsState (collection .State , sm .UNKNOWN ),
301+ Path : collection .Path ,
302+ Host : collection .Host ,
303+ }
304+ }
274305 }(i , id )
275306 }
276307 wg .Wait ()
@@ -335,6 +366,45 @@ func (p *Plugin) queryPartitionStatus() {
335366 })
336367 }
337368
369+ // detection of collection state change + event publication
370+ for collectionId , collection := range partitionInfo .Collections {
371+ existingCollection , hasCollection := existingPartition .Collections [collectionId ]
372+
373+ oldEcsState := sm .UNKNOWN // we presume the collection didn't exist before
374+
375+ // if a collection with this ID is already known to us from before
376+ if hasCollection {
377+ // if collection state has changed
378+ if existingCollection .State != collection .State {
379+ // if the state has changed, we take note of the previous state
380+ oldEcsState = existingCollection .EcsState
381+ } else {
382+ // if the state hasn't changed, we set the old ECS state and bail
383+ collection .EcsState = existingCollection .EcsState
384+ continue
385+ }
386+ }
387+
388+ collection .EcsState = fairmq .ToEcsState (collection .State , oldEcsState )
389+
390+ payload := collectionStateChangedEventPayload {
391+ PartitionId : partitionInfo .PartitionId ,
392+ DdsSessionId : partitionInfo .DdsSessionId ,
393+ DdsSessionStatus : partitionInfo .DdsSessionStatus ,
394+ State : collection .State ,
395+ EcsState : collection .EcsState ,
396+ CollectionId : collection .CollectionId ,
397+ Path : collection .Path ,
398+ Host : collection .Host ,
399+ }
400+ payloadJson , _ := json .Marshal (payload )
401+ the .EventWriterWithTopic (TOPIC ).WriteEvent (& pb.Ev_IntegratedServiceEvent {
402+ Name : "odc.collectionStateChanged" ,
403+ EnvironmentId : id .String (),
404+ Payload : string (payloadJson [:]),
405+ })
406+ }
407+
338408 // detection of env (ODC partition) state change + event publication
339409 if existingPartition .State != partitionInfo .State {
340410 partitionInfo .EcsState = fairmq .ToEcsState (partitionInfo .State , existingPartition .EcsState )
0 commit comments