@@ -28,6 +28,9 @@ public class UserPreferenceService {
2828 @ Value ("${kafka.topics.user-preferences:user-preferences}" )
2929 private String userPreferencesTopic ;
3030
31+ @ Value ("${user-preference.operation.timeout:1000}" )
32+ private long operationTimeoutMillis ;
33+
3134 @ CircuitBreaker (name = "userPreferences" , fallbackMethod = "getDefaultPreferences" )
3235 public Mono <UserPreference > getUserPreferences (String userId ) {
3336 log .info ("Kullanıcı tercihleri getiriliyor: userId={}" , userId );
@@ -51,15 +54,31 @@ public Mono<UserPreference> getDefaultPreferences(String userId, Throwable t) {
5154
5255 public Mono <UserPreference > saveUserPreferences (UserPreference preferences ) {
5356 log .info ("Kullanıcı tercihleri kaydediliyor: userId={}" , preferences .getUserId ());
57+
58+ // Redis'e kayıt işlemini timeout ile sarmalıyoruz
5459 return redisCacheService .saveUserPreferences (preferences )
55- .then (Mono .defer (() -> {
56- // Kafka'ya bildirim gönder
57- kafkaTemplate .send (userPreferencesTopic , preferences .getUserId (), preferences );
58- return Mono .just (preferences );
60+ .timeout (Duration .ofMillis (operationTimeoutMillis ))
61+ .doOnError (e -> {
62+ if (e instanceof TimeoutException ) {
63+ log .warn ("Redis kaydetme işlemi zaman aşımına uğradı, ancak işlem arka planda devam edecek: userId={}" ,
64+ preferences .getUserId ());
65+ } else {
66+ log .error ("Redis kaydetme işlemi sırasında hata: userId={}, error={}" ,
67+ preferences .getUserId (), e .getMessage ());
68+ }
69+ })
70+ .onErrorResume (e -> Mono .empty ())
71+ // Kafka'ya bildirimi non-blocking yapıyoruz
72+ .then (Mono .fromCallable (() -> {
73+ // Kafka'ya bildirim gönderme işlemini asenkron yapıyoruz
74+ try {
75+ kafkaTemplate .sendDefault (preferences .getUserId (), preferences );
76+ } catch (Exception e ) {
77+ log .warn ("Kafka mesajı gönderilirken hata oluştu: {}" , e .getMessage ());
78+ }
79+ return preferences ;
5980 }))
60- .doOnSuccess (pref -> log .debug ("Kullanıcı tercihleri başarıyla kaydedildi: userId={}" , preferences .getUserId ()))
61- .doOnError (e -> log .error ("Kullanıcı tercihleri kaydedilirken hata: userId={}, error={}" ,
62- preferences .getUserId (), e .getMessage ()));
81+ .doOnSuccess (pref -> log .debug ("Kullanıcı tercihleri işlemi tamamlandı: userId={}" , preferences .getUserId ()));
6382 }
6483
6584 public Mono <UserPreference > updateTheme (String userId , String theme ) {
@@ -74,20 +93,23 @@ public Mono<UserPreference> updateTheme(String userId, String theme) {
7493 pref .setTheme (theme );
7594 pref .setUpdatedAt (System .currentTimeMillis ());
7695 return redisCacheService .saveUserPreferences (pref )
77- .timeout (Duration .ofSeconds (8 ))
96+ .timeout (Duration .ofMillis (operationTimeoutMillis ))
97+ .onErrorResume (e -> {
98+ log .warn ("Redis teması güncelleme hatası (arka planda devam edecek): {}" , e .getMessage ());
99+ return Mono .just (Boolean .TRUE ); // İşlemi devam ettir
100+ })
78101 .thenReturn (pref );
79102 })
80103 .doOnNext (saved -> {
81- log .debug ("Tema başarıyla güncellendi: userId={}, theme={}" , userId , theme );
82- // Sadece tema değişikliğini event olarak yayınla
83- publishPreferenceUpdated (userId , "theme" , theme );
104+ // Tema değişikliğini asenkron olarak event olarak yayınla
105+ try {
106+ publishPreferenceUpdated (userId , "theme" , theme );
107+ } catch (Exception e ) {
108+ log .warn ("Tema değişikliği yayınlanırken hata: {}" , e .getMessage ());
109+ }
84110 })
85111 .doOnError (e -> log .error ("Tema güncellenirken hata: userId={}, theme={}, error={}" ,
86- userId , theme , e .getMessage ()))
87- .onErrorResume (e -> {
88- log .warn ("Tema güncelleme hatası için fallback çalıştırılıyor: userId={}" , userId );
89- return createFallbackThemePreference (userId , theme );
90- });
112+ userId , theme , e .getMessage ()));
91113 }
92114
93115 private Mono <UserPreference > createFallbackThemePreference (String userId , String theme ) {
@@ -221,6 +243,7 @@ private void publishPreferenceUpdated(String userId, String preferenceType, Stri
221243 event .put ("value" , value );
222244 event .put ("timestamp" , System .currentTimeMillis ());
223245
246+ // Asenkron gönderim - operasyonu bloklamıyor
224247 kafkaTemplate .send (userPreferencesTopic , userId , event )
225248 .whenComplete ((result , ex ) -> {
226249 if (ex != null ) {
0 commit comments