1515#include < fstream>
1616#include < stdexcept>
1717#include " ../../../../SDK/components/utilities/include/sample_log.h"
18+ #include " thread_safe_list.h"
1819
1920using namespace StackFlows ;
2021
2122int main_exit_flage = 0 ;
2223static void __sigint (int iSigNo)
2324{
24- SLOGW (" llm_sys will be exit!" );
25+ SLOGW (" llm_vlm will be exit!" );
2526 main_exit_flage = 1 ;
2627}
2728
2829static std::string base_model_path_;
2930static std::string base_model_config_path_;
3031
32+ typedef struct {
33+ cv::Mat inference_src;
34+ bool inference_bgr2rgb;
35+ } inference_async_par;
36+
3137typedef std::function<void (const std::string &data, bool finish)> task_callback_t ;
3238
3339#define CONFIG_AUTO_SET (obj, key ) \
@@ -56,6 +62,8 @@ class llm_task {
5662 task_callback_t out_callback_;
5763 bool enoutput_;
5864 bool enstream_;
65+ bool encamera_;
66+ thread_safe::list<inference_async_par> async_list_;
5967
6068 void set_output (task_callback_t out_callback)
6169 {
@@ -222,10 +230,46 @@ class llm_task {
222230 return oss_prompt.str ();
223231 }
224232
233+ int inference_async (cv::Mat &src, bool bgr2rgb = true )
234+ {
235+ if (async_list_.size () < 1 ) {
236+ inference_async_par par;
237+ par.inference_src = src.clone ();
238+ par.inference_bgr2rgb = bgr2rgb;
239+ async_list_.put (par);
240+ }
241+ return async_list_.size ();
242+ }
243+
244+ bool inference_raw_yuv (const std::string &msg)
245+ {
246+ if (msg.size () != 320 * 320 * 2 ) {
247+ throw std::string (" img size error" );
248+ }
249+ cv::Mat camera_data (320 , 320 , CV_8UC2, (void *)msg.data ());
250+ cv::Mat rgb;
251+ cv::cvtColor (camera_data, rgb, cv::COLOR_YUV2RGB_YUYV);
252+ return inference_async (rgb, true ) ? false : true ;
253+ }
254+
225255 void inference (const std::string &msg)
226256 {
227257 try {
228- if (image_data_.empty ()) {
258+ if (encamera_) {
259+ inference_async_par par;
260+ async_list_.get (); // discard buffered frames
261+ par = async_list_.get ();
262+ if (par.inference_src .empty ()) return ;
263+ if (par.inference_bgr2rgb ) {
264+ cv::Mat rgb;
265+ cv::cvtColor (par.inference_src , rgb, cv::COLOR_BGR2RGB);
266+ par.inference_src = rgb;
267+ }
268+ lLaMa_->Encode (par.inference_src , img_embed);
269+ lLaMa_->Encode (img_embed, prompt_data_, prompt_complete (msg));
270+ std::string out = lLaMa_->Run (prompt_data_);
271+ if (out_callback_) out_callback_ (out, true );
272+ } else if (image_data_.empty ()) {
229273 lLaMa_->Encode (prompt_data_, prompt_complete (msg));
230274 std::string out = lLaMa_->Run (prompt_data_);
231275 if (out_callback_) out_callback_ (out, true );
@@ -302,13 +346,13 @@ std::atomic<unsigned int> llm_task::next_port_{8090};
302346
303347#undef CONFIG_AUTO_SET
304348
305- class llm_llm : public StackFlow {
349+ class llm_vlm : public StackFlow {
306350private:
307351 int task_count_;
308352 std::unordered_map<int , std::shared_ptr<llm_task>> llm_task_;
309353
310354public:
311- llm_llm () : StackFlow(" vlm" )
355+ llm_vlm () : StackFlow(" vlm" )
312356 {
313357 task_count_ = 2 ;
314358 }
@@ -447,6 +491,23 @@ class llm_llm : public StackFlow {
447491 llm_task_obj->lLaMa_ ->Stop ();
448492 }
449493
494+ void task_camera_data (const std::weak_ptr<llm_task> llm_task_obj_weak,
495+ const std::weak_ptr<llm_channel_obj> llm_channel_weak, const std::string &data)
496+ {
497+ nlohmann::json error_body;
498+ auto llm_task_obj = llm_task_obj_weak.lock ();
499+ auto llm_channel = llm_channel_weak.lock ();
500+ if (!(llm_task_obj && llm_channel)) {
501+ SLOGE (" Model run failed." );
502+ return ;
503+ }
504+ try {
505+ llm_task_obj->inference_raw_yuv (data);
506+ } catch (...) {
507+ SLOGE (" data format error" );
508+ }
509+ }
510+
450511 int setup (const std::string &work_id, const std::string &object, const std::string &data) override
451512 {
452513 nlohmann::json error_body;
@@ -476,26 +537,38 @@ class llm_llm : public StackFlow {
476537 llm_channel->set_output (llm_task_obj->enoutput_ );
477538 llm_channel->set_stream (llm_task_obj->enstream_ );
478539
479- llm_task_obj->set_output (std::bind (&llm_llm ::task_output, this , std::weak_ptr<llm_task>(llm_task_obj),
540+ llm_task_obj->set_output (std::bind (&llm_vlm ::task_output, this , std::weak_ptr<llm_task>(llm_task_obj),
480541 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1,
481542 std::placeholders::_2));
482543
483544 for (const auto input : llm_task_obj->inputs_ ) {
484545 if (input.find (" vlm" ) != std::string::npos) {
485546 llm_channel->subscriber_work_id (
486- " " , std::bind (&llm_llm ::task_user_data, this , std::weak_ptr<llm_task>(llm_task_obj),
547+ " " , std::bind (&llm_vlm ::task_user_data, this , std::weak_ptr<llm_task>(llm_task_obj),
487548 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1,
488549 std::placeholders::_2));
489550 } else if (input.find (" asr" ) != std::string::npos) {
490551 llm_channel->subscriber_work_id (
491- input, std::bind (&llm_llm ::task_asr_data, this , std::weak_ptr<llm_task>(llm_task_obj),
552+ input, std::bind (&llm_vlm ::task_asr_data, this , std::weak_ptr<llm_task>(llm_task_obj),
492553 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1,
493554 std::placeholders::_2));
494555 } else if (input.find (" kws" ) != std::string::npos) {
495556 llm_channel->subscriber_work_id (
496- input, std::bind (&llm_llm ::kws_awake, this , std::weak_ptr<llm_task>(llm_task_obj),
557+ input, std::bind (&llm_vlm ::kws_awake, this , std::weak_ptr<llm_task>(llm_task_obj),
497558 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1,
498559 std::placeholders::_2));
560+ } else if (input.find (" camera" ) != std::string::npos) {
561+ llm_task_obj->encamera_ = true ;
562+ std::string input_url_name = input + " .out_port" ;
563+ std::string input_url = unit_call (" sys" , " sql_select" , input_url_name);
564+ if (!input_url.empty ()) {
565+ std::weak_ptr<llm_task> _llm_task_obj = llm_task_obj;
566+ std::weak_ptr<llm_channel_obj> _llm_channel = llm_channel;
567+ llm_channel->subscriber (input_url, [this , _llm_task_obj, _llm_channel](
568+ pzmq *_pzmq, const std::shared_ptr<pzmq_data> &raw) {
569+ this ->task_camera_data (_llm_task_obj, _llm_channel, raw->string ());
570+ });
571+ }
499572 }
500573 }
501574 llm_task_[work_id_num] = llm_task_obj;
@@ -513,7 +586,7 @@ class llm_llm : public StackFlow {
513586
514587 void link (const std::string &work_id, const std::string &object, const std::string &data) override
515588 {
516- SLOGI (" llm_llm ::link:%s" , data.c_str ());
589+ SLOGI (" llm_vlm ::link:%s" , data.c_str ());
517590 int ret = 1 ;
518591 nlohmann::json error_body;
519592 int work_id_num = sample_get_work_id_num (work_id);
@@ -528,15 +601,28 @@ class llm_llm : public StackFlow {
528601 if (data.find (" asr" ) != std::string::npos) {
529602 ret = llm_channel->subscriber_work_id (
530603 data,
531- std::bind (&llm_llm ::task_asr_data, this , std::weak_ptr<llm_task>(llm_task_obj),
604+ std::bind (&llm_vlm ::task_asr_data, this , std::weak_ptr<llm_task>(llm_task_obj),
532605 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1, std::placeholders::_2));
533606 llm_task_obj->inputs_ .push_back (data);
534607 } else if (data.find (" kws" ) != std::string::npos) {
535608 ret = llm_channel->subscriber_work_id (
536609 data,
537- std::bind (&llm_llm ::kws_awake, this , std::weak_ptr<llm_task>(llm_task_obj),
610+ std::bind (&llm_vlm ::kws_awake, this , std::weak_ptr<llm_task>(llm_task_obj),
538611 std::weak_ptr<llm_channel_obj>(llm_channel), std::placeholders::_1, std::placeholders::_2));
539612 llm_task_obj->inputs_ .push_back (data);
613+ } else if (data.find (" camera" ) != std::string::npos) {
614+ llm_task_obj->encamera_ = true ;
615+ std::string input_url_name = data + " .out_port" ;
616+ std::string input_url = unit_call (" sys" , " sql_select" , input_url_name);
617+ if (!input_url.empty ()) {
618+ std::weak_ptr<llm_task> _llm_task_obj = llm_task_obj;
619+ std::weak_ptr<llm_channel_obj> _llm_channel = llm_channel;
620+ llm_channel->subscriber (
621+ input_url, [this , _llm_task_obj, _llm_channel](pzmq *_pzmq, const std::shared_ptr<pzmq_data> &raw) {
622+ this ->task_camera_data (_llm_task_obj, _llm_channel, raw->string ());
623+ });
624+ }
625+ llm_task_obj->inputs_ .push_back (data);
540626 }
541627 if (ret) {
542628 error_body[" code" ] = -20 ;
@@ -550,7 +636,7 @@ class llm_llm : public StackFlow {
550636
551637 void unlink (const std::string &work_id, const std::string &object, const std::string &data) override
552638 {
553- SLOGI (" llm_llm ::unlink:%s" , data.c_str ());
639+ SLOGI (" llm_vlm ::unlink:%s" , data.c_str ());
554640 int ret = 0 ;
555641 nlohmann::json error_body;
556642 int work_id_num = sample_get_work_id_num (work_id);
@@ -575,7 +661,7 @@ class llm_llm : public StackFlow {
575661
576662 void taskinfo (const std::string &work_id, const std::string &object, const std::string &data) override
577663 {
578- SLOGI (" llm_llm ::taskinfo:%s" , data.c_str ());
664+ SLOGI (" llm_vlm ::taskinfo:%s" , data.c_str ());
579665 nlohmann::json req_body;
580666 int work_id_num = sample_get_work_id_num (work_id);
581667 if (WORK_ID_NONE == work_id_num) {
@@ -602,7 +688,7 @@ class llm_llm : public StackFlow {
602688
603689 int exit (const std::string &work_id, const std::string &object, const std::string &data) override
604690 {
605- SLOGI (" llm_llm ::exit:%s" , data.c_str ());
691+ SLOGI (" llm_vlm ::exit:%s" , data.c_str ());
606692
607693 nlohmann::json error_body;
608694 int work_id_num = sample_get_work_id_num (work_id);
@@ -621,7 +707,7 @@ class llm_llm : public StackFlow {
621707 return 0 ;
622708 }
623709
624- ~llm_llm ()
710+ ~llm_vlm ()
625711 {
626712 while (1 ) {
627713 auto iteam = llm_task_.begin ();
@@ -641,7 +727,7 @@ int main(int argc, char *argv[])
641727 signal (SIGTERM, __sigint);
642728 signal (SIGINT, __sigint);
643729 mkdir (" /tmp/llm" , 0777 );
644- llm_llm llm;
730+ llm_vlm llm;
645731 while (!main_exit_flage) {
646732 sleep (1 );
647733 }
0 commit comments