PluginManager pluginManager = kubernetesJobManagerParameters.getPlugin();
//get all factories
Iterator<KubernetesStepDecorator> factoryIteratorSPI = ServiceLoader.load(KubernetesStepDecorator.class).iterator();
Iterator<KubernetesStepDecorator> factoryIteratorPlugins =
pluginManager != null
? pluginManager.load(KubernetesStepDecorator.class)
: Collections.emptyIterator();
Iterator<KubernetesStepDecorator> factoryIterator = Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);
while (factoryIterator.hasNext()) {
try {
KubernetesStepDecorator factory = factoryIterator.next();
factory.mergePriority(kubernetesJobManagerParameters);
String factoryClassName = factory.getClass().getName();
KubernetesStepDecorator existingFactory = decoratorFactories.get(factoryClassName);
if (existingFactory == null) {
decoratorFactories.put(factoryClassName, factory);
Collections.addAll(extendedStepDecorators, factory);
}
} catch (Exception e) {
e.printStackTrace();
}
}
ArrayList<KubernetesStepDecorator> stepDecorators = new ArrayList<>();
Collections.addAll(stepDecorators,
new InitJobManagerDecorator(kubernetesJobManagerParameters),
new EnvSecretsDecorator(kubernetesJobManagerParameters),
new MountSecretsDecorator(kubernetesJobManagerParameters),
new CmdJobManagerDecorator(kubernetesJobManagerParameters),
new InternalServiceDecorator(kubernetesJobManagerParameters),
new ExternalServiceDecorator(kubernetesJobManagerParameters),
new HadoopConfMountDecorator(kubernetesJobManagerParameters),
new KerberosMountDecorator(kubernetesJobManagerParameters),
new FlinkConfMountDecorator(kubernetesJobManagerParameters),
new PodTemplateMountDecorator(kubernetesJobManagerParameters));
stepDecorators.addAll(extendedStepDecorators);
for (KubernetesStepDecorator stepDecorator : stepDecorators) {
preAccompanyingResources.addAll(stepDecorator.buildPreAccompanyingKubernetesResources());
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
}
// Add all prepared AccompanyingResources to refresh owner reference
accompanyingResources.addAll(preAccompanyingResources);
final Deployment deployment =
createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);
return new KubernetesJobManagerSpecification(deployment, accompanyingResources,
preAccompanyingResources);
}
public class KubernetesJobManagerParameters extends AbstractKubernetesParameters {
private final ClusterSpecification clusterSpecification;
PluginManager pluginManager;
public KubernetesJobManagerParameters(
Configuration flinkConfig, ClusterSpecification clusterSpecification) {
super(flinkConfig);
this.clusterSpecification = checkNotNull(clusterSpecification);
this.pluginManager =
PluginUtils.createPluginManagerFromRootFolder(flinkConfig);
}
/**
 * Returns the labels for the JobManager.
 *
 * <p>The result starts from the labels configured under
 * {@code KubernetesConfigOptions.JOB_MANAGER_LABELS} (if that option is set) and then adds
 * the entries of {@code getSelectors()}; on a key collision the selector value wins because
 * it is applied last.
 *
 * @return an unmodifiable map containing the merged labels
 */
@Override
public Map<String, String> getLabels() {
    final Map<String, String> mergedLabels = new HashMap<>();

    // User-configured labels go in first; nothing is added when the option is absent.
    flinkConfig
            .getOptional(KubernetesConfigOptions.JOB_MANAGER_LABELS)
            .ifPresent(mergedLabels::putAll);

    // Selectors are applied afterwards so they override configured labels with the same key.
    mergedLabels.putAll(getSelectors());

    return Collections.unmodifiableMap(mergedLabels);
}
/**
 * Returns the plugin manager held by these parameters.
 *
 * <p>The field is assigned in the constructor from
 * {@code PluginUtils.createPluginManagerFromRootFolder}. NOTE(review): the field is neither
 * {@code private} nor {@code final}, so callers should not assume it is non-null unless that
 * is confirmed elsewhere.
 *
 * @return the current {@link PluginManager} reference
 */
public PluginManager getPlugin() {
    return pluginManager;
}