Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion invokeai/app/invocations/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
WithBoard,
WithMetadata,
)
from invokeai.app.invocations.primitives import ImageOutput
from invokeai.app.invocations.primitives import ImageOutput, StringOutput
from invokeai.app.services.image_records.image_records_common import ImageCategory
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.app.util.misc import SEED_MAX
Expand Down Expand Up @@ -581,6 +581,25 @@ def invoke(self, context: InvocationContext) -> ImageOutput:
return ImageOutput.build(image_dto)


@invocation(
"decode_watermark",
title="Decode Invisible Watermark",
tags=["image", "watermark"],
category="image",
version="1.0.0",
)
class DecodeInvisibleWatermarkInvocation(BaseInvocation):
"""Decode an invisible watermark from an image."""

image: ImageField = InputField(description="The image to decode the watermark from")
length: int = InputField(default=8, description="The expected watermark length in bytes")

def invoke(self, context: InvocationContext) -> StringOutput:
image = context.images.get_pil(self.image.image_name)
watermark = InvisibleWatermark.decode_watermark(image, self.length)
return StringOutput(value=watermark)


@invocation(
"mask_edge",
title="Mask Edge",
Expand Down
24 changes: 23 additions & 1 deletion invokeai/backend/image_util/invisible_watermark.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from PIL import Image

import invokeai.backend.util.logging as logger
from invokeai.backend.image_util.imwatermark.vendor import WatermarkEncoder
from invokeai.backend.image_util.imwatermark.vendor import WatermarkDecoder, WatermarkEncoder


class InvisibleWatermark:
Expand All @@ -25,3 +25,25 @@ def add_watermark(cls, image: Image.Image, watermark_text: str) -> Image.Image:
encoder.set_watermark("bytes", watermark_text.encode("utf-8"))
bgr_encoded = encoder.encode(bgr, "dwtDct")
return Image.fromarray(cv2.cvtColor(bgr_encoded, cv2.COLOR_BGR2RGB)).convert("RGBA")

@classmethod
def decode_watermark(cls, image: Image.Image, length: int = 8) -> str:
"""Attempt to decode an invisible watermark from an image.

Args:
image: The PIL Image to decode the watermark from.
length: The expected watermark length in bytes. Must match the length used when encoding.
The WatermarkDecoder requires the length in bits; this value is multiplied by 8 internally.

Returns:
The decoded watermark text, or an empty string if no watermark is detected or decoding fails.
"""
logger.debug("Attempting to decode invisible watermark")
try:
bgr = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
decoder = WatermarkDecoder("bytes", length * 8)
watermark_bytes = decoder.decode(bgr, "dwtDct")
return watermark_bytes.decode("utf-8", errors="ignore").rstrip("\x00")
except Exception:
logger.debug("Failed to decode invisible watermark")
return ""
Loading
Loading