11package com .park .utmstack .service ;
22
33import com .park .utmstack .config .Constants ;
4+ import com .park .utmstack .domain .correlation .config .UtmTenantConfig ;
45import com .park .utmstack .domain .datainput_ingestion .UtmDataInputStatus ;
56import com .park .utmstack .domain .UtmServerModule ;
67import com .park .utmstack .domain .application_events .enums .ApplicationEventType ;
1718import com .park .utmstack .repository .correlation .config .UtmDataTypesRepository ;
1819import com .park .utmstack .repository .network_scan .UtmNetworkScanRepository ;
1920import com .park .utmstack .service .application_events .ApplicationEventService ;
21+ import com .park .utmstack .service .correlation .config .UtmTenantConfigService ;
2022import com .park .utmstack .service .elasticsearch .ElasticsearchService ;
2123import com .park .utmstack .service .elasticsearch .SearchUtil ;
2224import com .park .utmstack .service .logstash_pipeline .response .statistic .StatisticDocument ;
2325import com .park .utmstack .service .network_scan .DataSourceConstants ;
2426import com .park .utmstack .service .network_scan .UtmNetworkScanService ;
2527import com .park .utmstack .util .enums .AlertSeverityEnum ;
2628import com .park .utmstack .util .enums .AlertStatus ;
29+ import com .park .utmstack .util .exceptions .ApiException ;
2730import lombok .RequiredArgsConstructor ;
2831import org .apache .http .conn .util .InetAddressUtils ;
2932import org .opensearch .client .json .JsonData ;
3437import org .slf4j .LoggerFactory ;
3538import org .springframework .data .domain .Page ;
3639import org .springframework .data .domain .Pageable ;
40+ import org .springframework .http .HttpStatus ;
3741import org .springframework .scheduling .annotation .Scheduled ;
3842import org .springframework .stereotype .Service ;
3943import org .springframework .transaction .annotation .Transactional ;
@@ -68,6 +72,7 @@ public class UtmDataInputStatusService {
6872 private final UtmDataTypesRepository dataTypesRepository ;
6973 private final UtmNetworkScanRepository networkScanRepository ;
7074 private final UtmDataInputStatusCheckpointRepository checkpointRepository ;
75+ private final UtmTenantConfigService utmTenantConfigService ;
7176
7277
7378 /**
@@ -174,7 +179,7 @@ private void checkDataInputStatus(List<UtmDataInputStatus> inputs, String server
174179 }
175180 }
176181
177- @ Scheduled (fixedDelay = 15000 , initialDelay = 30000 )
182+ @ Scheduled (fixedDelay = 1000 , initialDelay = 2000 )
178183 public void syncDataInputStatus () {
179184 final String ctx = CLASSNAME + ".syncDataInputStatus" ;
180185
@@ -193,7 +198,7 @@ public void syncDataInputStatus() {
193198 latestStats .forEach ((key , stat ) -> {
194199 try {
195200 String dataType = stat .getDataType ();
196- String dataSource = stat .getDataSource ();
201+ String dataSource = getDataSource ( stat .getDataSource () );
197202 long timestamp = Instant .parse (stat .getTimestamp ()).getEpochSecond ();
198203
199204 String compositeKey = dataType + "-" + dataSource ;
@@ -210,7 +215,9 @@ public void syncDataInputStatus() {
210215 .median (86400L )
211216 .build ();
212217 changed = true ;
218+
213219 } else if (status .getTimestamp () != timestamp ) {
220+ status .setSource (dataSource );
214221 status .setTimestamp (timestamp );
215222 changed = true ;
216223 }
@@ -497,4 +504,71 @@ private Map<String, StatisticDocument> getLatestStatisticsByDataSource() {
497504 return result ;
498505 }
499506
507+ private String getDataSource (String assetName ) {
508+ final String ctx = CLASSNAME + ".getDataSource" ;
509+
510+ Optional <UtmTenantConfig > tenantConfig = this .utmTenantConfigService .findByAssetName (assetName );
511+
512+ if (tenantConfig .isEmpty ()) {
513+ return assetName ;
514+ }
515+
516+ List <String > sources = buildSourcesList (tenantConfig .get ());
517+
518+ if (CollectionUtils .isEmpty (sources )) {
519+ return assetName ;
520+ }
521+
522+ Optional <UtmDataInputStatus > dataInputStatus = this .findDataInputBySource (sources );
523+
524+ return dataInputStatus
525+ .map (UtmDataInputStatus ::getSource )
526+ .orElse (assetName );
527+ }
528+
529+ /**
530+ * Builds a combined list of hostnames and IPs from tenant configuration
531+ *
532+ * @param tenantConfig the tenant configuration
533+ * @return combined list of sources, or empty list if none available
534+ */
535+ private List <String > buildSourcesList (UtmTenantConfig tenantConfig ) {
536+ List <String > sources = new ArrayList <>();
537+
538+ if (!CollectionUtils .isEmpty (tenantConfig .getAssetHostnameList ())) {
539+ sources .addAll (tenantConfig .getAssetHostnameList ());
540+ }
541+
542+ if (!CollectionUtils .isEmpty (tenantConfig .getAssetIpList ())) {
543+ sources .addAll (tenantConfig .getAssetIpList ());
544+ }
545+
546+ return sources ;
547+ }
548+
549+ /**
550+ * Finds a data input status by searching in the provided list of sources.
551+ * Returns the first available source from the database.
552+ *
553+ * @param sources list of source hostnames/IPs to search for
554+ * @return Optional containing the data input status if found
555+ */
556+ public Optional <UtmDataInputStatus > findDataInputBySource (List <String > sources ) {
557+ final String ctx = CLASSNAME + ".findDataInputBySource" ;
558+
559+ if (CollectionUtils .isEmpty (sources )) {
560+ return Optional .empty ();
561+ }
562+
563+ try {
564+ return this .dataInputStatusRepository .findBySourceIsIn (sources );
565+
566+ } catch (Exception ex ) {
567+ log .error ("{}: Error finding data input status by source {} - {}" ,
568+ ctx , sources , ex .getMessage (), ex );
569+ return Optional .empty ();
570+ }
571+ }
572+
573+
500574}
0 commit comments