-
-
Notifications
You must be signed in to change notification settings - Fork 188
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
308 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
# SPDX-License-Identifier: LGPL-2.1-or-later | ||
# | ||
# This file is formatted with Python Black | ||
|
||
from tests.templates import init_template_logger | ||
import dbus.service | ||
import dbus | ||
import tempfile | ||
|
||
from gi.repository import GLib | ||
|
||
BUS_NAME = "org.freedesktop.impl.portal.Test" | ||
MAIN_OBJ = "/org/freedesktop/portal/desktop" | ||
SYSTEM_BUS = False | ||
MAIN_IFACE = "org.freedesktop.impl.portal.Clipboard" | ||
VERSION = 1 | ||
|
||
logger = init_template_logger(__name__) | ||
|
||
|
||
def load(mock, parameters=None): | ||
logger.debug(f"Loading parameters: {parameters}") | ||
|
||
mock.delay: int = parameters.get("delay", 200) | ||
mock.response: int = parameters.get("response", 0) | ||
mock.expect_close: bool = parameters.get("expect-close", False) | ||
|
||
mock.AddProperties( | ||
MAIN_IFACE, | ||
dbus.Dictionary( | ||
{ | ||
"version": dbus.UInt32(parameters.get("version", VERSION)), | ||
} | ||
), | ||
) | ||
|
||
|
||
@dbus.service.method( | ||
MAIN_IFACE, | ||
in_signature="oa{sv}", | ||
out_signature="", | ||
async_callbacks=("cb_success", "cb_error"), | ||
) | ||
def RequestClipboard(self, session_handle, options, cb_success, cb_error): | ||
try: | ||
logger.debug(f"RequestClipboard({session_handle}, {options})") | ||
|
||
if self.expect_close: | ||
cb_success() | ||
else: | ||
logger.debug(f"scheduling delay of {self.delay}") | ||
GLib.timeout_add(self.delay, cb_success) | ||
except Exception as e: | ||
logger.critical(e) | ||
cb_error(e) | ||
|
||
|
||
@dbus.service.method( | ||
MAIN_IFACE, | ||
in_signature="oa{sv}", | ||
out_signature="", | ||
async_callbacks=("cb_success", "cb_error"), | ||
) | ||
def SetSelection(self, session_handle, options, cb_success, cb_error): | ||
try: | ||
logger.debug(f"SetSelection({session_handle}, {options})") | ||
|
||
if self.expect_close: | ||
cb_success() | ||
else: | ||
logger.debug(f"scheduling delay of {self.delay}") | ||
GLib.timeout_add(self.delay, cb_success) | ||
except Exception as e: | ||
logger.critical(e) | ||
cb_error(e) | ||
|
||
|
||
@dbus.service.method( | ||
MAIN_IFACE, | ||
in_signature="ou", | ||
out_signature="h", | ||
async_callbacks=("cb_success", "cb_error"), | ||
) | ||
def SelectionWrite(self, session_handle, serial, cb_success, cb_error): | ||
try: | ||
logger.debug(f"SelectionWrite({session_handle}, {serial})") | ||
|
||
temp_file = tempfile.TemporaryFile() | ||
fd = dbus.types.UnixFd(temp_file.fileno()) | ||
|
||
if self.expect_close: | ||
cb_success(fd) | ||
else: | ||
|
||
def reply(): | ||
cb_success(fd) | ||
|
||
logger.debug(f"scheduling delay of {self.delay}") | ||
GLib.timeout_add(self.delay, reply) | ||
except Exception as e: | ||
logger.critical(e) | ||
cb_error(e) | ||
|
||
|
||
@dbus.service.method( | ||
MAIN_IFACE, | ||
in_signature="oub", | ||
out_signature="", | ||
async_callbacks=("cb_success", "cb_error"), | ||
) | ||
def SelectionWriteDone(self, session_handle, serial, success, cb_success, cb_error): | ||
try: | ||
logger.debug(f"SelectionWriteDone({session_handle}, {serial}, {success})") | ||
|
||
if self.expect_close: | ||
cb_success() | ||
else: | ||
logger.debug(f"scheduling delay of {self.delay}") | ||
GLib.timeout_add(self.delay, cb_success) | ||
except Exception as e: | ||
logger.critical(e) | ||
cb_error(e) | ||
|
||
|
||
@dbus.service.method( | ||
MAIN_IFACE, | ||
in_signature="os", | ||
out_signature="h", | ||
async_callbacks=("cb_success", "cb_error"), | ||
) | ||
def SelectionRead(self, session_handle, mime_type, cb_success, cb_error): | ||
try: | ||
logger.debug(f"SelectionRead({session_handle}, {mime_type})") | ||
|
||
temp_file = tempfile.TemporaryFile() | ||
fd = dbus.types.UnixFd(temp_file.fileno()) | ||
|
||
if self.expect_close: | ||
cb_success(fd) | ||
else: | ||
|
||
def reply(): | ||
cb_success(fd) | ||
|
||
logger.debug(f"scheduling delay of {self.delay}") | ||
GLib.timeout_add(self.delay, reply) | ||
except Exception as e: | ||
logger.critical(e) | ||
cb_error(e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
# SPDX-License-Identifier: LGPL-2.1-or-later | ||
# | ||
# This file is formatted with Python Black | ||
|
||
from logging import debug | ||
from tests import Request, PortalTest, Session | ||
from gi.repository import GLib | ||
import dbus | ||
import os | ||
|
||
|
||
class TestClipboard(PortalTest): | ||
def test_version(self): | ||
self.check_version(1) | ||
|
||
def start_session(self, params={}): | ||
self.start_impl_portal(params=params) | ||
self.add_template(portal="RemoteDesktop", params=params) | ||
self.start_xdp() | ||
|
||
remote_desktop_interface = self.get_dbus_interface("RemoteDesktop") | ||
clipboard_interface = self.get_dbus_interface() | ||
|
||
create_session_request = Request(self.dbus_con, remote_desktop_interface) | ||
create_session_response = create_session_request.call( | ||
"CreateSession", options={"session_handle_token": "1234"} | ||
) | ||
assert create_session_response.response == 0 | ||
assert str(create_session_response.results["session_handle"]) | ||
|
||
session = Session.from_response(self.dbus_con, create_session_response) | ||
|
||
clipboard_interface.RequestClipboard(session.handle, {}) | ||
|
||
start_session_request = Request(self.dbus_con, remote_desktop_interface) | ||
start_session_response = start_session_request.call( | ||
"Start", session_handle=session.handle, parent_window="", options={} | ||
) | ||
|
||
assert start_session_response.response == 0 | ||
|
||
return (session, start_session_response.results.get("clipboard_enabled")) | ||
|
||
def test_request_clipboard_and_start_session(self): | ||
params = {"force-clipboard-enabled": True} | ||
_, clipboard_enabled = self.start_session(params) | ||
|
||
assert clipboard_enabled | ||
|
||
def test_clipboard_checks_clipboard_enabled(self): | ||
session, clipboard_enabled = self.start_session() | ||
clipboard_interface = self.get_dbus_interface() | ||
|
||
self.assertFalse(clipboard_enabled) | ||
|
||
with self.assertRaises(dbus.exceptions.DBusException): | ||
clipboard_interface.SetSelection(session.handle, {}) | ||
|
||
def test_clipboard_set_selection(self): | ||
params = {"force-clipboard-enabled": True} | ||
session, _ = self.start_session(params) | ||
clipboard_interface = self.get_dbus_interface() | ||
|
||
clipboard_interface.SetSelection(session.handle, {}) | ||
|
||
def test_clipboard_selection_write(self): | ||
params = {"force-clipboard-enabled": True} | ||
session, _ = self.start_session(params) | ||
clipboard_interface = self.get_dbus_interface() | ||
|
||
fd_object: dbus.types.UnixFd = clipboard_interface.SelectionWrite( | ||
session.handle, 1234 | ||
) | ||
assert fd_object | ||
|
||
fd = fd_object.take() | ||
assert fd | ||
|
||
bytes_written = os.write(fd, b"Clipboard") | ||
assert bytes_written > 0 | ||
|
||
clipboard_interface.SelectionWriteDone(session.handle, 1234, True) | ||
|
||
def test_clipboard_selection_read(self): | ||
params = {"force-clipboard-enabled": True} | ||
session, _ = self.start_session(params) | ||
clipboard_interface = self.get_dbus_interface() | ||
|
||
fd_object: dbus.types.UnixFd = clipboard_interface.SelectionRead( | ||
session.handle, "mimetype" | ||
) | ||
assert fd_object | ||
|
||
fd = fd_object.take() | ||
assert fd | ||
|
||
clipboard = os.read(fd, 1000) | ||
assert str(clipboard) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters