264 changes: 140 additions & 124 deletions api/import_export/export.py
@@ -2,10 +2,14 @@
import json
import logging
import typing
from collections.abc import Iterator
from dataclasses import dataclass
from tempfile import TemporaryFile
from typing import TYPE_CHECKING

import boto3

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from django.core import serializers
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models import F, Model, Q
@@ -25,6 +29,7 @@
MultivariateFeatureStateValue,
)
from features.versioning.models import EnvironmentFeatureVersion
from import_export.utils import S3MultipartUploadWriter
from integrations.datadog.models import DataDogConfiguration
from integrations.heap.models import HeapConfiguration
from integrations.mixpanel.models import MixpanelConfiguration
@@ -53,8 +58,8 @@


class S3OrganisationExporter:
def __init__(self, s3_client=None): # type: ignore[no-untyped-def]
self.s3_client = s3_client or boto3.client("s3")
def __init__(self, s3_client: "S3Client | None" = None) -> None:
self.s3_client: "S3Client" = s3_client or boto3.client("s3")

def export_to_s3(
self,
@@ -63,43 +68,51 @@ def export_to_s3(
key: str,
) -> None:
data = full_export(organisation_id)
logger.debug("Got data export for organisation.")
logger.debug("Starting streaming export for organisation.")

file = TemporaryFile()
file.write(json.dumps(data, cls=DjangoJSONEncoder).encode("utf-8"))
file.seek(0)
logger.debug("Wrote data export to temporary file.")
with S3MultipartUploadWriter(self.s3_client, bucket_name, key) as writer:
writer.write(b"[")
first = True
for item in data:
if not first:
writer.write(b",")
first = False
writer.write(json.dumps(item, cls=DjangoJSONEncoder).encode("utf-8"))
writer.write(b"]")

self.s3_client.upload_fileobj(file, bucket_name, key)
logger.info("Finished writing data export to s3.")
logger.info("Finished streaming data export to S3.")


def full_export(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
return [
*export_organisation(organisation_id),
*export_projects(organisation_id),
*export_environments(organisation_id),
*export_identities(organisation_id),
*export_features(organisation_id),
*export_metadata(organisation_id),
*export_edge_identities(organisation_id),
]
def full_export(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
yield from export_organisation(organisation_id)
yield from export_projects(organisation_id)
yield from export_environments(organisation_id)
yield from export_identities(organisation_id)
yield from export_features(organisation_id)
yield from export_metadata(organisation_id)
yield from export_edge_identities(organisation_id)


def export_organisation(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
def export_organisation(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
"""
Serialize an organisation and all its related objects.
"""
return _export_entities(
yield from _export_entities(
_EntityExportConfig(Organisation, Q(id=organisation_id)),
_EntityExportConfig(InviteLink, Q(organisation__id=organisation_id)),
_EntityExportConfig(OrganisationWebhook, Q(organisation__id=organisation_id)),
_EntityExportConfig(Subscription, Q(organisation__id=organisation_id)),
)


def export_metadata(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
return _export_entities(
def export_metadata(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
yield from _export_entities(
_EntityExportConfig(MetadataField, Q(organisation__id=organisation_id)),
_EntityExportConfig(
MetadataModelField, Q(field__organisation__id=organisation_id)
@@ -116,56 +129,55 @@ def export_metadata(organisation_id: int) -> typing.List[dict]:

def export_projects(
organisation_id: int,
) -> typing.List[dict]: # type: ignore[type-arg]
) -> Iterator[dict[str, typing.Any]]:
default_filter = Q(project__organisation__id=organisation_id)

exported_projects = _export_entities(
for project in _export_entities(
_EntityExportConfig(Project, Q(organisation__id=organisation_id)),
)
for project in exported_projects:
):
project["fields"]["enable_dynamo_db"] = False
yield project

return [
*exported_projects,
*_export_entities(
_EntityExportConfig(
Segment,
Q(project__organisation__id=organisation_id, id=F("version_of")),
),
_EntityExportConfig(
SegmentRule,
Q(
segment__project__organisation__id=organisation_id,
segment_id=F("segment__version_of"),
)
| Q(
rule__segment__project__organisation__id=organisation_id,
rule__segment_id=F("rule__segment__version_of"),
),
yield from _export_entities(
_EntityExportConfig(
Segment,
Q(project__organisation__id=organisation_id, id=F("version_of")),
),
_EntityExportConfig(
SegmentRule,
Q(
segment__project__organisation__id=organisation_id,
segment_id=F("segment__version_of"),
)
| Q(
rule__segment__project__organisation__id=organisation_id,
rule__segment_id=F("rule__segment__version_of"),
),
_EntityExportConfig(
Condition,
Q(
rule__segment__project__organisation__id=organisation_id,
rule__segment_id=F("rule__segment__version_of"),
)
| Q(
rule__rule__segment__project__organisation__id=organisation_id,
rule__rule__segment_id=F("rule__rule__segment__version_of"),
),
),
_EntityExportConfig(
Condition,
Q(
rule__segment__project__organisation__id=organisation_id,
rule__segment_id=F("rule__segment__version_of"),
)
| Q(
rule__rule__segment__project__organisation__id=organisation_id,
rule__rule__segment_id=F("rule__rule__segment__version_of"),
),
_EntityExportConfig(Tag, default_filter),
_EntityExportConfig(DataDogConfiguration, default_filter),
_EntityExportConfig(NewRelicConfiguration, default_filter),
_EntityExportConfig(SlackConfiguration, default_filter),
),
]
_EntityExportConfig(Tag, default_filter),
_EntityExportConfig(DataDogConfiguration, default_filter),
_EntityExportConfig(NewRelicConfiguration, default_filter),
_EntityExportConfig(SlackConfiguration, default_filter),
)


def export_environments(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
def export_environments(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
default_filter = Q(environment__project__organisation__id=organisation_id)

return _export_entities(
yield from _export_entities(
_EntityExportConfig(Environment, Q(project__organisation__id=organisation_id)),
_EntityExportConfig(EnvironmentAPIKey, default_filter),
_EntityExportConfig(Webhook, default_filter),
@@ -178,18 +190,26 @@ def export_environments(organisation_id: int) -> typing.List[dict]:
)


def export_identities(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
traits = _export_entities(
_EntityExportConfig(
Trait,
Q(
identity__environment__project__organisation__id=organisation_id,
identity__environment__project__enable_dynamo_db=False,
def export_identities(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
# We export the traits first so that we take a 'snapshot' before exporting the
# identities, otherwise we end up with issues where new traits are created for new
# identities during the export process and the identity doesn't exist in the import.
# We then need to reverse the order so that the identities are imported first.
traits = list(
_export_entities(
_EntityExportConfig(
Trait,
Q(
identity__environment__project__organisation__id=organisation_id,
identity__environment__project__enable_dynamo_db=False,
),
),
),
)
)

identities = _export_entities(
yield from _export_entities(
_EntityExportConfig(
Identity,
Q(
@@ -199,36 +219,33 @@ def export_identities(organisation_id: int) -> typing.List[dict]:
),
)

# We export the traits first so that we take a 'snapshot' before exporting the
# identities, otherwise we end up with issues where new traits are created for new
# identities during the export process and the identity doesn't exist in the import.
# We then need to reverse the order so that the identities are imported first.
return [*identities, *traits]
yield from traits


def export_edge_identities(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
identities = []
traits = []
identity_overrides = []
def export_edge_identities(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
for environment in Environment.objects.filter(
project__organisation__id=organisation_id, project__enable_dynamo_db=True
):
exported_identities, exported_traits, exported_overrides = (
export_edge_identity_and_overrides(environment.api_key)
)
identities.extend(exported_identities)
traits.extend(exported_traits)
identity_overrides.extend(exported_overrides)

return [*identities, *traits, *identity_overrides]
yield from exported_identities
yield from exported_traits
yield from exported_overrides


def export_features(organisation_id: int) -> typing.List[dict]: # type: ignore[type-arg]
def export_features(
organisation_id: int,
) -> Iterator[dict[str, typing.Any]]:
"""
Export all features and related entities, except ChangeRequests.
"""

feature_states = []
# Buffer feature states because we need to modify them (remove change_request FK)
# and they need to be imported after Feature, EnvironmentFeatureVersion, etc.
feature_states: list[dict[str, typing.Any]] = []
for feature_state in _export_entities(
_EntityExportConfig(
FeatureState, Q(feature__project__organisation__id=organisation_id)
@@ -240,38 +257,39 @@ def export_features(organisation_id: int) -> typing.List[dict]:
feature_state["fields"]["change_request"] = None
feature_states.append(feature_state)

return (
_export_entities(
_EntityExportConfig(
Feature,
Q(project__organisation__id=organisation_id),
exclude_fields=["owners", "group_owners"],
),
_EntityExportConfig(
EnvironmentFeatureVersion,
Q(feature__project__organisation__id=organisation_id),
exclude_fields=["created_by", "published_by"],
),
_EntityExportConfig(
MultivariateFeatureOption,
Q(feature__project__organisation__id=organisation_id),
),
_EntityExportConfig(
FeatureSegment,
Q(feature__project__organisation__id=organisation_id),
),
)
+ feature_states # feature states need to be imported in correct order
+ _export_entities(
_EntityExportConfig(
FeatureStateValue,
Q(feature_state__feature__project__organisation__id=organisation_id),
),
_EntityExportConfig(
MultivariateFeatureStateValue,
Q(feature_state__feature__project__organisation__id=organisation_id),
),
)
yield from _export_entities(
_EntityExportConfig(
Feature,
Q(project__organisation__id=organisation_id),
exclude_fields=["owners", "group_owners"],
),
_EntityExportConfig(
EnvironmentFeatureVersion,
Q(feature__project__organisation__id=organisation_id),
exclude_fields=["created_by", "published_by"],
),
_EntityExportConfig(
MultivariateFeatureOption,
Q(feature__project__organisation__id=organisation_id),
),
_EntityExportConfig(
FeatureSegment,
Q(feature__project__organisation__id=organisation_id),
),
)

# Feature states need to be imported in correct order (after features)
yield from feature_states

yield from _export_entities(
_EntityExportConfig(
FeatureStateValue,
Q(feature_state__feature__project__organisation__id=organisation_id),
),
_EntityExportConfig(
MultivariateFeatureStateValue,
Q(feature_state__feature__project__organisation__id=organisation_id),
),
)


@@ -284,19 +302,17 @@ class _EntityExportConfig:

def _export_entities(
*export_configs: _EntityExportConfig,
) -> typing.List[dict]: # type: ignore[type-arg]
entities = []
) -> Iterator[dict[str, typing.Any]]:
for config in export_configs:
args = ("python", config.model_class.objects.filter(config.qs_filter))
kwargs = {}
kwargs: dict[str, typing.Any] = {}
if config.exclude_fields:
kwargs["fields"] = [
f.name
for f in config.model_class._meta.get_fields()
if f.name not in config.exclude_fields
]
entities.extend(_serialize_natural(*args, **kwargs)) # type: ignore[arg-type]
return entities
yield from _serialize_natural(*args, **kwargs)


_serialize_natural = functools.partial(
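The `S3MultipartUploadWriter` used above is imported from `import_export.utils`, which is not included in this diff. Below is a minimal sketch of how such a writer could be implemented; only the class name and the `write`/context-manager usage are taken from the code above, while everything else (part sizing, attribute names, abort-on-error behaviour) is an assumption. S3 requires every multipart part except the last to be at least 5 MiB, so writes are buffered until that threshold is reached.

```python
# Hypothetical sketch of import_export/utils.py -- not part of this PR.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for all parts except the last


class S3MultipartUploadWriter:
    def __init__(self, s3_client: "S3Client", bucket_name: str, key: str) -> None:
        self.s3_client = s3_client
        self.bucket_name = bucket_name
        self.key = key
        self.buffer = bytearray()
        self.parts: list[dict[str, object]] = []
        self.part_number = 1

    def __enter__(self) -> "S3MultipartUploadWriter":
        self.upload_id = self.s3_client.create_multipart_upload(
            Bucket=self.bucket_name, Key=self.key
        )["UploadId"]
        return self

    def write(self, data: bytes) -> None:
        # Buffer until a full part is accumulated; S3 rejects undersized parts.
        self.buffer.extend(data)
        if len(self.buffer) >= MIN_PART_SIZE:
            self._upload_part()

    def _upload_part(self) -> None:
        response = self.s3_client.upload_part(
            Bucket=self.bucket_name,
            Key=self.key,
            PartNumber=self.part_number,
            UploadId=self.upload_id,
            Body=bytes(self.buffer),
        )
        self.parts.append({"ETag": response["ETag"], "PartNumber": self.part_number})
        self.part_number += 1
        self.buffer.clear()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if exc_type is not None:
            # Abort so S3 doesn't retain (and bill for) orphaned parts.
            self.s3_client.abort_multipart_upload(
                Bucket=self.bucket_name, Key=self.key, UploadId=self.upload_id
            )
            return
        if self.buffer:
            self._upload_part()  # the final part may be smaller than MIN_PART_SIZE
        self.s3_client.complete_multipart_upload(
            Bucket=self.bucket_name,
            Key=self.key,
            UploadId=self.upload_id,
            MultipartUpload={"Parts": self.parts},
        )
```

The second file in the diff updates the management command that instantiates the exporter: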
@@ -10,7 +10,7 @@
class Command(BaseCommand):
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
super().__init__(*args, **kwargs)
self.exporter = S3OrganisationExporter() # type: ignore[no-untyped-call]
self.exporter = S3OrganisationExporter()

def add_arguments(self, parser: CommandParser): # type: ignore[no-untyped-def]
parser.add_argument(
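For completeness, a usage sketch of the exporter after this change. Keyword arguments are used because part of the `export_to_s3` signature is elided in the diff above; the names `organisation_id`, `bucket_name`, and `key` all appear in the visible code, but the bucket and key values here are placeholders:

```python
import boto3

from import_export.export import S3OrganisationExporter

# Placeholder bucket/key values -- substitute real ones.
exporter = S3OrganisationExporter(s3_client=boto3.client("s3"))
exporter.export_to_s3(
    organisation_id=1,
    bucket_name="example-export-bucket",
    key="exports/organisation-1.json",
)
```

Because `full_export` is now a generator, each entity is serialized and streamed to S3 as it is produced rather than accumulated into a single in-memory list, which keeps memory usage flat for large organisations.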