11package io .numaproj .numaflow .examples .source .simple ;
22
3- import com .google .common .primitives .Longs ;
43import io .numaproj .numaflow .sourcer .AckRequest ;
54import io .numaproj .numaflow .sourcer .Message ;
5+ import io .numaproj .numaflow .sourcer .NackRequest ;
66import io .numaproj .numaflow .sourcer .Offset ;
77import io .numaproj .numaflow .sourcer .OutputObserver ;
88import io .numaproj .numaflow .sourcer .ReadRequest ;
99import io .numaproj .numaflow .sourcer .Server ;
1010import io .numaproj .numaflow .sourcer .Sourcer ;
1111import lombok .extern .slf4j .Slf4j ;
1212
13+ import java .nio .ByteBuffer ;
1314import java .time .Instant ;
1415import java .util .HashMap ;
1516import java .util .List ;
1617import java .util .Map ;
1718import java .util .UUID ;
1819import java .util .concurrent .ConcurrentHashMap ;
20+ import java .util .concurrent .atomic .AtomicInteger ;
1921
2022/**
2123 * SimpleSource is a simple implementation of Sourcer.
2628
2729@ Slf4j
2830public class SimpleSource extends Sourcer {
29- private final Map <Long , Boolean > messages = new ConcurrentHashMap <>();
30- private long readIndex = 0 ;
31+ private final Map <Integer , Boolean > yetToBeAcked = new ConcurrentHashMap <>();
32+ Map <Integer , Boolean > nacked = new ConcurrentHashMap <>();
33+ private final AtomicInteger readIndex = new AtomicInteger (0 );
3134
3235 public static void main (String [] args ) throws Exception {
3336 Server server = new Server (new SimpleSource ());
@@ -42,7 +45,18 @@ public static void main(String[] args) throws Exception {
4245 @ Override
4346 public void read (ReadRequest request , OutputObserver observer ) {
4447 long startTime = System .currentTimeMillis ();
45- if (messages .entrySet ().size () > 0 ) {
48+
49+ // if there are messages which got nacked, we should read them first.
50+ if (!nacked .isEmpty ()) {
51+ for (int i = 0 ; i < nacked .size (); i ++) {
52+ Integer index = readIndex .incrementAndGet ();
53+ yetToBeAcked .put (index , true );
54+ observer .send (constructMessage (index ));
55+ }
56+ nacked .clear ();
57+ }
58+
59+ if (!yetToBeAcked .isEmpty ()) {
4660 // if there are messages not acknowledged, return
4761 return ;
4862 }
@@ -52,41 +66,57 @@ public void read(ReadRequest request, OutputObserver observer) {
5266 return ;
5367 }
5468
55- Map <String , String > headers = new HashMap <>();
56- headers .put ("x-txn-id" , UUID .randomUUID ().toString ());
57-
58- // create a message with increasing offset
59- Offset offset = new Offset (Longs .toByteArray (readIndex ));
60- Message message = new Message (
61- Long .toString (readIndex ).getBytes (),
62- offset ,
63- Instant .now (),
64- headers );
69+ Integer index = readIndex .incrementAndGet ();
6570 // send the message to the observer
66- observer .send (message );
71+ observer .send (constructMessage ( index ) );
6772 // keep track of the messages read and not acknowledged
68- messages .put (readIndex , true );
69- readIndex += 1 ;
73+ yetToBeAcked .put (index , true );
7074 }
7175 }
7276
7377 @ Override
7478 public void ack (AckRequest request ) {
7579 for (Offset offset : request .getOffsets ()) {
76- Long decoded_offset = Longs . fromByteArray (offset .getValue ());
80+ Integer decoded_offset = ByteBuffer . wrap (offset .getValue ()). getInt ( );
7781 // remove the acknowledged messages from the map
78- messages .remove (decoded_offset );
82+ yetToBeAcked .remove (decoded_offset );
83+ }
84+ }
85+
86+ @ Override
87+ public void nack (NackRequest request ) {
88+ // put them to nacked offsets so that they will be retried immediately.
89+ for (Offset offset : request .getOffsets ()) {
90+ Integer decoded_offset = ByteBuffer .wrap (offset .getValue ()).getInt ();
91+ yetToBeAcked .remove (decoded_offset );
92+ nacked .put (decoded_offset , true );
93+ readIndex .decrementAndGet ();
7994 }
8095 }
8196
8297 @ Override
8398 public long getPending () {
8499 // number of messages not acknowledged yet
85- return messages .size ();
100+ return yetToBeAcked .size ();
86101 }
87102
88103 @ Override
89104 public List <Integer > getPartitions () {
90105 return Sourcer .defaultPartitions ();
91106 }
107+
108+ private Message constructMessage (Integer readIndex ) {
109+ Map <String , String > headers = new HashMap <>();
110+ headers .put ("x-txn-id" , UUID .randomUUID ().toString ());
111+
112+ // create a message with increasing offset
113+ ByteBuffer buffer = ByteBuffer .allocate (Integer .BYTES );
114+ buffer .putInt (readIndex );
115+ Offset offset = new Offset (buffer .array ());
116+ return new Message (
117+ Integer .toString (readIndex ).getBytes (),
118+ offset ,
119+ Instant .now (),
120+ headers );
121+ }
92122}
0 commit comments