Skip to content

test #5

@bzhaoopenstack

Description

@bzhaoopenstack
        PluginManager pluginManager = kubernetesJobManagerParameters.getPlugin();

        //get all factories
        Iterator<KubernetesStepDecorator> factoryIteratorSPI = ServiceLoader.load(KubernetesStepDecorator.class).iterator();
        Iterator<KubernetesStepDecorator> factoryIteratorPlugins =
                pluginManager != null
                ? pluginManager.load(KubernetesStepDecorator.class)
                        : Collections.emptyIterator();
        Iterator<KubernetesStepDecorator> factoryIterator = Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);
        while (factoryIterator.hasNext()) {
            try {
                KubernetesStepDecorator factory = factoryIterator.next();
                factory.mergePriority(kubernetesJobManagerParameters);
                String factoryClassName = factory.getClass().getName();
                KubernetesStepDecorator existingFactory = decoratorFactories.get(factoryClassName);
                if (existingFactory == null) {
                    decoratorFactories.put(factoryClassName, factory);
                    Collections.addAll(extendedStepDecorators, factory);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        ArrayList<KubernetesStepDecorator> stepDecorators = new ArrayList<>();
        Collections.addAll(stepDecorators,
                new InitJobManagerDecorator(kubernetesJobManagerParameters),
                new EnvSecretsDecorator(kubernetesJobManagerParameters),
                new MountSecretsDecorator(kubernetesJobManagerParameters),
                new CmdJobManagerDecorator(kubernetesJobManagerParameters),
                new InternalServiceDecorator(kubernetesJobManagerParameters),
                new ExternalServiceDecorator(kubernetesJobManagerParameters),
                new HadoopConfMountDecorator(kubernetesJobManagerParameters),
                new KerberosMountDecorator(kubernetesJobManagerParameters),
                new FlinkConfMountDecorator(kubernetesJobManagerParameters),
                new PodTemplateMountDecorator(kubernetesJobManagerParameters));

        stepDecorators.addAll(extendedStepDecorators);
        for (KubernetesStepDecorator stepDecorator : stepDecorators) {
            preAccompanyingResources.addAll(stepDecorator.buildPreAccompanyingKubernetesResources());
            flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
            accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
        }
        // Add all prepared AccompanyingResources to refresh owner reference
        accompanyingResources.addAll(preAccompanyingResources);

        final Deployment deployment =
                createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);

        return new KubernetesJobManagerSpecification(deployment, accompanyingResources,
                preAccompanyingResources);
    }
public class KubernetesJobManagerParameters extends AbstractKubernetesParameters {

    private final ClusterSpecification clusterSpecification;

    PluginManager pluginManager;
    public KubernetesJobManagerParameters(
            Configuration flinkConfig, ClusterSpecification clusterSpecification) {
        super(flinkConfig);
        this.clusterSpecification = checkNotNull(clusterSpecification);
        this.pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(flinkConfig);
    }

    @Override
    public Map<String, String> getLabels() {
        final Map<String, String> labels = new HashMap<>();
        labels.putAll(
                flinkConfig
                        .getOptional(KubernetesConfigOptions.JOB_MANAGER_LABELS)
                        .orElse(Collections.emptyMap()));
        labels.putAll(getSelectors());
        return Collections.unmodifiableMap(labels);
    }

    public PluginManager getPlugin() {
        return this.pluginManager;
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions