Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions include/atframe/atapp.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ ATBUS_MACRO_NAMESPACE_END
LIBATAPP_MACRO_NAMESPACE_BEGIN

class etcd_module;
class service_discovery_module;
class worker_pool_module;
class atapp_connector_atbus;
class atapp_connector_loopback;
Expand Down Expand Up @@ -394,7 +395,8 @@ class app {

LIBATAPP_MACRO_API void pack(atapp::protocol::atapp_discovery &out) const;

LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::etcd_module> &get_etcd_module() const noexcept;
LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::service_discovery_module> &
get_service_discovery_module() const noexcept;

LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::worker_pool_module> &get_worker_pool_module()
const noexcept;
Expand Down Expand Up @@ -601,6 +603,8 @@ class app {

int setup_atbus();

int setup_curl_multi();

static void close_timer(timer_ptr_t &t);

int setup_tick_timer();
Expand Down Expand Up @@ -663,8 +667,6 @@ class app {
int command_handler_stop(atfw::util::cli::callback_param params);
int command_handler_reload(atfw::util::cli::callback_param params);
int command_handler_invalid(atfw::util::cli::callback_param params);
int command_handler_disable_etcd(atfw::util::cli::callback_param params);
int command_handler_enable_etcd(atfw::util::cli::callback_param params);
int command_handler_list_discovery(atfw::util::cli::callback_param params);

private:
Expand Down Expand Up @@ -786,9 +788,12 @@ class app {
};
stats_data_t stats_;

bool curl_global_init_;
atfw::util::network::http_request::curl_m_bind_ptr_t curl_multi_;

// inner modules
std::shared_ptr<::atframework::atapp::worker_pool_module> internal_module_worker_pool_;
std::shared_ptr<::atframework::atapp::etcd_module> internal_module_etcd_;
std::shared_ptr<::atframework::atapp::service_discovery_module> internal_module_service_discovery_;
etcd_discovery_set internal_empty_discovery_set_;

// inner endpoints
Expand Down
2 changes: 1 addition & 1 deletion include/atframe/atapp_conf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ message atapp_configure {

atbus_configure bus = 301;

atapp_etcd etcd = 401;
atapp_etcd service_discovery_etcd = 401;
Comment thread
yousongyang marked this conversation as resolved.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOW
·
api_compatibility

Proto field renamed from etcd to service_discovery_etcd (field 401). This breaks backward compatibility for existing configurations. Users with configs using the old etcd field will get parse errors or silently ignored fields. While intentional for refactoring, migration documentation is needed.

Location: include/atframe/atapp_conf.proto:579

Referenced code (include/atframe/atapp_conf.proto:579):

atbus_configure bus = 301;

atapp_etcd service_discovery_etcd = 401;

atapp_worker_pool worker_pool = 501;


atapp_worker_pool worker_pool = 501;
}
Expand Down
7 changes: 5 additions & 2 deletions include/atframe/etcdcli/etcd_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ class etcd_discovery_node {
ATFW_UTIL_FORCEINLINE const atapp::protocol::atapp_discovery &get_discovery_info() const { return node_info_; }
ATFW_UTIL_FORCEINLINE const node_version &get_version() const noexcept { return node_version_; }

LIBATAPP_MACRO_API void copy_from(const atapp::protocol::atapp_discovery &input, const node_version &version);
LIBATAPP_MACRO_API void update_version(const node_version &version, bool upgrade);
LIBATAPP_MACRO_API void copy_from(const atapp::protocol::atapp_discovery &input, const node_version &version,
uintptr_t context_ptr);
LIBATAPP_MACRO_API void update_version(const node_version &version, bool upgrade, uintptr_t context_ptr);
LIBATAPP_MACRO_API void copy_to(atapp::protocol::atapp_discovery &output) const;
LIBATAPP_MACRO_API void copy_key_to(atapp::protocol::atapp_discovery &output) const;
LIBATAPP_MACRO_API uintptr_t get_context_ptr() const noexcept { return context_ptr_; }

ATFW_UTIL_FORCEINLINE const std::pair<uint64_t, uint64_t> &get_name_hash() const noexcept { return name_hash_; }

Expand All @@ -78,6 +80,7 @@ class etcd_discovery_node {
private:
atapp::protocol::atapp_discovery node_info_;
node_version node_version_;
uintptr_t context_ptr_;

std::pair<uint64_t, uint64_t> name_hash_;
union {
Expand Down
260 changes: 21 additions & 239 deletions include/atframe/modules/etcd_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,267 +32,49 @@
#include <unordered_map>

LIBATAPP_MACRO_NAMESPACE_BEGIN
class etcd_module : public ::atframework::atapp::module_impl {
public:
// ============ 服务发现数据和事件相关定义 ============
using node_action_t = etcd_discovery_action_t;

struct LIBATAPP_MACRO_API_HEAD_ONLY node_info_t {
atapp::protocol::atapp_discovery node_discovery;
node_action_t action;
};

struct LIBATAPP_MACRO_API_HEAD_ONLY node_list_t {
std::list<node_info_t> nodes;
};

struct LIBATAPP_MACRO_API_HEAD_ONLY discovery_watcher_sender_list_t {
std::reference_wrapper<etcd_module> atapp_module;
std::reference_wrapper<const ::atframework::atapp::etcd_response_header> etcd_header;
std::reference_wrapper<const ::atframework::atapp::etcd_watcher::response_t> etcd_body;
std::reference_wrapper<const ::atframework::atapp::etcd_watcher::event_t> event;
std::reference_wrapper<const node_info_t> node;

inline discovery_watcher_sender_list_t(etcd_module &m, const ::atframework::atapp::etcd_response_header &h,
const ::atframework::atapp::etcd_watcher::response_t &b,
const ::atframework::atapp::etcd_watcher::event_t &e, const node_info_t &n)
: atapp_module(std::ref(m)),
etcd_header(std::cref(h)),
etcd_body(std::cref(b)),
event(std::cref(e)),
node(std::cref(n)) {}
};

using discovery_watcher_list_callback_t = std::function<void(discovery_watcher_sender_list_t &)>;

using discovery_snapshot_event_callback_t = std::function<void(const etcd_module &)>;
using discovery_snapshot_event_callback_list_t = std::list<discovery_snapshot_event_callback_t>;
using discovery_snapshot_event_callback_handle_t = discovery_snapshot_event_callback_list_t::iterator;

using node_event_callback_t = std::function<void(node_action_t, const etcd_discovery_node::ptr_t &)>;
using node_event_callback_list_t = std::list<node_event_callback_t>;
using node_event_callback_handle_t = node_event_callback_list_t::iterator;

// ============ 拓扑数据和事件相关定义 ============
using topology_action_t = etcd_watch_event;
using atapp_topology_info_ptr_t = atfw::util::memory::strong_rc_ptr<atapp::protocol::atapp_topology_info>;

struct LIBATAPP_MACRO_API_HEAD_ONLY topology_storage_t {
atapp_topology_info_ptr_t info;
etcd_data_version version;
};

struct LIBATAPP_MACRO_API_HEAD_ONLY topology_info_t {
topology_storage_t storage;
topology_action_t action;
};

struct LIBATAPP_MACRO_API_HEAD_ONLY topology_list_t {
std::list<topology_info_t> topologies;
};

struct LIBATAPP_MACRO_API_HEAD_ONLY topology_watcher_sender_list_t {
std::reference_wrapper<etcd_module> atapp_module;
std::reference_wrapper<const ::atframework::atapp::etcd_response_header> etcd_header;
std::reference_wrapper<const ::atframework::atapp::etcd_watcher::response_t> etcd_body;
std::reference_wrapper<const ::atframework::atapp::etcd_watcher::event_t> event;
std::reference_wrapper<const topology_info_t> topology;

inline topology_watcher_sender_list_t(etcd_module &m, const ::atframework::atapp::etcd_response_header &h,
const ::atframework::atapp::etcd_watcher::response_t &b,
const ::atframework::atapp::etcd_watcher::event_t &e,
const topology_info_t &n)
: atapp_module(std::ref(m)),
etcd_header(std::cref(h)),
etcd_body(std::cref(b)),
event(std::cref(e)),
topology(std::cref(n)) {}
};

using topology_watcher_list_callback_t = std::function<void(topology_watcher_sender_list_t &)>;

using topology_snapshot_event_callback_t = std::function<void(const etcd_module &)>;
using topology_snapshot_event_callback_list_t = std::list<topology_snapshot_event_callback_t>;
using topology_snapshot_event_callback_handle_t = topology_snapshot_event_callback_list_t::iterator;

using topology_info_event_callback_t =
std::function<void(topology_action_t, const atapp_topology_info_ptr_t &, const etcd_data_version &)>;
using topology_info_event_callback_list_t = std::list<topology_info_event_callback_t>;
using topology_info_event_callback_handle_t = topology_info_event_callback_list_t::iterator;

class etcd_module {
public:
LIBATAPP_MACRO_API etcd_module();
LIBATAPP_MACRO_API virtual ~etcd_module();

public:
LIBATAPP_MACRO_API int init(atframework::atapp::app &app, const atapp::protocol::atapp_etcd &conf,
const atapp::protocol::atapp_log *ATFW_UTIL_MACRO_NULLABLE log_conf);
LIBATAPP_MACRO_API int reload(const atapp::protocol::atapp_etcd &conf,
const atapp::protocol::atapp_log *ATFW_UTIL_MACRO_NULLABLE log_conf);
LIBATAPP_MACRO_API int stop();
LIBATAPP_MACRO_API void reset();

LIBATAPP_MACRO_API int init() override;

private:
void update_keepalive_topology_value();
void update_keepalive_discovery_value();
int init_keepalives();
int init_watchers();
LIBATAPP_MACRO_API int tick();

public:
LIBATAPP_MACRO_API int reload() override;

LIBATAPP_MACRO_API int stop() override;

LIBATAPP_MACRO_API int timeout() override;

LIBATAPP_MACRO_API const char *ATFW_UTIL_MACRO_NONNULL name() const override;

LIBATAPP_MACRO_API int tick() override;

LIBATAPP_MACRO_API const std::string &get_conf_custom_data() const;
LIBATAPP_MACRO_API void set_conf_custom_data(const std::string &v);

LIBATAPP_MACRO_API bool is_etcd_enabled() const;
LIBATAPP_MACRO_API void enable_etcd();
LIBATAPP_MACRO_API void disable_etcd();
LIBATAPP_MACRO_API void set_maybe_update_keepalive_topology_value();
LIBATAPP_MACRO_API void set_maybe_update_keepalive_discovery_value();
LIBATAPP_MACRO_API void set_maybe_update_keepalive_discovery_area();
LIBATAPP_MACRO_API void set_maybe_update_keepalive_discovery_metadata();
LIBATAPP_MACRO_API atapp::etcd_cluster &get_etcd_cluster();
LIBATAPP_MACRO_API const atapp::etcd_cluster &get_etcd_cluster() const;
LIBATAPP_MACRO_API const std::string &get_configure_path() const;
LIBATAPP_MACRO_API const atapp::protocol::atapp_etcd &get_configure() const;

LIBATAPP_MACRO_API static bool check_keepalive_actor_start_success(
atframework::atapp::app *ATFW_UTIL_MACRO_NONNULL app,
LIBATAPP_MACRO_API bool check_keepalive_actor_start_success(
gsl::span<const std::list<etcd_keepalive::ptr_t> *> keepalive_actors);

LIBATAPP_MACRO_API const atfw::util::network::http_request::curl_m_bind_ptr_t &get_shared_curl_multi_context() const;

LIBATAPP_MACRO_API std::string get_discovery_by_id_path() const;
LIBATAPP_MACRO_API std::string get_discovery_by_name_path() const;
LIBATAPP_MACRO_API std::string get_topology_path() const;

LIBATAPP_MACRO_API std::string get_discovery_by_id_watcher_path() const;
LIBATAPP_MACRO_API std::string get_discovery_by_name_watcher_path() const;
LIBATAPP_MACRO_API std::string get_topology_watcher_path() const;

LIBATAPP_MACRO_API int add_discovery_watcher_by_id(const discovery_watcher_list_callback_t &fn);
LIBATAPP_MACRO_API int add_discovery_watcher_by_name(const discovery_watcher_list_callback_t &fn);
LIBATAPP_MACRO_API int add_topology_watcher(const topology_watcher_list_callback_t &fn);

LIBATAPP_MACRO_API const ::atframework::atapp::etcd_cluster &get_raw_etcd_ctx() const;
LIBATAPP_MACRO_API ::atframework::atapp::etcd_cluster &get_raw_etcd_ctx();

LIBATAPP_MACRO_API const ::atframework::atapp::etcd_response_header &get_last_etcd_event_topology_header()
const noexcept;
LIBATAPP_MACRO_API const ::atframework::atapp::etcd_response_header &get_last_etcd_event_discovery_header()
const noexcept;

LIBATAPP_MACRO_API const atapp::protocol::atapp_etcd &get_configure() const;
LIBATAPP_MACRO_API const std::string &get_configure_path() const;
LIBATAPP_MACRO_API static std::string generate_etcd_path(const std::string &path);

LIBATAPP_MACRO_API atapp::etcd_keepalive::ptr_t add_keepalive_actor(std::string &val, const std::string &node_path);

LIBATAPP_MACRO_API bool remove_keepalive_actor(const atapp::etcd_keepalive::ptr_t &keepalive);

LIBATAPP_MACRO_API node_event_callback_handle_t add_on_node_discovery_event(const node_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_node_event(node_event_callback_handle_t &handle);

LIBATAPP_MACRO_API topology_info_event_callback_handle_t
add_on_topology_info_event(const topology_info_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_topology_info_event(topology_info_event_callback_handle_t &handle);

LIBATAPP_MACRO_API etcd_discovery_set &get_global_discovery() noexcept;
LIBATAPP_MACRO_API const etcd_discovery_set &get_global_discovery() const noexcept;

LIBATAPP_MACRO_API const std::unordered_map<uint64_t, topology_storage_t> &get_topology_info_set() const noexcept;

LIBATAPP_MACRO_API bool has_discovery_snapshot() const noexcept;

LIBATAPP_MACRO_API discovery_snapshot_event_callback_handle_t
add_on_load_discovery_snapshot(const discovery_snapshot_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_load_discovery_snapshot(discovery_snapshot_event_callback_handle_t &handle);
LIBATAPP_MACRO_API discovery_snapshot_event_callback_handle_t
add_on_discovery_snapshot_loaded(const discovery_snapshot_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_discovery_snapshot_loaded(discovery_snapshot_event_callback_handle_t &handle);

LIBATAPP_MACRO_API bool has_topology_snapshot() const noexcept;

LIBATAPP_MACRO_API topology_snapshot_event_callback_handle_t
add_on_load_topology_snapshot(const topology_snapshot_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_load_topology_snapshot(topology_snapshot_event_callback_handle_t &handle);
LIBATAPP_MACRO_API topology_snapshot_event_callback_handle_t
add_on_topology_snapshot_loaded(const topology_snapshot_event_callback_t &fn);
LIBATAPP_MACRO_API void remove_on_topology_snapshot_loaded(topology_snapshot_event_callback_handle_t &handle);

LIBATAPP_MACRO_API etcd_watcher::watch_event_fn_t_ptr create_discovery_watcher_callback_list_wrapper();
LIBATAPP_MACRO_API etcd_watcher::watch_event_fn_t_ptr create_topology_watcher_callback_list_wrapper();
LIBATAPP_MACRO_API bool is_etcd_enable() const;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INFO
·
convention

Method renamed from is_etcd_enabled() (old) to is_etcd_enable() (new). Grammatically awkward - should be is_etcd_enabled or etcd_is_enabled. Not a correctness issue but creates naming inconsistency across API.

Location: include/atframe/modules/etcd_module.h:61

Referenced code (include/atframe/modules/etcd_module.h:61):

LIBATAPP_MACRO_API static std::string generate_etcd_path(const std::string &path);

LIBATAPP_MACRO_API bool is_etcd_enable() const;

private:


private:
static bool unpack(node_info_t &out, const std::string &path, const std::string &json, bool reset_data);
static void pack(const node_info_t &src, std::string &json);
static bool unpack(topology_info_t &out, const std::string &path, const std::string &json, bool reset_data);
static void pack(const atapp::protocol::atapp_topology_info &src, std::string &json);

static int http_callback_on_etcd_closed(atfw::util::network::http_request &req);
void load_cluster_conf(const atapp::protocol::atapp_etcd &conf,
const atapp::protocol::atapp_log *ATFW_UTIL_MACRO_NULLABLE log_conf = nullptr);

struct topology_watcher_callback_list_wrapper_t;
struct discovery_watcher_callback_list_wrapper_t;

bool update_internal_watcher_event(node_info_t &node, const etcd_discovery_node::node_version &version);
bool update_internal_watcher_event(topology_info_t &topology_info);
void reset_internal_watchers_and_keepalives();
bool enable_;
atfw::util::network::http_request::ptr_t cleanup_request_;

struct watcher_internal_access_t;
atframework::atapp::app *ATFW_UTIL_MACRO_NULLABLE atapp_;
atapp::etcd_cluster cluster_;

private:
// Config
std::string conf_path_cache_;
std::string custom_data_;
atfw::util::network::http_request::curl_m_bind_ptr_t curl_multi_;
atfw::util::network::http_request::ptr_t cleanup_request_;
bool etcd_ctx_enabled_;
bool maybe_update_internal_keepalive_topology_value_;
bool maybe_update_internal_keepalive_discovery_value_;
bool maybe_update_internal_keepalive_discovery_area_;
bool maybe_update_internal_keepalive_discovery_metadata_;
atapp_topology_info_ptr_t last_submmited_topology_data_;
atapp::protocol::atapp_area last_submmited_discovery_data_area_;
atapp::protocol::atapp_metadata last_submmited_discovery_data_metadata_;
atapp::protocol::atapp_etcd conf_cache_;
atfw::util::time::time_utility::raw_time_t tick_next_timepoint_;
std::chrono::system_clock::duration tick_interval_;
atapp::etcd_cluster etcd_ctx_;
::atframework::atapp::etcd_response_header last_etcd_event_topology_header_;
::atframework::atapp::etcd_response_header last_etcd_event_discovery_header_;

std::list<etcd_keepalive::ptr_t> internal_topology_keepalive_actors_;
std::list<etcd_keepalive::ptr_t> internal_discovery_keepalive_actors_;
std::string internal_keepalive_topology_value_;
std::string internal_keepalive_discovery_value_;

discovery_snapshot_event_callback_list_t discovery_on_load_snapshot_callbacks_;
discovery_snapshot_event_callback_list_t discovery_on_snapshot_loaded_callbacks_;
std::set<int64_t> discovery_watcher_snapshot_index_;
int64_t discovery_watcher_snapshot_index_allocator_;

mutable std::recursive_mutex discovery_watcher_callback_lock_;
std::list<discovery_watcher_list_callback_t> discovery_watcher_by_id_callbacks_;
std::list<discovery_watcher_list_callback_t> discovery_watcher_by_name_callbacks_;

topology_snapshot_event_callback_list_t topology_on_load_snapshot_callbacks_;
topology_snapshot_event_callback_list_t topology_on_snapshot_loaded_callbacks_;
std::set<int64_t> topology_watcher_snapshot_index_;
int64_t topology_watcher_snapshot_index_allocator_;

mutable std::recursive_mutex topology_watcher_callback_lock_;
std::list<topology_watcher_list_callback_t> topology_watcher_callbacks_;

etcd_watcher::ptr_t internal_topology_watcher_;
etcd_watcher::ptr_t internal_discovery_watcher_by_name_;
etcd_watcher::ptr_t internal_discovery_watcher_by_id_;

etcd_discovery_set global_discovery_;
std::unordered_map<uint64_t, topology_storage_t> internal_topology_info_set_;

mutable std::recursive_mutex node_event_lock_;
node_event_callback_list_t node_event_callbacks_;

mutable std::recursive_mutex topology_info_event_lock_;
topology_info_event_callback_list_t topology_info_event_callbacks_;
};
LIBATAPP_MACRO_NAMESPACE_END
Loading
Loading