Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ public interface ChannelProcessor {

boolean enabled();

String processorInfo();
void setEnabled(boolean enabled);

ChannelProcessorInfo processorInfo();

long process(List<Channel> channels) throws JsonProcessingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.phoebus.channelfinder.processors;

import java.util.Map;

public record ChannelProcessorInfo(String name, boolean enabled, Map<String, String> properties) {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand Down Expand Up @@ -75,10 +76,13 @@ public long processorCount() {
responseCode = "200",
description = "List of processor-info",
content =
@Content(array = @ArraySchema(schema = @Schema(implementation = String.class))))
@Content(
array =
@ArraySchema(
schema = @Schema(implementation = ChannelProcessorInfo.class))))
})
@GetMapping("/info")
public List<String> processorInfo() {
@GetMapping("/processors")
public List<ChannelProcessorInfo> processorInfo() {
return channelProcessorService.getProcessorsInfo();
}

Expand Down Expand Up @@ -151,4 +155,17 @@ public long processChannels(
public void processChannels(List<Channel> channels) {
channelProcessorService.sendToProcessors(channels);
}

@Operation(summary = "Set if the processor is enabled or not")
@PutMapping(
value = "/processor/{processorName}/enabled",
produces = {"application/json"},
consumes = {"application/json"})
public void setProcessorEnabled(
@PathVariable("processorName") String processorName,
@Parameter(description = "Value of enabled to set, default value: true")
@RequestParam(required = false, name = "enabled", defaultValue = "true")
Boolean enabled) {
channelProcessorService.setProcessorEnabled(processorName, enabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -17,21 +19,35 @@ public class ChannelProcessorService {

private static final Logger logger = Logger.getLogger(ChannelProcessorService.class.getName());

@Autowired private List<ChannelProcessor> channelProcessors;
private final List<ChannelProcessor> channelProcessors;

@Autowired private TaskExecutor taskExecutor;
private final TaskExecutor taskExecutor;

@Value("${processors.chunking.size:10000}")
private int chunkSize;
private final int chunkSize;

public ChannelProcessorService(
@Autowired List<ChannelProcessor> channelProcessors,
@Autowired TaskExecutor taskExecutor,
@Value("${processors.chunking.size:10000}") int chunkSize) {
this.channelProcessors = channelProcessors;
this.taskExecutor = taskExecutor;
this.chunkSize = chunkSize;
}

long getProcessorCount() {
return channelProcessors.size();
}

List<String> getProcessorsInfo() {
return channelProcessors.stream()
.map(ChannelProcessor::processorInfo)
.collect(Collectors.toList());
List<ChannelProcessorInfo> getProcessorsInfo() {
return channelProcessors.stream().map(ChannelProcessor::processorInfo).toList();
}

void setProcessorEnabled(String name, boolean enabled) {
Optional<ChannelProcessor> processor =
channelProcessors.stream()
.filter(p -> Objects.equals(p.processorInfo().name(), name))
.findFirst();
processor.ifPresent(channelProcessor -> channelProcessor.setEnabled(enabled));
}

/**
Expand Down Expand Up @@ -60,7 +76,6 @@ public void sendToProcessors(List<Channel> channels) {
while (true) {
List<Channel> chunk = new ArrayList<>(chunkSize);
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
;
if (chunk.isEmpty()) break;
channelProcessor.process(chunk);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.phoebus.channelfinder.entity.Channel;
import org.phoebus.channelfinder.entity.Property;
import org.phoebus.channelfinder.processors.ChannelProcessor;
import org.phoebus.channelfinder.processors.ChannelProcessorInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -64,8 +65,15 @@ public boolean enabled() {
}

@Override
public String processorInfo() {
Map<String, String> processorProperties =
public void setEnabled(boolean enabled) {
this.aaEnabled = enabled;
}

@Override
public ChannelProcessorInfo processorInfo() {
return new ChannelProcessorInfo(
"AAChannelProcessor",
aaEnabled,
Map.of(
"archiveProperty",
archivePropertyName,
Expand All @@ -74,8 +82,7 @@ public String processorInfo() {
"Archivers",
aaURLs.keySet().toString(),
"AutoPauseOn",
autoPauseOptions.toString());
return "AAChannelProcessor: ProcessProperties " + processorProperties;
autoPauseOptions.toString()));
}

/**
Expand Down Expand Up @@ -199,7 +206,7 @@ private void addChannelChange(
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
}
}

private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
if (archiveStatus.equals("Being archived") && (pvStatus.equals(PV_STATUS_INACTIVE))) {
return ArchiveAction.PAUSE;
Expand All @@ -209,52 +216,57 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
&& !archiveStatus.equals("Paused")
&& pvStatus.equals(PV_STATUS_ACTIVE)) { // If archive status anything else
return ArchiveAction.ARCHIVE;

}

return ArchiveAction.NONE;
}

private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
if (archiverInfo == null) {
return Map.of();
}

logger.log(Level.INFO, () -> String.format("Get archiver status in archiver %s", archiverInfo));

Map<ArchiveAction, List<ArchivePVOptions>> result = new EnumMap<>(ArchiveAction.class);
Arrays.stream(ArchiveAction.values()).forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
Arrays.stream(ArchiveAction.values())
.forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
// Don't request to archive an empty list.
if (archivePVS.isEmpty()) {
return result;
}
List<Map<String, String>> statuses = archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
List<Map<String, String>> statuses =
archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
logger.log(Level.FINER, "Statuses {0}", statuses);
statuses.forEach(archivePVStatusJsonMap -> {
String archiveStatus = archivePVStatusJsonMap.get("status");
String pvName = archivePVStatusJsonMap.get("pvName");
statuses.forEach(
archivePVStatusJsonMap -> {
String archiveStatus = archivePVStatusJsonMap.get("status");
String pvName = archivePVStatusJsonMap.get("pvName");

if (archiveStatus == null || pvName == null) {
logger.log(Level.WARNING, "Missing status or pvName in archivePVStatusJsonMap: {0}", archivePVStatusJsonMap);
return;
}
if (archiveStatus == null || pvName == null) {
logger.log(
Level.WARNING,
"Missing status or pvName in archivePVStatusJsonMap: {0}",
archivePVStatusJsonMap);
return;
}

ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
if (archivePVOptions == null) {
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
return;
}
ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
if (archivePVOptions == null) {
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
return;
}

String pvStatus = archivePVOptions.getPvStatus();
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
String pvStatus = archivePVOptions.getPvStatus();
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);

List<ArchivePVOptions> archivePVOptionsList = result.get(action);
archivePVOptionsList.add(archivePVOptions);
});
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
archivePVOptionsList.add(archivePVOptions);
});
return result;
}

private ArchivePVOptions createArchivePV(
List<String> policyList, Channel channel, String archiveProperty, String pvStaus) {
ArchivePVOptions newArchiverPV = new ArchivePVOptions();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.phoebus.channelfinder.processors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;

import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.phoebus.channelfinder.CFResourceDescriptors;
import org.phoebus.channelfinder.ChannelScroll;
import org.phoebus.channelfinder.entity.Scroll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.HttpHeaders;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
import org.springframework.util.Base64Utils;

@ExtendWith(SpringExtension.class)
@WebMvcTest(ChannelProcessorManager.class)
@TestPropertySource(
value = "classpath:application_test.properties",
properties = {"elasticsearch.create.indices = false"})
class ChannelProcessorManagerIT {

protected static final String AUTHORIZATION =
"Basic " + Base64Utils.encodeToString("admin:adminPass".getBytes());

@Autowired protected MockMvc mockMvc;
@MockBean ChannelScroll channelScroll;

@Test
void testProcessorCount() throws Exception {
MockHttpServletRequestBuilder request =
get("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/count");
mockMvc.perform(request).andExpect(status().isOk()).andExpect(content().string("2"));
}

@Test
void testProcessorsInfo() throws Exception {
MockHttpServletRequestBuilder request =
get("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/processors");
mockMvc
.perform(request)
.andExpect(status().isOk())
.andExpect(
jsonPath("$[*].name", containsInAnyOrder("AAChannelProcessor", "DummyProcessor")));
}

@Test
void testProcessorEnabled() throws Exception {
MockHttpServletRequestBuilder request =
put("/"
+ CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI
+ "/processor/AAChannelProcessor/enabled")
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION)
.contentType("application/json")
.content("{\"enabled\": false}");
mockMvc.perform(request).andExpect(status().isOk());
}

@Test
void testProcessAllChannels() throws Exception {
Mockito.when(channelScroll.query(Mockito.any())).thenReturn(new Scroll("", List.of()));

MockHttpServletRequestBuilder request =
put("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/process/all")
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION);
mockMvc.perform(request).andExpect(status().isOk());
}

@Test
void testProcessQuery() throws Exception {
Mockito.when(channelScroll.query(Mockito.any())).thenReturn(new Scroll("", List.of()));
MockHttpServletRequestBuilder request =
put("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/process/query")
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION);
mockMvc.perform(request).andExpect(status().isOk());
}
}
Loading
Loading