158158import org .cloudfoundry .logcache .v1 .LogCacheClient ;
159159import org .cloudfoundry .logcache .v1 .ReadRequest ;
160160import org .cloudfoundry .logcache .v1 .ReadResponse ;
161+ import org .cloudfoundry .logcache .v1 .EnvelopeBatch ;
162+ import org .cloudfoundry .logcache .v1 .Envelope ;
161163import org .cloudfoundry .operations .util .OperationsLogging ;
162164import org .cloudfoundry .util .DateUtils ;
163165import org .cloudfoundry .util .DelayTimeoutException ;
@@ -248,7 +250,7 @@ public DefaultApplications(
248250 this .spaceId = spaceId ;
249251 }
250252
251- @ Override
253+ @ Override
252254 public Mono <Void > copySource (CopySourceApplicationRequest request ) {
253255 return Mono .zip (this .cloudFoundryClient , this .spaceId )
254256 .flatMap (
@@ -541,8 +543,9 @@ public Flux<Task> listTasks(ListApplicationTasksRequest request) {
541543 .checkpoint ();
542544 }
543545
546+ @ Deprecated
544547 @ Override
545- public Flux <Log > logs (LogsRequest request ) {
548+ public Flux <LogMessage > logs (LogsRequest request ) {
546549 return Mono .zip (this .cloudFoundryClient , this .spaceId )
547550 .flatMap (
548551 function (
@@ -551,107 +554,37 @@ public Flux<Log> logs(LogsRequest request) {
551554 cloudFoundryClient , request .getName (), spaceId )))
552555 .flatMapMany (
553556 applicationId ->
554- getRecentLogs (this .logCacheClient , applicationId ))
557+ getLogs (this .dopplerClient , applicationId , request . getRecent () ))
555558 .transform (OperationsLogging .log ("Get Application Logs" ))
556559 .checkpoint ();
557560 }
558561
559562 @ Override
560- public Flux <ApplicationLog > logs (ApplicationLogsRequest request ) {
561- return logs (LogsRequest .builder ()
562- .name (request .getName ())
563- .recent (request .getRecent ())
564- .build ())
565- .map (
566- logMessage ->
567- ApplicationLog .builder ()
568- .sourceId (logMessage .getApplicationId ())
569- .sourceType (logMessage .getSourceType ())
570- .instanceId (logMessage .getSourceInstance ())
571- .message (logMessage .getMessage ())
572- .timestamp (logMessage .getTimestamp ())
573- .logType (
574- ApplicationLogType .from (
575- logMessage .getMessageType ().name ()))
576- .build ());
577- }
578-
579- @ SuppressWarnings ("deprecation" )
580- @ Test
581- void logsRecent_doppler () {
582- requestApplications (
583- this .cloudFoundryClient ,
584- "test-application-name" ,
585- TEST_SPACE_ID ,
586- "test-metadata-id" );
587- requestLogsRecent (this .dopplerClient , "test-metadata-id" );
588- this .applications
589- .logs (LogsRequest .builder ().name ("test-application-name" ).recent (true ).build ())
590- .as (StepVerifier ::create )
591- .expectNextMatches (log -> log .getMessage ().equals ("test-log-message-message" ))
592- .expectComplete ()
593- .verify (Duration .ofSeconds (5 ));
594- }
595-
596- @ SuppressWarnings ("deprecation" )
597- @ Test
598- void logsNoApp_doppler () {
599- requestApplicationsEmpty (this .cloudFoundryClient , "test-application-name" , TEST_SPACE_ID );
600-
601- this .applications
602- .verify (Duration .ofSeconds (5 ));
603- }
604-
605- @ SuppressWarnings ("deprecation" )
606- @ Test
607- void logs_doppler () {
608- requestApplications (
609- this .cloudFoundryClient ,
610- "test-application-name" ,
611- TEST_SPACE_ID ,
612- "test-metadata-id" );
613- requestLogsStream (this .dopplerClient , "test-metadata-id" );
614- this .applications
615- .logs (LogsRequest .builder ().name ("test-application-name" ).recent (false ).build ())
616- .as (StepVerifier ::create )
617- .expectNextMatches (log -> log .getMessage ().equals ("test-log-message-message" ))
618- .expectComplete ()
619- .verify (Duration .ofSeconds (5 ));
620- }
621-
622- @ Test
623- void logsRecent_LogCache () {
624- requestApplications (
625- this .cloudFoundryClient ,
626- "test-application-name" ,
627- TEST_SPACE_ID ,
628- "test-metadata-id" );
629- requestLogsRecentLogCache (this .logCacheClient , "test-metadata-id" , "test-payload" );
630- this .applications
631- .logsRecent (ReadRequest .builder ().sourceId ("test-application-name" ).build ())
632- .as (StepVerifier ::create )
633- .expectNext (fill (Log .builder ()).type (LogType .OUT ).build ())
634- .expectComplete ()
635- .verify (Duration .ofSeconds (5 ));
563+ public Flux <Log > logsRecent (ReadRequest request ) {
564+ return getRecentLogsLogCache (this .logCacheClient , request )
565+ .transform (OperationsLogging .log ("Get Application Logs" ))
566+ .checkpoint ();
636567 }
637568
638- @ SuppressWarnings ("deprecation" )
639- @ Test
640- void logsRecentNotSet_doppler () {
641- requestApplications (
642- this .cloudFoundryClient ,
643- "test-application-name" ,
644- TEST_SPACE_ID ,
645- "test-metadata-id" );
646- requestLogsStream (this .dopplerClient , "test-metadata-id" );
647-
648- this .applications
649- .logs (LogsRequest .builder ().name ("test-application-name" ).build ())
650- .as (StepVerifier ::create )
651- .expectNext (fill (LogMessage .builder (), "log-message-" ).build ())
652- .expectComplete ()
653- .verify (Duration .ofSeconds (5 ));
654- }
569+ // @Override
570+ // public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
571+ // return logs(LogsRequest.builder()
572+ // .name(request.getName())
573+ // .recent(request.getRecent())
574+ // .build())
575+ // .map(
576+ // logMessage ->
577+ // ApplicationLog.builder()
578+ // .sourceId(logMessage.getApplicationId())
579+ // .sourceType(logMessage.getSourceType())
580+ // .instanceId(logMessage.getSourceInstance())
581+ // .message(logMessage.getMessage())
582+ // .timestamp(logMessage.getTimestamp())
583+ // .logType(
584+ // ApplicationLogType.from(
585+ // logMessage.getMessageType().name()))
586+ // .build());
587+ // }
655588
656589 @ Override
657590 @ SuppressWarnings ("deprecation" )
@@ -1705,8 +1638,13 @@ private static Flux<LogMessage> getLogs(
17051638 }
17061639 }
17071640
1708- private static Flux <Log > getRecentLogs (Mono <LogCacheClient > logCacheClient , String applicationId ) {
1709- return requestLogsRecentLogCache (logCacheClient , applicationId )
1641+ private static Flux <Log > getRecentLogsLogCache (
1642+ Mono <LogCacheClient > logCacheClient , ReadRequest readRequest ) {
1643+ return requestLogsRecentLogCache (logCacheClient , readRequest )
1644+ .map (EnvelopeBatch ::getBatch )
1645+ .map (List ::stream )
1646+ .flatMapIterable (envelopeStream -> envelopeStream .collect (Collectors .toList ()))
1647+ .filter (e -> e .getLog () != null )
17101648 .sort (LOG_MESSAGE_COMPARATOR_LOG_CACHE )
17111649 .map (org .cloudfoundry .logcache .v1 .Envelope ::getLog );
17121650 }
@@ -2646,6 +2584,7 @@ private static Flux<TaskResource> requestListTasks(
26462584 .build ()));
26472585 }
26482586
2587+ @ Deprecated
26492588 private static Flux <Envelope > requestLogsRecent (
26502589 Mono <DopplerClient > dopplerClient , String applicationId ) {
26512590 return dopplerClient .flatMapMany (
@@ -2654,30 +2593,12 @@ private static Flux<Envelope> requestLogsRecent(
26542593 RecentLogsRequest .builder ().applicationId (applicationId ).build ()));
26552594 }
26562595
2657- private static Flux < org . cloudfoundry . logcache . v1 . Envelope > requestLogsRecentLogCache (
2658- Mono <LogCacheClient > logCacheClient , String applicationId ) {
2659- return logCacheClient .flatMapMany (
2596+ private static Mono < EnvelopeBatch > requestLogsRecentLogCache (
2597+ Mono <LogCacheClient > logCacheClient , ReadRequest readRequest ) {
2598+ return logCacheClient .flatMap (
26602599 client ->
2661- client .recentLogs (
2662- ReadRequest .builder ()
2663- .sourceId (applicationId )
2664- .envelopeType (EnvelopeType .LOG )
2665- .limit (100 )
2666- .build ()
2667- )
2668- .flatMap (
2669- response ->
2670- Mono .justOrEmpty (
2671- response .getEnvelopes ().getBatch ().stream ().findFirst ()
2672- )
2673- )
2674- .repeatWhenEmpty (
2675- exponentialBackOff (
2676- Duration .ofSeconds (1 ),
2677- Duration .ofSeconds (5 ),
2678- Duration .ofMinutes (1 ))
2679- )
2680- );
2600+ client .recentLogs (readRequest )
2601+ .flatMap (response -> Mono .justOrEmpty (response .getEnvelopes ())));
26812602 }
26822603
26832604 private static Flux <Envelope > requestLogsStream (
0 commit comments