@@ -2136,6 +2136,141 @@ def router(msg: pulsar.Message, num_partitions: int):
21362136
21372137 client .close ()
21382138
2139+ def test_null_value_message (self ):
2140+ client = Client (self .serviceUrl )
2141+ topic = "null-value-%s" % uuid .uuid4 ()
2142+ producer = client .create_producer (topic , batching_enabled = False )
2143+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2144+
2145+ producer .send (b"not null" , partition_key = "k1" )
2146+ producer .send (None , partition_key = "k2" )
2147+ producer .send (b"also not null" , partition_key = "k3" )
2148+
2149+ msg1 = consumer .receive (TM )
2150+ self .assertEqual (msg1 .data (), b"not null" )
2151+ self .assertFalse (msg1 .has_null_value ())
2152+
2153+ msg2 = consumer .receive (TM )
2154+ self .assertTrue (msg2 .has_null_value ())
2155+ self .assertEqual (msg2 .data (), b"" )
2156+
2157+ msg3 = consumer .receive (TM )
2158+ self .assertEqual (msg3 .data (), b"also not null" )
2159+ self .assertFalse (msg3 .has_null_value ())
2160+
2161+ consumer .close ()
2162+ producer .close ()
2163+ client .close ()
2164+
2165+ def test_null_value_vs_empty_bytes (self ):
2166+ client = Client (self .serviceUrl )
2167+ topic = "null-vs-empty-%s" % uuid .uuid4 ()
2168+ producer = client .create_producer (topic , batching_enabled = False )
2169+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2170+
2171+ producer .send (b"" , partition_key = "k1" )
2172+ producer .send (None , partition_key = "k2" )
2173+
2174+ msg1 = consumer .receive (TM )
2175+ self .assertFalse (msg1 .has_null_value ())
2176+ self .assertEqual (msg1 .data (), b"" )
2177+
2178+ msg2 = consumer .receive (TM )
2179+ self .assertTrue (msg2 .has_null_value ())
2180+
2181+ consumer .close ()
2182+ producer .close ()
2183+ client .close ()
2184+
2185+ def test_null_value_compaction (self ):
2186+ client = Client (self .serviceUrl )
2187+ topic = "null-compact-%s" % uuid .uuid4 ()
2188+ producer = client .create_producer (topic , batching_enabled = False )
2189+
2190+ consumer = client .subscribe (topic , "my-sub1" , is_read_compacted = True )
2191+ consumer .close ()
2192+
2193+ # key1: value then tombstone -> removed after compaction
2194+ producer .send (b"hello-1" , partition_key = "key1" )
2195+ producer .send (None , partition_key = "key1" )
2196+
2197+ # key2: value only -> survives
2198+ producer .send (b"hello-2" , partition_key = "key2" )
2199+
2200+ # key3: value then tombstone -> removed
2201+ producer .send (b"hello-3" , partition_key = "key3" )
2202+ producer .send (None , partition_key = "key3" )
2203+
2204+ # key4: value only -> survives
2205+ producer .send (b"hello-4" , partition_key = "key4" )
2206+ producer .close ()
2207+
2208+ url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self .adminUrl , topic )
2209+ doHttpPut (url , "" )
2210+ while True :
2211+ s = doHttpGet (url ).decode ("utf-8" )
2212+ if "RUNNING" in s :
2213+ time .sleep (0.2 )
2214+ else :
2215+ self .assertTrue ("SUCCESS" in s )
2216+ break
2217+ time .sleep (1.0 )
2218+
2219+ consumer = client .subscribe (topic , "my-sub1" , is_read_compacted = True )
2220+ messages = []
2221+ while True :
2222+ try :
2223+ msg = consumer .receive (2000 )
2224+ messages .append (msg )
2225+ except pulsar .Timeout :
2226+ break
2227+
2228+ keys = [m .partition_key () for m in messages ]
2229+ self .assertIn ("key2" , keys )
2230+ self .assertIn ("key4" , keys )
2231+ self .assertNotIn ("key1" , keys )
2232+ self .assertNotIn ("key3" , keys )
2233+ self .assertEqual (len (messages ), 2 )
2234+
2235+ consumer .close ()
2236+ client .close ()
2237+
2238+ def test_null_value_table_view (self ):
2239+ client = Client (self .serviceUrl )
2240+ topic = "null-tv-%s" % uuid .uuid4 ()
2241+ producer = client .create_producer (topic , batching_enabled = False )
2242+
2243+ producer .send (b"hello" , partition_key = "key1" )
2244+ time .sleep (0.5 )
2245+
2246+ tv = client .create_table_view (topic )
2247+ self .assertEqual (tv .get ("key1" ), b"hello" )
2248+
2249+ producer .send (None , partition_key = "key1" )
2250+ time .sleep (1.0 )
2251+
2252+ self .assertIsNone (tv .get ("key1" ))
2253+ tv .close ()
2254+ producer .close ()
2255+ client .close ()
2256+
2257+ def test_null_value_with_properties (self ):
2258+ client = Client (self .serviceUrl )
2259+ topic = "null-props-%s" % uuid .uuid4 ()
2260+ producer = client .create_producer (topic , batching_enabled = False )
2261+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2262+
2263+ producer .send (None , partition_key = "k1" , properties = {"action" : "delete" })
2264+
2265+ msg = consumer .receive (TM )
2266+ self .assertTrue (msg .has_null_value ())
2267+ self .assertEqual (msg .properties (), {"action" : "delete" })
2268+ self .assertEqual (msg .partition_key (), "k1" )
2269+
2270+ consumer .close ()
2271+ producer .close ()
2272+ client .close ()
2273+
21392274
21402275if __name__ == "__main__" :
21412276 main ()
0 commit comments