Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ClipCascade_Desktop/src/clipboard/clipboard_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,11 @@ def paste(self, payload: any, payload_type: str = "text"):
pb_image = pasteboard.Pasteboard()
pb_image.set_contents(tiff_data, pasteboard.TIFF)
elif PLATFORM.startswith(LINUX):
# Save the image to a binary buffer in PNG format
with io.BytesIO() as output:
payload.convert("RGB").save(output, format="PNG")
png_data = output.getvalue()

clipboard_monitor.enable_block_image_once() # Block image copy to prevent deadlock
clipboard_monitor.enable_block_image_once()
if XMODE and self.is_x_clipboard_owner:
ClipboardManager.execute_command(
"xclip",
Expand Down
244 changes: 236 additions & 8 deletions ClipCascade_Desktop/src/clipboard/clipboard_monitor_linux.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import logging
import os
import re
import subprocess
import threading
import time

from core.constants import *

try:
from pywayland.protocol.ext_data_control_v1.ext_data_control_manager_v1 import (
ExtDataControlManagerV1,
)
from pywayland.protocol.wayland.wl_seat import WlSeat
except (ImportError, ModuleNotFoundError) as e:
logging.warning(
f"pywayland protocol bindings not available: {e}. Falling back to polling mode."
)
ExtDataControlManagerV1 = None
WlSeat = None

_callback_update = None
_clipboard_thread = None

Expand All @@ -15,6 +28,199 @@
_run_poll = threading.Event()


class _WaylandClipboardMonitor:
def __init__(
self,
enable_image_monitoring=False,
enable_file_monitoring=False,
):
self.enable_image_monitoring = enable_image_monitoring
self.enable_file_monitoring = enable_file_monitoring
self.display = None
self.data_control_manager = None
self.data_device = None
self.seat = None
self.current_offer = None
self.current_offer_mime_types = []
self.running = False
self.previous_clipboard = None

def _handle_data_offer(self, offer):
self.current_offer_mime_types = []

def on_offer(offer_obj, mime_type):
self.current_offer_mime_types.append(mime_type)

offer.dispatcher["offer"] = on_offer

def _handle_selection(self, offer):
global _block_image_once

if self.current_offer is not None:
try:
self.current_offer.destroy()
except Exception:
pass

self.current_offer = offer

if offer is None:
return

mime_types = self.current_offer_mime_types.copy()
type_ = convert_mime_to_generic_type(mime_types)

if type_ == "text":
data = self._receive_data(offer, "text/plain;charset=utf-8")
if data is None:
data = self._receive_data(offer, "text/plain")
if data:
text = data.decode("utf-8")
if text and text != self.previous_clipboard:
self.previous_clipboard = text
if _callback_update:
_callback_update("text", text)

elif type_ == "image" and self.enable_image_monitoring:
png_mime = next((m for m in mime_types if m.startswith("image/png")), None)
if png_mime:
data = self._receive_data(offer, png_mime)
else:
data = self._receive_data(offer, mime_types[0])

if data and data != self.previous_clipboard:
self.previous_clipboard = data
if _callback_update and not _block_image_once:
_callback_update("image", data)
else:
_block_image_once = False

elif type_ == "files" and self.enable_file_monitoring:
data = self._receive_data(offer, "text/uri-list")
if data:
files = data.decode("utf-8")
files = files.replace("\r\n", "\n").replace("\r", "\n").split("\n")
files = [f.strip() for f in files if f.strip()]
if files and files != self.previous_clipboard:
self.previous_clipboard = files
if _callback_update:
_callback_update("files", files)

def _receive_data(self, offer, mime_type):
read_fd, write_fd = os.pipe()
try:
offer.receive(mime_type, write_fd)
os.close(write_fd)
self.display.roundtrip()

data = b""
while True:
chunk = os.read(read_fd, 4096)
if not chunk:
break
data += chunk
return data if data else None
except Exception as e:
logging.error(f"Failed to receive clipboard data: {e}")
return None
finally:
try:
os.close(read_fd)
except Exception:
pass

def start(self):
try:
from pywayland.client import Display

self.display = Display()
self.display.connect()

registry = self.display.get_registry()

def on_global(reg, name, interface, version):
if interface == "ext_data_control_manager_v1":
self.data_control_manager = reg.bind(
name, ExtDataControlManagerV1, version
)
elif interface == "wl_seat":
self.seat = reg.bind(name, WlSeat, version)

registry.dispatcher["global"] = on_global

self.display.roundtrip()

if self.data_control_manager is None:
logging.error(
"ext_data_control_v1 protocol not available. "
"Falling back to polling mode."
)
self.stop()
return False

if self.seat is None:
logging.error("No wl_seat available. Falling back to polling mode.")
self.stop()
return False

self.data_device = self.data_control_manager.get_data_device(self.seat)

def on_data_offer(device, offer_id):
self._handle_data_offer(offer_id)

def on_selection(device, offer):
self._handle_selection(offer)

def on_primary_selection(device, offer):
pass

self.data_device.dispatcher["data_offer"] = on_data_offer
self.data_device.dispatcher["selection"] = on_selection
self.data_device.dispatcher["primary_selection"] = on_primary_selection

self.running = True
self.display.roundtrip()

while self.running and _run_poll.is_set():
self.display.dispatch(block=True)

return True
except Exception as e:
logging.error(f"Failed to start Wayland clipboard monitor: {e}")
self.stop()
return False

def stop(self):
self.running = False
if self.current_offer is not None:
try:
self.current_offer.destroy()
except Exception:
pass
self.current_offer = None
if self.data_device is not None:
try:
self.data_device.destroy()
except Exception:
pass
self.data_device = None
if self.data_control_manager is not None:
try:
self.data_control_manager.destroy()
except Exception:
pass
self.data_control_manager = None
if self.display is not None:
try:
self.display.disconnect()
except Exception:
pass
self.display = None


_wayland_monitor = None


def _on_clipboard_changed(
clipboard, event=None, enable_image_monitoring=False, enable_file_monitoring=False
):
Expand Down Expand Up @@ -58,7 +264,7 @@ def _monitor_x_wl_clipboard(
):
global _block_image_once
last_error = None
previous_clipboard: str | bytes | None = None
previous_clipboard: str | bytes | list | None = None
ignore_patterns = [
r"target .+ not available", # xclip pattern
r"no suitable type of content copied", # wl-clipboard pattern
Expand Down Expand Up @@ -250,16 +456,29 @@ def _start_clipboard_polling(enable_image_monitoring, enable_file_monitoring):


def _runner(enable_image_monitoring=False, enable_file_monitoring=False):
global _is_gdk_running, _run_poll
global _is_gdk_running, _run_poll, _wayland_monitor
_run_poll.set()

if EXT_DATA_CONTROL_SUPPORT:
_wayland_monitor = _WaylandClipboardMonitor(
enable_image_monitoring=enable_image_monitoring,
enable_file_monitoring=enable_file_monitoring,
)
if _wayland_monitor.start():
return
_wayland_monitor = None
logging.warning(
"ext_data_control_v1 unavailable. Falling back to polling mode."
)

try:
_run_poll.set()
import gi

gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gtk, Gdk
from gi.repository import Gdk, Gtk

if "x11" in str(type(Gdk.Display.get_default())).lower(): # X11
if "x11" in str(type(Gdk.Display.get_default())).lower():
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.connect(
"owner-change",
Expand Down Expand Up @@ -293,9 +512,18 @@ def _start(enable_image_monitoring=False, enable_file_monitoring=False):


def stop():
global _clipboard_thread, _callback_update, _block_image_once, _run_poll, _is_gdk_running
global \
_clipboard_thread, \
_callback_update, \
_block_image_once, \
_run_poll, \
_is_gdk_running, \
_wayland_monitor
if _clipboard_thread:
if _is_gdk_running:
if _wayland_monitor is not None:
_wayland_monitor.stop()
_wayland_monitor = None
elif _is_gdk_running:
import gi

gi.require_version("Gtk", "3.0")
Expand All @@ -304,7 +532,7 @@ def stop():
Gtk.main_quit()
_is_gdk_running = False
_run_poll.clear()
_clipboard_thread.join() # Wait for the thread to finish
_clipboard_thread.join()
_clipboard_thread = None
_callback_update = None
_block_image_once = False
Expand Down
36 changes: 28 additions & 8 deletions ClipCascade_Desktop/src/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ def detect_linux_display_server():
wayland_display = os.environ.get("WAYLAND_DISPLAY")
x_display = os.environ.get("DISPLAY")

# Priority detection order: X11 > XWayland > Hyprland > Wayland > Unknown

if session_type == "x11":
return "X11"

Expand All @@ -55,17 +53,39 @@ def detect_linux_display_server():
return "Unknown"


def check_ext_data_control_support():
if os.environ.get("XDG_SESSION_TYPE", "").lower().strip() != "wayland":
return False

try:
from pywayland.client import Display

display = Display()
if display:
display.disconnect()
return True
except Exception:
pass
return False


PLATFORM = get_os_and_display_server()

XMODE = False
EXT_DATA_CONTROL_SUPPORT = False

if PLATFORM.startswith(LINUX):
if (
detect_linux_display_server() == "X11"
or detect_linux_display_server() == "XWayland"
or detect_linux_display_server() == "Unknown"
):
display_server = detect_linux_display_server()
if display_server == "X11":
XMODE = True
else:
elif display_server == "XWayland":
XMODE = True
EXT_DATA_CONTROL_SUPPORT = check_ext_data_control_support()
elif display_server in ("Wayland", "Hyprland"):
XMODE = False
EXT_DATA_CONTROL_SUPPORT = check_ext_data_control_support()
else:
XMODE = True


# App version
Expand Down
Loading