@@ -2136,6 +2136,146 @@ def router(msg: pulsar.Message, num_partitions: int):
21362136
21372137 client .close ()
21382138
2139+ def test_null_value_message (self ):
2140+ client = Client (self .serviceUrl )
2141+ topic = "null-value-%s" % uuid .uuid4 ()
2142+ producer = client .create_producer (topic , batching_enabled = False )
2143+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2144+
2145+ producer .send (b"not null" , partition_key = "k1" )
2146+ producer .send (None , partition_key = "k2" )
2147+ producer .send (b"also not null" , partition_key = "k3" )
2148+
2149+ msg1 = consumer .receive (TM )
2150+ self .assertEqual (msg1 .data (), b"not null" )
2151+ self .assertFalse (msg1 .has_null_value ())
2152+
2153+ msg2 = consumer .receive (TM )
2154+ self .assertTrue (msg2 .has_null_value ())
2155+ self .assertEqual (msg2 .data (), b"" )
2156+
2157+ msg3 = consumer .receive (TM )
2158+ self .assertEqual (msg3 .data (), b"also not null" )
2159+ self .assertFalse (msg3 .has_null_value ())
2160+
2161+ consumer .close ()
2162+ producer .close ()
2163+ client .close ()
2164+
2165+ def test_null_value_vs_empty_bytes (self ):
2166+ client = Client (self .serviceUrl )
2167+ topic = "null-vs-empty-%s" % uuid .uuid4 ()
2168+ producer = client .create_producer (topic , batching_enabled = False )
2169+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2170+
2171+ producer .send (b"" , partition_key = "k1" )
2172+ producer .send (None , partition_key = "k2" )
2173+
2174+ msg1 = consumer .receive (TM )
2175+ self .assertFalse (msg1 .has_null_value ())
2176+ self .assertEqual (msg1 .data (), b"" )
2177+
2178+ msg2 = consumer .receive (TM )
2179+ self .assertTrue (msg2 .has_null_value ())
2180+
2181+ consumer .close ()
2182+ producer .close ()
2183+ client .close ()
2184+
2185+ def test_null_value_compaction (self ):
2186+ client = Client (self .serviceUrl )
2187+ topic = "null-compact-%s" % uuid .uuid4 ()
2188+ producer = client .create_producer (topic , batching_enabled = False )
2189+
2190+ consumer = client .subscribe (topic , "my-sub1" , is_read_compacted = True )
2191+ consumer .close ()
2192+
2193+ # key1: value then tombstone -> removed after compaction
2194+ producer .send (b"hello-1" , partition_key = "key1" )
2195+ producer .send (None , partition_key = "key1" )
2196+
2197+ # key2: value only -> survives
2198+ producer .send (b"hello-2" , partition_key = "key2" )
2199+
2200+ # key3: value then tombstone -> removed
2201+ producer .send (b"hello-3" , partition_key = "key3" )
2202+ producer .send (None , partition_key = "key3" )
2203+
2204+ # key4: value only -> survives
2205+ producer .send (b"hello-4" , partition_key = "key4" )
2206+ producer .close ()
2207+
2208+ url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self .adminUrl , topic )
2209+ doHttpPut (url , "" )
2210+ while True :
2211+ s = doHttpGet (url ).decode ("utf-8" )
2212+ if "RUNNING" in s :
2213+ time .sleep (0.2 )
2214+ else :
2215+ self .assertTrue ("SUCCESS" in s )
2216+ break
2217+
2218+ # The compacted ledger cursor update is async on the broker side,
2219+ # wait for it to be persisted before reading.
2220+ time .sleep (1.0 )
2221+
2222+ consumer = client .subscribe (topic , "my-sub1" , is_read_compacted = True )
2223+ messages = []
2224+ while True :
2225+ try :
2226+ msg = consumer .receive (2000 )
2227+ messages .append (msg )
2228+ except pulsar .Timeout :
2229+ break
2230+
2231+ keys = [m .partition_key () for m in messages ]
2232+ self .assertIn ("key2" , keys )
2233+ self .assertIn ("key4" , keys )
2234+ self .assertNotIn ("key1" , keys )
2235+ self .assertNotIn ("key3" , keys )
2236+ self .assertEqual (len (messages ), 2 )
2237+
2238+ consumer .close ()
2239+ client .close ()
2240+
2241+ def test_null_value_table_view (self ):
2242+ client = Client (self .serviceUrl )
2243+ topic = "null-tv-%s" % uuid .uuid4 ()
2244+ producer = client .create_producer (topic , batching_enabled = False )
2245+
2246+ producer .send (b"hello" , partition_key = "key1" )
2247+
2248+ tv = client .create_table_view (topic )
2249+ self .assertEqual (tv .get ("key1" ), b"hello" )
2250+
2251+ producer .send (None , partition_key = "key1" )
2252+ for _ in range (50 ):
2253+ if tv .get ("key1" ) is None :
2254+ break
2255+ time .sleep (0.1 )
2256+ self .assertIsNone (tv .get ("key1" ))
2257+
2258+ tv .close ()
2259+ producer .close ()
2260+ client .close ()
2261+
2262+ def test_null_value_with_properties (self ):
2263+ client = Client (self .serviceUrl )
2264+ topic = "null-props-%s" % uuid .uuid4 ()
2265+ producer = client .create_producer (topic , batching_enabled = False )
2266+ consumer = client .subscribe (topic , "sub" , initial_position = InitialPosition .Earliest )
2267+
2268+ producer .send (None , partition_key = "k1" , properties = {"action" : "delete" })
2269+
2270+ msg = consumer .receive (TM )
2271+ self .assertTrue (msg .has_null_value ())
2272+ self .assertEqual (msg .properties (), {"action" : "delete" })
2273+ self .assertEqual (msg .partition_key (), "k1" )
2274+
2275+ consumer .close ()
2276+ producer .close ()
2277+ client .close ()
2278+
21392279
21402280if __name__ == "__main__" :
21412281 main ()
0 commit comments