@@ -361,15 +361,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
361361 if (input.header == nullptr ) {
362362 continue ;
363363 }
364- auto dh = o2::header::get<DataHeader*>(input.header );
364+ auto const * dh = DataRefUtils::getHeader<DataHeader*>(input);
365+ auto payloadSize = DataRefUtils::getPayloadSize (input);
365366 if (dh->serialization != o2::header::gSerializationMethodArrow ) {
366367 LOGP (debug, " Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory" , dh->dataOrigin , dh->dataDescription );
367368 continue ;
368369 }
369370 auto dph = o2::header::get<DataProcessingHeader*>(input.header );
370371 bool forwarded = false ;
371372 for (auto const & forward : ctx.services ().get <DeviceSpec const >().forwards ) {
372- if (DataSpecUtils::match (forward.matcher , dh-> dataOrigin , dh-> dataDescription , dh-> subSpecification )) {
373+ if (DataSpecUtils::match (forward.matcher , *dh )) {
373374 forwarded = true ;
374375 break ;
375376 }
@@ -378,8 +379,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
378379 LOGP (debug, " Message {}/{} is forwarded so we are not returning its memory." , dh->dataOrigin , dh->dataDescription );
379380 continue ;
380381 }
381- LOGP (debug, " Message {}/{} is being deleted. We will return {}MB." , dh->dataOrigin , dh->dataDescription , dh-> payloadSize / 1000000 .);
382- totalBytes += dh-> payloadSize ;
382+ LOGP (debug, " Message {}/{} is being deleted. We will return {}MB." , dh->dataOrigin , dh->dataDescription , payloadSize / 1000000 .);
383+ totalBytes += payloadSize;
383384 totalMessages += 1 ;
384385 }
385386 arrow->updateBytesDestroyed (totalBytes);
0 commit comments