featzhang commented Feb 12, 2026
What is the purpose of the change

This PR (FLINK-39074) introduces a comprehensive AI inference framework for PyFlink, making it easy to perform machine learning inference on streaming data with automatic lifecycle management.

Currently, PyFlink users face significant challenges when implementing AI inference:

  • Manual model management (loading, initialization, cleanup)
  • Manual batching logic for efficient inference
  • Manual resource and concurrency control
  • Lack of lifecycle hooks for warmup and optimization

This PR addresses these pain points with a simple, declarative API.
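For context, here is a minimal sketch of what users hand-roll today without this framework. The model choice and API calls below illustrate the status quo and are not code from this PR:

from pyflink.datastream.functions import MapFunction, RuntimeContext

class ManualEmbedding(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        # Manual model management: every user re-implements loading
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    def map(self, value):
        # No built-in batching: records are encoded one at a time unless
        # the user also hand-rolls buffering and flushing logic
        return self.model.encode([value])[0]

    def close(self):
        # Manual cleanup; releasing GPU memory is the user's problem
        self.model = None

embeddings = data_stream.map(ManualEmbedding())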

Brief change log

  • Add DataStream.infer() method for easy AI inference
  • Implement ModelLifecycleManager for model loading and warmup
  • Implement BatchInferenceExecutor for efficient batch inference
  • Add support for multiple task types (embedding, classification, generation)
  • Integrate with HuggingFace Transformers ecosystem
  • Add comprehensive metrics tracking
  • Include examples and unit tests

Example Usage

# Text embedding
embeddings = data_stream.infer(
    model="sentence-transformers/all-MiniLM-L6-v2",
    input_col="text",
    output_col="embedding",
    batch_size=32,
    device="cuda:0"
)

# Sentiment classification
sentiments = data_stream.infer(
    model="distilbert-base-uncased-finetuned-sst-2-english",
    input_col="text",
    output_col="sentiment",
    task_type="classification"
)
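The change log also lists a generation task type; by analogy with the two examples above, a hedged sketch (the model choice and batch_size here are assumptions, not from this PR):

# Text generation (sketch by analogy; exact parameters may differ)
summaries = data_stream.infer(
    model="t5-small",
    input_col="text",
    output_col="summary",
    task_type="generation",
    batch_size=8
)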

Verifying this change

  • Added unit tests for all core components (config, lifecycle, executor, metrics, function)
  • Added integration tests
  • Added example programs demonstrating different use cases
  • Tested with multiple HuggingFace models
  • Verified resource cleanup and memory management

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency)
  • The public API (DataStream.infer() is a new public API)
  • The serializers
  • The runtime per-record code paths
  • Anything that affects deployment or recovery: JobManager, checkpointing, etc.
  • The S3 file system connector

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs/PyDocs and examples)

Additional Context

This is an initial MVP implementation focusing on core functionality. Future enhancements could include:

  • Integration with AsyncBatchFunction for true async batching
  • Support for additional model frameworks (TensorFlow, ONNX)
  • Model versioning and A/B testing
  • Advanced batching strategies (dynamic batch size)
  • Distributed model serving

The implementation follows Flink's design principles and integrates naturally with existing PyFlink APIs.

[FLINK-XXXXX][PyFlink] Add AI inference lifecycle management with infer() API

This PR introduces a comprehensive AI inference framework for PyFlink, making it
easy to perform ML inference on streaming data with automatic lifecycle management.

Key Features:
- Simple infer() API for DataStream
- Automatic model lifecycle management (load, warmup, unload)
- Batch inference support with configurable batching
- Multi-task support (embedding, classification, generation)
- Resource optimization (GPU support, FP16, memory management)
- HuggingFace model integration
- Comprehensive metrics and monitoring

Example usage:
  embeddings = data_stream.infer(
      model="sentence-transformers/all-MiniLM-L6-v2",
      input_col="text",
      output_col="embedding",
      device="cuda:0"
  )

Components:
- ModelLifecycleManager: Handles model loading and warmup
- BatchInferenceExecutor: Executes batch inference with metrics
- InferenceFunction: MapFunction integration
- InferenceConfig: Configuration management
- InferenceMetrics: Performance tracking

This addresses the pain points of manual model management, batching,
and resource control in PyFlink AI applications.
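To make the component split above concrete, here is a minimal composition sketch. The class names come from the commit message, but every interface, signature, and body below is an assumption for illustration, not the PR's actual code; a plain dict stands in for InferenceConfig:

from pyflink.datastream.functions import MapFunction, RuntimeContext

class ModelLifecycleManager:
    """Stub lifecycle manager: load, warm up, unload (assumed interface)."""
    def __init__(self, config):
        self.config = config
        self.model = None

    def load(self):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(self.config["model"],
                                         device=self.config.get("device", "cpu"))

    def warmup(self):
        self.model.encode(["warmup"])  # trigger lazy initialization once

    def unload(self):
        self.model = None  # drop the reference so memory can be reclaimed

class BatchInferenceExecutor:
    """Stub executor: single-record batches here; the real one would buffer
    up to config["batch_size"] records and track InferenceMetrics."""
    def __init__(self, manager, config):
        self.manager = manager
        self.config = config

    def infer(self, value):
        return self.manager.model.encode([value])[0]

class InferenceFunction(MapFunction):
    """MapFunction integration: one model per parallel subtask."""
    def __init__(self, config):
        self.config = config  # InferenceConfig-like dict: model, device, ...
        self.manager = None
        self.executor = None

    def open(self, runtime_context: RuntimeContext):
        self.manager = ModelLifecycleManager(self.config)
        self.manager.load()
        self.manager.warmup()
        self.executor = BatchInferenceExecutor(self.manager, self.config)

    def map(self, value):
        return self.executor.infer(value)

    def close(self):
        self.manager.unload()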
featzhang changed the title from "[FLINK-XXXXX][PyFlink] Add AI inference lifecycle management with infer() API" to "[FLINK-39074][PyFlink] Add AI inference lifecycle management with infer() API" on Feb 12, 2026.