Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/hotfix-message/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::parts::{Body, Header, Part, RepeatingGroup, Trailer};
use crate::session_fields::{BEGIN_STRING, BODY_LENGTH, CHECK_SUM, MSG_TYPE};
use hotfix_dictionary::{FieldLocation, IsFieldDefinition};

#[derive(Clone)]
pub struct Message {
pub(crate) header: Header,
pub(crate) body: Body,
Expand Down
2 changes: 1 addition & 1 deletion crates/hotfix-message/src/parts/body.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::field_map::FieldMap;
use crate::parts::Part;

#[derive(Default)]
#[derive(Clone, Default)]
pub struct Body {
pub(crate) fields: FieldMap,
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hotfix-message/src/parts/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::field_map::FieldMap;
use crate::parts::Part;
use crate::session_fields;

#[derive(Default)]
#[derive(Clone, Default)]
pub struct Header {
pub fields: FieldMap,
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hotfix-message/src/parts/trailer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::parts::Part;
use crate::session_fields;
use hotfix_dictionary::IsFieldDefinition;

#[derive(Default)]
#[derive(Clone, Default)]
pub struct Trailer {
pub(crate) fields: FieldMap,
}
Expand Down
31 changes: 27 additions & 4 deletions crates/hotfix/src/application.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
use crate::message::OutboundMessage;
use hotfix_message::message::Message;

#[async_trait::async_trait]
/// The application users of HotFIX can implement to hook into the engine.
pub trait Application<Inbound, Outbound>: Send + Sync + 'static {
pub trait Application: Send + Sync + 'static {
type Outbound: OutboundMessage;

/// Called when a message is sent to the engine to be sent to the counterparty.
///
/// This is invoked before the raw message is persisted in the message store.
async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision;
async fn on_outbound_message(&self, msg: &Self::Outbound) -> OutboundDecision;
/// Called when a message is received from the counterparty.
///
/// This is invoked after the message is verified and parsed into a typed message.
async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision;
/// This is invoked after the message is verified by the session layer.
async fn on_inbound_message(&self, msg: &Message) -> InboundDecision;
/// Called when the session is logged out.
async fn on_logout(&mut self, reason: &str);
/// Called when the session is logged on.
async fn on_logon(&mut self);
}

/// Standard FIX Business Reject Reason values (tag 380).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u32)]
pub enum BusinessRejectReason {
Other = 0,
UnknownId = 1,
UnknownSecurity = 2,
UnsupportedMessageType = 3,
ApplicationNotAvailable = 4,
ConditionallyRequiredFieldMissing = 5,
NotAuthorized = 6,
DeliverToFirmNotAvailable = 7,
}

pub enum InboundDecision {
Accept,
Reject {
reason: BusinessRejectReason,
text: Option<String>,
},
TerminateSession,
}

Expand Down
31 changes: 12 additions & 19 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, warn};

use crate::application::Application;
use crate::config::SessionConfig;
use crate::message::{InboundMessage, OutboundMessage};
use crate::message::OutboundMessage;
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
use crate::session::{InternalSessionRef, SessionHandle};
use crate::store::MessageStore;
Expand All @@ -27,9 +27,9 @@ pub struct Initiator<Outbound> {
}

impl<Outbound: OutboundMessage> Initiator<Outbound> {
pub async fn start<Inbound: InboundMessage>(
pub async fn start(
config: SessionConfig,
application: impl Application<Inbound, Outbound>,
application: impl Application<Outbound = Outbound>,
store: impl MessageStore + 'static,
) -> Result<Self, SessionCreationError> {
let session_ref = InternalSessionRef::new(config.clone(), application, store)?;
Expand Down Expand Up @@ -157,10 +157,10 @@ async fn establish_connection<Outbound: OutboundMessage>(
mod tests {
use super::*;
use crate::application::{Application, InboundDecision, OutboundDecision};
use crate::message::generate_message;
use crate::message::logon::{Logon, ResetSeqNumConfig};
use crate::message::logout::Logout;
use crate::message::parser::Parser;
use crate::message::{InboundMessage, generate_message};
use crate::store::in_memory::InMemoryMessageStore;
use hotfix_message::Part;
use hotfix_message::message::Message;
Expand All @@ -180,21 +180,17 @@ mod tests {
}
}

impl InboundMessage for DummyMessage {
fn parse(_message: &Message) -> Self {
DummyMessage
}
}

// No-op application
struct NoOpApp;

#[async_trait::async_trait]
impl Application<DummyMessage, DummyMessage> for NoOpApp {
impl Application for NoOpApp {
type Outbound = DummyMessage;

async fn on_outbound_message(&self, _msg: &DummyMessage) -> OutboundDecision {
OutboundDecision::Send
}
async fn on_inbound_message(&self, _msg: DummyMessage) -> InboundDecision {
async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision {
InboundDecision::Accept
}
async fn on_logout(&mut self, _reason: &str) {}
Expand Down Expand Up @@ -366,13 +362,10 @@ mod tests {
let mut config = create_test_config("127.0.0.1", port);
config.reconnect_interval = 1; // Short interval for test

let _initiator = Initiator::<DummyMessage>::start::<DummyMessage>(
config,
NoOpApp,
InMemoryMessageStore::default(),
)
.await
.unwrap();
let _initiator =
Initiator::<DummyMessage>::start(config, NoOpApp, InMemoryMessageStore::default())
.await
.unwrap();

// Accept first connection
let (conn1, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept())
Expand Down
5 changes: 1 addition & 4 deletions crates/hotfix/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) use hotfix_message::message::{Config, Message};
use hotfix_message::session_fields::{MSG_SEQ_NUM, SENDER_COMP_ID, SENDING_TIME, TARGET_COMP_ID};
pub use hotfix_message::{Part, RepeatingGroup};

pub mod business_reject;
pub mod heartbeat;
pub mod logon;
pub mod logout;
Expand All @@ -25,10 +26,6 @@ pub trait OutboundMessage: Clone + Send + 'static {
fn message_type(&self) -> &str;
}

pub trait InboundMessage: Clone + Send + 'static {
fn parse(message: &Message) -> Self;
}

pub fn generate_message(
begin_string: &str,
sender_comp_id: &str,
Expand Down
212 changes: 212 additions & 0 deletions crates/hotfix/src/message/business_reject.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use crate::application::BusinessRejectReason;
use crate::message::OutboundMessage;
use hotfix_message::dict::{FieldLocation, FixDatatype};
use hotfix_message::message::Message;
use hotfix_message::session_fields::{REF_MSG_TYPE, REF_SEQ_NUM, TEXT};
use hotfix_message::{Buffer, FieldType, HardCodedFixFieldDefinition, Part};

const BUSINESS_REJECT_REASON: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition {
name: "BusinessRejectReason",
tag: 380,
data_type: FixDatatype::Int,
location: FieldLocation::Body,
};

impl<'a> FieldType<'a> for BusinessRejectReason {
type Error = ();
type SerializeSettings = ();

fn serialize_with<B>(&self, buffer: &mut B, _settings: Self::SerializeSettings) -> usize
where
B: Buffer,
{
let value = *self as u32;
value.serialize(buffer)
}

fn deserialize(data: &'a [u8]) -> Result<Self, Self::Error> {
let value = u32::deserialize(data).map_err(|_| ())?;
match value {
0 => Ok(Self::Other),
1 => Ok(Self::UnknownId),
2 => Ok(Self::UnknownSecurity),
3 => Ok(Self::UnsupportedMessageType),
4 => Ok(Self::ApplicationNotAvailable),
5 => Ok(Self::ConditionallyRequiredFieldMissing),
6 => Ok(Self::NotAuthorized),
7 => Ok(Self::DeliverToFirmNotAvailable),
_ => Err(()),
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct BusinessReject {
ref_msg_type: String,
reason: BusinessRejectReason,
ref_seq_num: Option<u64>,
text: Option<String>,
}

impl BusinessReject {
pub(crate) fn new(ref_msg_type: &str, reason: BusinessRejectReason) -> Self {
Self {
ref_msg_type: ref_msg_type.to_string(),
reason,
ref_seq_num: None,
text: None,
}
}

pub(crate) fn ref_seq_num(mut self, ref_seq_num: u64) -> Self {
self.ref_seq_num = Some(ref_seq_num);
self
}

pub(crate) fn text(mut self, text: &str) -> Self {
self.text = Some(text.to_string());
self
}

#[cfg(test)]
fn parse(message: &Message) -> Self {
Self {
#[allow(clippy::expect_used)]
ref_msg_type: message
.get::<&str>(REF_MSG_TYPE)
.expect("ref_msg_type should be present")
.to_string(),
#[allow(clippy::expect_used)]
reason: message
.get(BUSINESS_REJECT_REASON)
.expect("reason should be present"),
ref_seq_num: message.get(REF_SEQ_NUM).ok(),
text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()),
}
}
}

impl OutboundMessage for BusinessReject {
fn write(&self, msg: &mut Message) {
msg.set(REF_MSG_TYPE, self.ref_msg_type.as_str());
msg.set(BUSINESS_REJECT_REASON, self.reason);

if let Some(ref_seq_num) = self.ref_seq_num {
msg.set(REF_SEQ_NUM, ref_seq_num);
}
if let Some(text) = &self.text {
msg.set(TEXT, text.as_str());
}
}

fn message_type(&self) -> &str {
"j"
}
}

#[cfg(test)]
mod tests {
use super::*;
use hotfix_message::message::Message;

#[test]
fn test_write_business_reject_with_required_fields_only() {
let reject = BusinessReject::new("D", BusinessRejectReason::UnsupportedMessageType);

let mut msg = Message::new("FIX.4.4", "j");
reject.write(&mut msg);

assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "D");
assert_eq!(
msg.get::<BusinessRejectReason>(BUSINESS_REJECT_REASON)
.unwrap(),
BusinessRejectReason::UnsupportedMessageType
);
assert!(msg.get::<u64>(REF_SEQ_NUM).is_err());
assert!(msg.get::<&str>(TEXT).is_err());
}

#[test]
fn test_write_business_reject_with_all_fields() {
let reject = BusinessReject::new("8", BusinessRejectReason::NotAuthorized)
.ref_seq_num(456)
.text("Not authorized for execution reports");

let mut msg = Message::new("FIX.4.4", "j");
reject.write(&mut msg);

assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "8");
assert_eq!(
msg.get::<BusinessRejectReason>(BUSINESS_REJECT_REASON)
.unwrap(),
BusinessRejectReason::NotAuthorized
);
assert_eq!(msg.get::<u64>(REF_SEQ_NUM).unwrap(), 456);
assert_eq!(
msg.get::<&str>(TEXT).unwrap(),
"Not authorized for execution reports"
);
}

#[test]
fn test_round_trip_serialization() {
let original =
BusinessReject::new("D", BusinessRejectReason::ConditionallyRequiredFieldMissing)
.ref_seq_num(789)
.text("ClOrdID is required");

let mut msg = Message::new("FIX.4.4", "j");
original.write(&mut msg);

let parsed = BusinessReject::parse(&msg);

assert_eq!(parsed.ref_msg_type, original.ref_msg_type);
assert_eq!(parsed.reason, original.reason);
assert_eq!(parsed.ref_seq_num, original.ref_seq_num);
assert_eq!(parsed.text, original.text);
}

#[test]
fn test_round_trip_with_minimal_fields() {
let original = BusinessReject::new("0", BusinessRejectReason::Other);

let mut msg = Message::new("FIX.4.4", "j");
original.write(&mut msg);

let parsed = BusinessReject::parse(&msg);

assert_eq!(parsed.ref_msg_type, original.ref_msg_type);
assert_eq!(parsed.reason, original.reason);
assert_eq!(parsed.ref_seq_num, original.ref_seq_num);
assert_eq!(parsed.text, original.text);
}

#[test]
fn test_message_type() {
let reject = BusinessReject::new("D", BusinessRejectReason::Other);
assert_eq!(reject.message_type(), "j");
}

#[test]
fn test_all_reject_reasons_round_trip() {
let reasons = [
BusinessRejectReason::Other,
BusinessRejectReason::UnknownId,
BusinessRejectReason::UnknownSecurity,
BusinessRejectReason::UnsupportedMessageType,
BusinessRejectReason::ApplicationNotAvailable,
BusinessRejectReason::ConditionallyRequiredFieldMissing,
BusinessRejectReason::NotAuthorized,
BusinessRejectReason::DeliverToFirmNotAvailable,
];

for reason in reasons {
let reject = BusinessReject::new("D", reason);
let mut msg = Message::new("FIX.4.4", "j");
reject.write(&mut msg);

let parsed = BusinessReject::parse(&msg);
assert_eq!(parsed.reason, reason, "Round-trip failed for {reason:?}");
}
}
}
Loading