Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ kafka-connector.yaml:
- from_.yaml
- to.yaml
- config-kafka-connector.yaml
- resetter_values.yaml
kafka-sink-connector.yaml: []
kafka-source-connector.yaml:
- from_-kafka-source-connector.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ kpops_components_fields:
- to
- config
- state
- resetter_namespace
- resetter_values
kafka-sink-connector:
- name
- enabled
Expand All @@ -45,8 +43,6 @@ kpops_components_fields:
- to
- config
- state
- resetter_namespace
- resetter_values
kafka-source-connector:
- name
- enabled
Expand All @@ -55,8 +51,6 @@ kpops_components_fields:
- to
- config
- state
- resetter_namespace
- resetter_values
- offset_topic
kubernetes-app:
- name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,16 @@ kafka-connector.yaml:
- from_.yaml
- to.yaml
- config-kafka-connector.yaml
- resetter_values.yaml
kafka-sink-connector.yaml:
- prefix.yaml
- from_.yaml
- to.yaml
- config-kafka-connector.yaml
- resetter_values.yaml
kafka-source-connector.yaml:
- prefix.yaml
- from_-kafka-source-connector.yaml
- to.yaml
- config-kafka-connector.yaml
- resetter_values.yaml
- offset_topic-kafka-source-connector.yaml
kubernetes-app.yaml:
- prefix.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,3 @@
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,3 @@
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
# offset.storage.topic
# https://kafka.apache.org/documentation/#connect_running
offset_topic: offset_topic
8 changes: 0 additions & 8 deletions docs/docs/resources/pipeline-components/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
# Kafka source connector
- type: kafka-source-connector # required
name: kafka-source-connector # required
Expand Down Expand Up @@ -281,10 +277,6 @@
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
# offset.storage.topic
# https://kafka.apache.org/documentation/#connect_running
offset_topic: offset_topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,3 @@ kafka-connector:
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
4 changes: 0 additions & 4 deletions docs/docs/resources/pipeline-defaults/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ kafka-connector:
# Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
config: # required
tasks.max: 1
# Overriding Kafka Connect Resetter Helm values. E.g. to override the
# Image Tag etc.
resetter_values:
imageTag: "1.2.3"
# Kafka sink connector
#
# Child of: KafkaConnector
Expand Down
51 changes: 0 additions & 51 deletions docs/docs/schema/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -1371,23 +1371,6 @@
"title": "Prefix",
"type": "string"
},
"resetter_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed",
"title": "Resetter Namespace"
},
"resetter_values": {
"$ref": "#/$defs/HelmAppValues",
"description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc."
},
"state": {
"anyOf": [
{
Expand Down Expand Up @@ -1522,23 +1505,6 @@
"title": "Prefix",
"type": "string"
},
"resetter_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed",
"title": "Resetter Namespace"
},
"resetter_values": {
"$ref": "#/$defs/HelmAppValues",
"description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc."
},
"state": {
"anyOf": [
{
Expand Down Expand Up @@ -1628,23 +1594,6 @@
"title": "Prefix",
"type": "string"
},
"resetter_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed",
"title": "Resetter Namespace"
},
"resetter_values": {
"$ref": "#/$defs/HelmAppValues",
"description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc."
},
"state": {
"anyOf": [
{
Expand Down
34 changes: 0 additions & 34 deletions docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -1327,23 +1327,6 @@
"title": "Prefix",
"type": "string"
},
"resetter_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed",
"title": "Resetter Namespace"
},
"resetter_values": {
"$ref": "#/$defs/HelmAppValues",
"description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc."
},
"state": {
"anyOf": [
{
Expand Down Expand Up @@ -1433,23 +1416,6 @@
"title": "Prefix",
"type": "string"
},
"resetter_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed",
"title": "Resetter Namespace"
},
"resetter_values": {
"$ref": "#/$defs/HelmAppValues",
"description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc."
},
"state": {
"anyOf": [
{
Expand Down
16 changes: 16 additions & 0 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@
response = await self._client.post(
"/connectors", json=payload.model_dump(exclude_none=True)
)
if response.status_code == httpx.codes.CREATED:

Check warning on line 66 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[201], Literal['Created']]" have no overlap (reportUnnecessaryComparison)
log.info(f"Connector {connector_config.name} created.")
log.debug(response.json())
return ConnectorResponse.model_validate(response.json())
if response.status_code == httpx.codes.CONFLICT:

Check warning on line 70 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[409], Literal['Conflict']]" have no overlap (reportUnnecessaryComparison)
log.warning(
"Rebalancing in progress while creating a connector... Retrying..."
)
Expand All @@ -83,12 +83,12 @@
:return: Information about the connector.
"""
response = await self._client.get(f"/connectors/{connector_name}")
if response.status_code == httpx.codes.OK:

Check warning on line 86 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[200], Literal['OK']]" have no overlap (reportUnnecessaryComparison)
log.debug(response.json())
return ConnectorResponse.model_validate(response.json())
if response.status_code == httpx.codes.NOT_FOUND:

Check warning on line 89 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[404], Literal['Not Found']]" have no overlap (reportUnnecessaryComparison)
raise ConnectorNotFoundException
if response.status_code == httpx.codes.CONFLICT:

Check warning on line 91 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[409], Literal['Conflict']]" have no overlap (reportUnnecessaryComparison)
log.warning(
"Rebalancing in progress while getting a connector... Retrying..."
)
Expand All @@ -106,10 +106,10 @@
:return: Status of the connector.
"""
response = await self._client.get(f"/connectors/{connector_name}/status")
if response.status_code == httpx.codes.OK:

Check warning on line 109 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[200], Literal['OK']]" have no overlap (reportUnnecessaryComparison)
log.debug(response.json())
return ConnectorStatusResponse.model_validate(response.json())
if response.status_code == httpx.codes.NOT_FOUND:

Check warning on line 112 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to False since the types "int" and "tuple[Literal[404], Literal['Not Found']]" have no overlap (reportUnnecessaryComparison)
raise ConnectorNotFoundException
raise KafkaConnectError(response)

Expand All @@ -120,7 +120,7 @@
:param connector_name: Name of the connector
"""
response = await self._client.put(f"/connectors/{connector_name}/pause")
if response.status_code != httpx.codes.ACCEPTED:

Check warning on line 123 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to True since the types "int" and "tuple[Literal[202], Literal['Accepted']]" have no overlap (reportUnnecessaryComparison)
raise KafkaConnectError(response)
log.info(f"Connector {connector_name} paused.")

Expand All @@ -131,7 +131,7 @@
:param connector_name: Name of the connector
"""
response = await self._client.put(f"/connectors/{connector_name}/resume")
if response.status_code != httpx.codes.ACCEPTED:

Check warning on line 134 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to True since the types "int" and "tuple[Literal[202], Literal['Accepted']]" have no overlap (reportUnnecessaryComparison)
raise KafkaConnectError(response)
log.info(f"Connector {connector_name} resumed.")

Expand All @@ -142,7 +142,7 @@
:param connector_name: Name of the connector
"""
response = await self._client.put(f"/connectors/{connector_name}/stop")
if response.status_code != httpx.codes.NO_CONTENT:

Check warning on line 145 in kpops/component_handlers/kafka_connect/connect_wrapper.py

View workflow job for this annotation

GitHub Actions / Lint (ubuntu-24.04, 3.11)

Condition will always evaluate to True since the types "int" and "tuple[Literal[204], Literal['No Content']]" have no overlap (reportUnnecessaryComparison)
raise KafkaConnectError(response)
log.info(f"Connector {connector_name} stopped.")

Expand Down Expand Up @@ -232,3 +232,19 @@
await asyncio.sleep(1)
return await self.delete_connector(connector_name)
raise KafkaConnectError(response)

async def reset_offset(self, connector_name: str) -> None:
"""Reset the offsets for a connector; the connector must exist, and must be in the STOPPED state.

API Reference:
https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-connector-offsets
:param connector_name: Configuration parameters for the connector.
:raises ConnectorNotFoundException: Connector not found
"""
response = await self._client.delete(f"/connectors/{connector_name}/offsets")
if response.status_code == httpx.codes.OK:
log.info(f"Connector {connector_name} offsets reset.")
return
if response.status_code == httpx.codes.NOT_FOUND:
raise ConnectorNotFoundException
raise KafkaConnectError(response)
18 changes: 18 additions & 0 deletions kpops/component_handlers/kafka_connect/kafka_connect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None
f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
)

async def reset_connector(self, connector_name: str, *, dry_run: bool) -> None:
"""Reset connector offsets.

:param connector_name: The connector name.
:param dry_run: Whether the connector reset should be run in dry run mode.
"""
if dry_run:
pass # TODO
else:
try:
await self._connect_wrapper.stop_connector(connector_name)
await self._connect_wrapper.reset_offset(connector_name)

except ConnectorNotFoundException:
log.warning(
f"Connector reset: the connector {connector_name} does not exist. Skipping."
)

async def __dry_run_connector_creation(
self,
connector_config: KafkaConnectorConfig,
Expand Down
Loading
Loading