diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 494fac0..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "justMyCode": false - }, - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "justMyCode": false, - "args": [ - "--package", - "usb_cam", - "--name", - "--all", - "--node", - "--model-path", - "/home/divya/ros2_ws/runner_op", - "--ws", - "/home/divya/ros2_ws", - "--repo", - "https://github.com/ros-drivers/usb_cam.git -b ros2", - "--path-to-src", - //"~/ros2_ws/src/ros_tutorials/turtlesim" - "~/ros2_ws/src/usb_cam" - ] - } - ] -} \ No newline at end of file diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__init__.py b/rosHumble_model_extractor_scripts/ros_metamodels/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index ab21f3a..0000000 Binary files a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc deleted file mode 100644 index 1e9ac92..0000000 Binary files a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc and /dev/null differ diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py b/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py deleted file mode 100644 index 21b6f86..0000000 --- a/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py +++ /dev/null @@ -1,324 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2021 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -## ROS MODEL METAMODEL ## - -class RosModel(object): - def __init__(self): - self.packages = list() - - def add_package(self, package): - self.packages.append(package) - - def dump_xtext_model(self): - ros_model_str = "PackageSet {\n" - for package in self.packages: - ros_model_str += package.dump_xtext_model() + ",\n" - ros_model_str = ros_model_str[:-2] - ros_model_str += "}" - return ros_model_str - -class Package(object): - def __init__(self, name, repo=None, pkg_type="CatkinPackage"): - self.name = name - self.artifacts=ArtifactSet() - self.repo = GitRepo(repo) - if pkg_type not in ["CatkinPackage","AmentPackage","Package"]: - print("\n ERROR: Invalid package type given, supported types are CatkinPackage(for ROS1), AmentPackage(for ROS2) or Package(for non-ROS packages)\n") - return - else: - self.pkg_type = pkg_type - - def add_artifact(self, artifact): - self.artifacts.add(artifact) - - def add_repo(self, repo): - self.repo = repo - - def dump_xtext_model(self): - ros_model_str = " "+self.pkg_type+" "+self.name+" {\n" - ros_model_str += self.repo.dump_xtext_model() - ros_model_str += self.artifacts.dump_xtext_model() - ros_model_str += "}" - return ros_model_str - -class ArtifactSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def dump_xtext_model(self): - if len(self) == 0: - return "" - str_ = "" - for elem in self: - str_ += elem.dump_xtext_model() + ",\n" - str_ = str_[:-2] - return str_ - -class Artifact(object): - def __init__(self, name, node): - self.name = name - self.node=node - - def dump_xtext_model(self): - ros_model_str = " Artifact "+self.name+" {\n" - ros_model_str += self.node.dump_xtext_model() - ros_model_str += "}" - return ros_model_str - -class GitRepo(object): - def __init__(self, repo): - self.repo=repo - - def dump_xtext_model(self): - if (self.repo != None): - ros_model_str = ' FromGitRepo "'+self.repo+'" \n' - return ros_model_str - else: - return "" - -class Node(object): - def __init__(self, name): - self.name = name - self.action_clients = InterfaceSet() - self.action_servers = InterfaceSet() - self.publishers = InterfaceSet() - self.subscribers = InterfaceSet() - self.service_clients = InterfaceSet() - self.service_servers = InterfaceSet() - self.params = ParameterSet() - - def add_publisher(self, name, topic_type): - self.publishers.add(Interface(name,topic_type)) - def add_subscriber(self, name, topic_type): - self.subscribers.add(Interface(name, topic_type)) - - def add_service_server(self, name, srv_type): - self.service_servers.add(Interface(name, srv_type)) - def add_service_client(self, name, srv_type): - self.service_clients.add(Interface(name, srv_type)) - - def add_action_client(self, name, act_type): - self.action_clients.add(Interface(name, act_type)) - def add_action_server(self, name, act_type): - self.action_servers.add(Interface(name, act_type)) - - def add_parameter(self, name, value, type, default, set_value=True): - self.params.add(Parameter(name, value, type, default, print_value=set_value)) - - def dump_xtext_model(self): - ros_model_str = " Node { name " + self.name - ros_model_str += self.service_servers.dump_xtext_model( - " ", "ServiceServer", "service", "ServiceServers") - ros_model_str += self.service_clients.dump_xtext_model( - " ", "ServiceClient", "service", "ServiceClients") - ros_model_str += self.publishers.dump_xtext_model( - " ", "Publisher", "message", "Publishers") - ros_model_str += self.subscribers.dump_xtext_model( - " ", "Subscriber", "message", "Subscribers") - ros_model_str += self.action_servers.dump_xtext_model( - " ", "ActionServer", "action", "ActionServers") - ros_model_str += self.action_clients.dump_xtext_model( - " ", "ActionClient", "action", "ActionClients") - ros_model_str += self.params.dump_xtext_model( - " ", "Parameters", "Parameters") - ros_model_str += "}\n" - return ros_model_str - -class Interface(object): - def __init__(self, name, type, namespace=""): - self.fullname = name - self.namespace = namespace - self.name = name[len(self.namespace)-1:] - self.type = type - - def dump_xtext_model(self, indent, name_type, interface_type): - return ("%s%s { name '%s' %s '%s'}") % ( - indent, name_type, self.fullname, interface_type, self.type.replace("/", ".")) - -class Parameter(object): - def __init__(self, name, value=None, type=None, default=None, namespace="", print_value=True): - self.fullname = name - self.namespace = namespace - self.name = name[len(self.namespace)-1:] - self.value = value - self.default = default - self.type = self.get_type(value, default, type) - self.count = 0 - self.print_value = print_value - - def get_type(self, value, default=None, given_type=None): - if given_type != None: - return given_type - elif value!=None: - return self.get_type_from_value(value) - elif default!=None: - return self.get_type_from_value(default) - else: - return '' - - def get_type_from_value(self, value): - param_type = type(value) - param_type = (str(param_type)).replace("<", "").replace("class", "").replace("type", "").replace(" '", "").replace("'>", "") - if param_type == 'float': - return 'Double' - elif param_type == 'bool': - return 'Boolean' - elif param_type == 'int': - return 'Integer' - elif param_type == 'str': - return 'String' - elif param_type == 'list': - if type(value[0]) == dict: - return 'Struc' - else: - return 'List' - elif param_type == 'dict': - return 'Struc' - else: - return param_type - - def set_value(self, value, indent): - str_param_value = "" - if self.type == "String": - str_param_value += "'"+value+"'" - elif self.type == "Boolean": - str_param_value += str(value).lower() - elif self.type == "List": - str_param_value += str(value).replace( - "[", "{").replace("]", "}") - elif self.type == 'Struc': - str_param_value += self.value_struct(value[0], indent+" ") - else: - str_param_value += str(value) - return str_param_value - - def types_struct(self, struct_dict, indent): - str_param = "{\n" - indent_new = indent+" " - for struct_element in struct_dict: - sub_name = struct_element - sub_value = struct_dict[struct_element] - sub_type = self.get_type(sub_value) - str_param += "%s'%s' %s" % (indent_new, sub_name, sub_type) - if sub_type == 'List': - str_param += self.form_list(sub_value) - if isinstance(sub_value, dict): - str_param += self.types_struct( - struct_dict[struct_element], indent_new) - str_param += ",\n" - str_param = str_param[:-2] - str_param += "}" - indent_new = "" - return str_param - - def value_struct(self, struct_dict, indent): - str_param = "{\n" - indent_new = indent+" " - for struct_element in struct_dict: - sub_name = struct_element - sub_value = struct_dict[struct_element] - sub_type = self.get_type(sub_value) - str_param += "%s{ '%s' { value " % (indent_new, sub_name) - if sub_type == "String": - sub_value = "'"+sub_value+"'" - if sub_type == 'List': - sub_value = str(sub_value).replace( - "[", "{").replace("]", "}").replace("{{", "{").replace("}}", "}") - if sub_type == "Boolean": - sub_value = str(sub_value).lower() - if isinstance(sub_value, dict): - str_param += self.value_struct( - struct_dict[struct_element], indent_new) - self.count = self.count + 1 - else: - str_param += "%s}}" % (sub_value) - str_param += ",\n" - str_param = str_param[:-2] - str_param += "}" - if self.count == 1: - str_param += "}}" - self.count = self.count - 1 - indent_new = "" - return str_param - - def form_list(self, value_in): - str_param = "{" - for i in value_in: - str_param += self.get_type(i) - if self.get_type(i) == "List": - str_param += self.form_list(i) - str_param += "," - str_param = str_param[:-1] - str_param += "}" - return str_param - - def get_dict(self): - return {"Value": self.value, "Fullname": self.fullname, - "Namespace": self.namespace, "Name": self.name} - - def dump_xtext_model(self, indent="", value=""): - str_param = "%sParameter { name '%s' type %s " % ( - indent, self.fullname, self.type) - if self.type == 'Struct': - str_param += self.types_struct(self.value[0], indent) - #str_param = str_param[:-2] - if self.type == 'List': - if self.value: - str_param += self.form_list(self.value) - elif self.default: - str_param += self.form_list(self.default) - if self.default and self.print_value: - str_param += ' default ' + self.set_value(self.default, indent) - if self.value and self.print_value: - str_param += 'value ' + self.set_value(self.value, indent) - str_param += "}" - return str_param - -class InterfaceSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def dump_xtext_model(self, indent="", name_type="", interface_type="", name_block=""): - if len(self) == 0: - return "" - str_ = ("\n%s%s {\n") % (indent, name_block) - for elem in self: - str_ += elem.dump_xtext_model(indent+" ", name_type, interface_type) + ",\n" - str_ = str_[:-2] - str_ += "}" - return str_ - -class ParameterSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def iteritems(self): - return [(x.fullname, x.type) for x in self] - - def iterkeys(self): - return [x.fullname for x in self] - - def dump_xtext_model(self, indent="", value="", name_block=""): - if len(self) == 0: - return "" - str_ = ("\n%s%s {\n") % (indent, name_block) - for elem in self: - str_ += elem.dump_xtext_model(indent+" ", value) + ",\n" - str_ = str_[:-2] - str_ += "}" - return str_ - diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py b/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py deleted file mode 100644 index 41d38ae..0000000 --- a/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py +++ /dev/null @@ -1,219 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ros_metamodels.ros_metamodel_core import ParameterSet, Parameter - -## ROS SYSTEM METAMODEL ## -class RosSystem(object): - def __init__(self, name, package): - self.name = name - self.package = package - self.components = ComponentSet() - self.params = ParameterSet() - - def dump_xtext_model(self): - system_model_str = "RosSystem { Name '%s'\n" % self.name - system_model_str += " RosComponents ( \n" - system_model_str += self.components.dump_xtext_model(self.package) - system_model_str = system_model_str[:-2] - system_model_str += "}\n)" - system_model_str += self.params.dump_xtext_model( - " ", "Parameters", "Parameters") - system_model_str += "\n}" - - return system_model_str - - -class Component(object): - def __init__(self, name): - self.name = name - self.action_clients = RosInterfaceSet() - self.action_servers = RosInterfaceSet() - self.publishers = RosInterfaceSet() - self.subscribers = RosInterfaceSet() - self.service_clients = RosInterfaceSet() - self.service_servers = RosInterfaceSet() - self.params = RosParameterSet() - - def dump_xtext_model(self, package=""): - system_model_str = " ComponentInterface { name '" + \ - self.name+"'\n" - system_model_str += self.publishers.dump_xtext_model( - " ", "Publishers", "Publisher", self.name, package) - system_model_str += self.subscribers.dump_xtext_model( - " ", "Subscribers", "Subscriber", self.name, package) - system_model_str += self.service_servers.dump_xtext_model( - " ", "SrvServers", "Server", self.name, package, "ServiceServer") - system_model_str += self.action_servers.dump_xtext_model( - " ", "ActionServers", "Server", self.name, package, "ActionServer") - system_model_str += self.action_clients.dump_xtext_model( - " ", "ActionClients", "Client", self.name, package, "ActionClient") - system_model_str += self.params.dump_xtext_model( - " ", "Parameters", "Parameter", self.name, package) - system_model_str += " },\n" - return system_model_str - -class RosInterface(object): - def __init__(self, name, reference, namespace=""): - self.resolved = name - self.namespace = namespace - self.minimal = name[len(self.namespace)-1:] - self.reference = reference - - def dump_xtext_model(self, indent, name_type, interface_type): - return ("%s%s { name '%s' %s '%s'}") % ( - indent, name_type, self.resolved, interface_type, self.reference.replace("/", ".")) - -class RosParameter(object): - def __init__(self, name, reference, value, namespace=""): - self.resolved = name - self.namespace = namespace - self.minimal = name[len(self.namespace)-1:] - self.value = value - self.reference = reference - self.count = 0 - - def get_type(self, value): - itype = type(value) - itype = (str(itype)).replace("<", "").replace("class", "").replace("type", "").replace(" '", "").replace("'>", "") - if itype == 'float': - return 'Double' - elif itype == 'bool': - return 'Boolean' - elif itype == 'int': - return 'Integer' - elif itype == 'str': - return 'String' - elif itype == 'list': - if type(value[0]) == dict: - return 'Struc' - else: - return 'List' - elif itype == 'dict': - return 'Struc' - else: - return itype - - def set_value(self, value, indent): - itype = self.get_type(value) - str_param_value = "" - if itype == "String": - str_param_value += "'"+self.value+"'" - elif itype == "Boolean": - str_param_value += str(self.value).lower() - elif itype == "List": - str_param_value += str(self.value).replace( - "[", "{").replace("]", "}") - elif itype == 'Struc': - str_param_value += self.value_struct(self.value[0], indent+" ") - else: - str_param_value += str(value) - return str_param_value - - def value_struct(self, struct_dict, indent): - str_param = "{\n" - indent_new = indent+" " - for struct_element in struct_dict: - sub_name = struct_element - sub_value = struct_dict[struct_element] - sub_type = self.get_type(sub_value) - str_param += "%s{ '%s' { value " % (indent_new, sub_name) - if sub_type == "String": - sub_value = "'"+sub_value+"'" - if sub_type == 'List': - sub_value = str(sub_value).replace( - "[", "{").replace("]", "}").replace("{{", "{").replace("}}", "}") - if sub_type == "Boolean": - sub_value = str(sub_value).lower() - if isinstance(sub_value, dict): - str_param += self.value_struct( - struct_dict[struct_element], indent_new) - self.count = self.count + 1 - else: - str_param += "%s}}" % (sub_value) - str_param += ",\n" - str_param = str_param[:-2] - str_param += "}" - if self.count == 1: - str_param += "}}" - self.count = self.count - 1 - indent_new = "" - return str_param - -class ComponentSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def iteritems(self): - return [(x.resolved, x.itype) for x in self] - - def iterkeys(self): - return [x.resolved for x in self] - - def dump_xtext_model(self, package="", indent="", value="", name_block=""): - if len(self) == 0: - return "" - str_ = ("\n%s%s") % (indent, name_block) - for elem in self: - str_ += elem.dump_xtext_model(package) + "\n" - str_ = str_[:-2] - return str_ - -class RosInterfaceSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def iteritems(self): - return [(x.resolved, x.reference) for x in self] - - def iterkeys(self): - return [x.resolved for x in self] - - def dump_xtext_model(self, indent="", name_type="", name_type2="", node_name="", pkg_name="", name_type3=""): - if len(self) == 0: - return "" - if not name_type3: - name_type3 = name_type2 - str_ = ("%sRos%s {\n") % (indent, name_type) - for elem in self: - str_ += ("%s Ros%s '%s' {Ref%s '%s.%s.%s.%s'},\n") % ( - indent, name_type3, elem.resolved, name_type2, pkg_name, node_name, node_name, elem.resolved) - str_ = str_[:-2] - str_ += "}\n" - return str_ - -class RosParameterSet(set): - def get_list(self): - return [x.get_dict() for x in self] - - def iteritems(self): - return [(x.resolved, x.reference) for x in self] - - def iterkeys(self): - return [x.resolved for x in self] - - def dump_xtext_model(self, indent="", name_type="", name_type2="", node_name="", pkg_name="", name_type3=""): - if len(self) == 0: - return "" - if not name_type3: - name_type3 = name_type2 - str_ = ("%sRos%s {\n") % (indent, name_type) - for elem in self: - str_ += ("%s Ros%s '%s' {Ref%s '%s.%s.%s.%s' value %s},\n") % ( - indent, name_type3, elem.resolved, name_type2, pkg_name, node_name, node_name, elem.resolved, elem.set_value(elem.value, indent)) - str_ = str_[:-2] - str_ += "}\n" - return str_ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__init__.py b/rosHumble_model_extractor_scripts/ros_model_generator/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 0424860..0000000 Binary files a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc deleted file mode 100644 index 02d7277..0000000 Binary files a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc and /dev/null differ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py b/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py deleted file mode 100644 index 3a00d78..0000000 --- a/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pprint -from pyparsing import * -import ros_metamodels.ros_metamodel_core as model - -class RosModelGenerator(object): - def __init__(self): - self.ros_model = model.RosModel() - - def create_model_from_node(self, package_name, artifact_name, node, repo=None, pkg_type="CatkinPackage"): - package = model.Package(package_name, repo, pkg_type) - gitrepo=model.GitRepo(repo) - artifact = model.Artifact(artifact_name, node) - package.add_artifact(artifact) - package.add_repo(gitrepo) - self.ros_model.add_package(package) - - def generate_ros_model(self, ros_model_file): - sucess, ros_model_str = self.create_ros_model() - with open(ros_model_file, 'w') as outfile: - outfile.write(ros_model_str) - - def generate_ros_model_list(self, components, ros_model_file): - sucess, ros_model_str = self.create_ros_model_list(components) - with open(ros_model_file, 'w') as outfile: - outfile.write(ros_model_str) - - def generate_ros_model_from_system(self, rossystem, package, ros_model_file, print_param_value=True): - sucess, ros_model_str = self.create_ros_model_from_system(package, rossystem, print_param_value) - with open(ros_model_file, 'w') as outfile: - outfile.write(ros_model_str) - - def create_ros_model(self): - ros_model_str = self.ros_model.dump_xtext_model() - return True, ros_model_str - - def create_ros_model_from_system(self, package_name, rossystem, print_param_value=True): - package = model.Package(package_name) - for component in rossystem.components: - node = model.Node(component.name) - - for param in component.params: - node.add_parameter(param.resolved, None, None, param.value, print_param_value) - - for pub, pub_type in component.publishers.iteritems(): - node.add_publisher(pub, pub_type) - - for sub, sub_type in component.subscribers.iteritems(): - node.add_subscriber(sub, sub_type) - - for serv, serv_type in component.service_servers.iteritems(): - node.add_service_server(serv, serv_type) - - for serv, serv_type in component.service_clients.iteritems(): - node.add_service_client(serv, serv_type) - - for action, action_type in component.action_clients.iteritems(): - node.add_action_client(action, action_type) - - for action, action_type in component.action_servers.iteritems(): - node.add_action_server(action, action_type) - - artifact = model.Artifact(node.name, node) - package.add_artifact(artifact) - - self.ros_model.add_package(package) - return self.create_ros_model() - - def create_ros_model_list(self, components): - for name in components: - if name == 'global_parameters': - continue - node = model.Node(name) - - if 'parameters' in components[name]: - parameters = components[name]['parameters'] - for param_name, param in parameters.items(): - node.add_parameter(param_name, None, None, param[0]) - - if 'publishers' in components[name]: - publishers = components[name]['publishers'] - for pub, pub_type in publishers.items(): - node.add_publisher(pub, pub_type) - - if 'subscribers' in components[name]: - subscribers = components[name]['subscribers'] - for sub, sub_type in subscribers.items(): - node.add_subscriber(sub, sub_type) - - if 'service_servers' in components[name]: - service_servers = components[name]['service_servers'] - for serv, serv_type in service_servers.items(): - node.add_service_server(serv, serv_type) - - if 'service_clients' in components[name]: - service_clients = components[name]['service_clients'] - for serv, serv_type in service_clients.items(): - node.add_service_client(serv, serv_type) - - if 'action_clients' in components[name]: - action_clients = components[name]['action_clients'] - for action, action_type in action_clients.items(): - node.add_action_client(action, action_type) - - if 'action_servers' in components[name]: - action_servers = components[name]['action_servers'] - for action, action_type in action_servers.items(): - node.add_action_server(action, action_type) - - self.create_model_from_node('my_ros_pkg',"test",node) - - return self.create_ros_model() - - -if __name__ == "__main__": - generator = RosModelGenerator() - try: - generator.generate_ros_model("/tmp/test") - except Exception as e: - print(e.args) diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py b/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py deleted file mode 100644 index 6c4901d..0000000 --- a/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pprint -from pyparsing import * -import ros_metamodels.ros_metamodel_core as model -import ros_metamodels.rossystem_metamodel_core as system_model -import ros_model_generator.rosmodel_generator as model_generator - - -class RosSystemModelGenerator(object): - def __init__(self,name="", package=""): - self.system = system_model.RosSystem(name, package); - - def setSystemName(self, name): - self.system.name = name; - - def addParameter(self, name, value): - self.system.params.add(model.Parameter(name, value)) - - def addComponent(self, name): - self.system.components.add(system_model.Component(name)) - - def addComponent(self, component): - self.system.components.add(component) - - def dump_ros_system_model(self, rosystem_model_file): - sucess, ros_system_model_str = self.create_ros_system_model() - with open(rosystem_model_file, 'w') as outfile: - outfile.write(ros_system_model_str) - - def dump_ros_system_model_list(self, components, rosystem_model_file): - sucess, ros_system_model_str = self.create_ros_system_model_list(components) - with open(rosystem_model_file, 'w') as outfile: - outfile.write(ros_system_model_str) - - def create_ros_system_model(self): - ros_system_model_str = self.system.dump_xtext_model() - return True, ros_system_model_str - - def create_ros_system_model_list(self, components): - for name in components: - if name == 'global_parameters': - continue - component = system_model.Component(name) - - if 'parameters' in components[name]: - parameters = components[name]['parameters'] - for param_name, param in parameters.items(): - component.params.add(system_model.RosParameter(param_name, param[1], param[0])) - - if 'publishers' in components[name]: - publishers = components[name]['publishers'] - for pub, pub_type in publishers.items(): - component.publishers.add(system_model.RosInterface(pub, pub_type)) - - if 'subscribers' in components[name]: - subscribers = components[name]['subscribers'] - for sub, sub_type in subscribers.items(): - component.subscribers.add(system_model.RosInterface(sub, sub_type)) - - if 'service_servers' in components[name]: - service_servers = components[name]['service_servers'] - for serv, serv_type in service_servers.items(): - component.service_servers.add(system_model.RosInterface(serv, serv_type)) - - if 'service_clients' in components[name]: - service_clients = components[name]['service_clients'] - for serv, serv_type in service_clients.items(): - component.service_clients.add(system_model.RosInterface(serv, serv_type)) - - if 'action_clients' in components[name]: - action_clients = components[name]['action_clients'] - for action, action_type in action_clients.items(): - component.action_clients.add(system_model.RosInterface(action, action_type)) - - if 'action_servers' in components[name]: - action_servers = components[name]['action_servers'] - for action, action_type in action_servers.items(): - component.action_servers.add(system_model.RosInterface(action, action_type)) - - self.addComponent(component) - - if 'global_parameters' in components: - parameters = components['global_parameters'] - for name, param in parameters.items(): - self.addParameter(name, param[0]) - - return self.create_ros_system_model() - - def generate_ros_system_model(self, ros_system_model_file): - sucess, ros_system_model_str = self.create_ros_system_model() - with open(ros_system_model_file, 'w') as outfile: - outfile.write(ros_system_model_str) - - def generate_ros_system_model_list(self, components, ros_system_model_file, ros_model_file="", print_param_value=True): - sucess, ros_system_model_str = self.create_ros_system_model_list(components) - with open(ros_system_model_file, 'w') as outfile: - outfile.write(ros_system_model_str) - - if ros_model_file: - rosmodel_generator = model_generator.RosModelGenerator() - rosmodel_generator.generate_ros_model_from_system(self.system, self.system.package, ros_model_file, print_param_value) - - -if __name__ == "__main__": - generator = RosSystemModelGenerator() - try: - generator.dump_ros_system_model("/tmp/test") - except Exception as e: - print(e.args) - diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/__init__.py b/rosHumble_model_extractor_scripts/ros_model_parser/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py b/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py deleted file mode 100644 index 13a15e2..0000000 --- a/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py +++ /dev/null @@ -1,89 +0,0 @@ -from pyparsing import * - - -def strip_slash(string): - return '{}'.format(string[1:] if string.startswith('/') else string) - -# find out missing and additional interfaces -# if both lists are empty, system is running fine -def compare_rossystem_models(model_ref, model_current): - # not sure of the performance of this method - set_ref = set((strip_slash(x.interface_name[0])) - for x in model_ref.interfaces) - set_current = set((strip_slash(x.interface_name[0])) - for x in model_current.interfaces) - - # similarly for all interfaces within the node? - # or only for topic connections? - # does LED's code capture topic connections? - ref_params = dict() - for interface in model_ref.interfaces: - for param in interface.parameters: - key = strip_slash(param.param_name[0]) - ref_params[key] = [param.param_value[0], - interface.interface_name[0]] - - current_params = dict() - for interface in model_current.interfaces: - for param in interface.parameters: - key = strip_slash(param.param_name[0]) - current_params[key] = [ - param.param_value[0], interface.interface_name[0]] - - incorrect_params = dict() - for key, value in ref_params.items(): - try: - current_value = current_params[key][0] - ref_value = ref_params[key][0] - - if (type(current_value) is ParseResults) & (type(ref_value) is ParseResults): - current_value = current_value.asList() - ref_value = ref_value.asList() - if (type(current_value) is str) & (type(ref_value) is str): - current_value = re.sub( - r"[\n\t\s]*", "", strip_slash(current_value)) - ref_value = re.sub( - r"[\n\t\s]*", "", strip_slash(ref_value)) - isEqual = current_value == ref_value - if not isEqual: - incorrect_params.setdefault(current_params[key][1], []) - incorrect_params[current_params[key] - [1]].append([key, current_value]) - except Exception as exc: - pass - - # returning missing_interfaces, additional_interfaces - return list(set_ref - set_current), list(set_current - set_ref), incorrect_params - -def _check_valid(interface_name, interface_type): - if interface_type == '?': - return False - # add more cases here if required - - return True - -def extract_common_ros(model_ref, model_current): - node_ref = list(model_ref.packages[0].artifacts)[0].node - node_current = list(model_current.packages[0].artifacts)[0].node - - set_ref_pub = set((strip_slash(pub.name[0]), pub.type[0]) for pub in node_ref.publishers if _check_valid(pub.name[0], pub.type[0])) - set_current_pub = set((strip_slash(pub.name[0]), pub.type[0]) for pub in node_current.publishers if _check_valid(pub.name[0], pub.type[0])) - - set_ref_sub = set((strip_slash(sub.name[0]), sub.type[0]) for sub in node_ref.subscribers if _check_valid(sub.name[0], sub.type[0])) - set_current_sub = set((strip_slash(sub.name[0]), sub.type[0]) for sub in node_current.subscribers if _check_valid(sub.name[0], sub.type[0])) - - set_ref_srv = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_ref.service_servers if _check_valid(srv.name[0], srv.type[0])) - set_current_srv = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_current.service_servers if _check_valid(srv.name[0], srv.type[0])) - - set_ref_srv_cli = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_ref.service_clients if _check_valid(srv.name[0], srv.type[0])) - set_current_srv_cli = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_current.service_clients if _check_valid(srv.name[0], srv.type[0])) - - set_ref_act = set((strip_slash(act.name[0]), act.type[0]) for act in node_ref.action_servers if _check_valid(act.name[0], act.type[0])) - set_current_act = set((strip_slash(act.name[0]), act.type[0]) for act in node_current.action_servers if _check_valid(act.name[0], act.type[0])) - - set_ref_act_cli = set((strip_slash(act.name[0]), act.type[0]) for act in node_ref.action_clients if _check_valid(act.name[0], act.type[0])) - set_current_act_cli = set((strip_slash(act.name[0]), act.type[0]) for act in node_current.action_clients if _check_valid(act.name[0], act.type[0])) - - return tuple(set_ref_pub.intersection(set_current_pub)), tuple(set_ref_sub.intersection(set_current_sub)), \ - tuple(set_ref_srv.intersection(set_current_srv)), tuple(set_ref_srv_cli.intersection(set_current_srv_cli)), \ - tuple(set_ref_act.intersection(set_current_act)), tuple(set_ref_act_cli.intersection(set_current_act_cli)) diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py b/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py deleted file mode 100644 index 24abd8a..0000000 --- a/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pprint -from pyparsing import * -import ros_metamodels.ros_metamodel_core as rosmodel - -# TODO: extract nodes, topics, services, etc from 'result' -# Compute Connections - - -# stateless functions -def parseActionStr(string, location, tokens): - if((len(tokens[0]) == 1) and (type(tokens[0][0]) == str)): - return tokens[0][0] - - -def parseActionDict(string, location, tokens): - dict_list = list() - for toks in tokens: - param_dict = dict() - for tok in toks: - param_dict[tok[0]] = tok[1] - dict_list.append(param_dict) - return dict_list - - -class RosModelParser(object): - def __init__(self, model, isFile=True): - # OCB = Open Curly Bracket {} - # CCB = Close Curly Bracket } - # ORB = Open Round Bracket ( - # CRB = Close Round Bracket ) - # SQ = Single Quotes ' - # OSB = Open Square Bracket [ - # CSB = Close Square Bracket ] - - OCB, CCB, ORB, CRB, SQ, OSB, CSB = map(Suppress, "{}()'[]") - name = Optional(SQ) + Word(printables, - excludeChars="{},'") + Optional(SQ) - - real = Combine(Word(nums) + '.' + Word(nums)) - - listStr = Forward() - mapStr = Forward() - param_value = Forward() - - sglQStr = QuotedString("'", multiline=True) - string_value = Dict( - Group(sglQStr + ZeroOrMore(OCB + param_value + CCB))) - - string_value.setParseAction(parseActionStr) - values = (Combine(Optional("-") + real) | Combine(Optional("-") + Word(nums))).setParseAction( - lambda tokens: float(tokens[0])) | string_value | Keyword("false") | Keyword("true") | listStr | mapStr - - _packageSet = Keyword("PackageSet").suppress() - _package = Keyword("Package").suppress() - _catkin_pkg = Keyword("CatkinPackage").suppress() - _ament_pkg = Keyword("AmentPackage").suppress() - _git_repo = Keyword("FromGitRepo").suppress() - #_artifact = Keyword("artifact").suppress() - _artifacts = Keyword("Artifact").suppress() - #_node = Keyword("node").suppress() - _nodes = Keyword("Node").suppress() - _name = CaselessKeyword("name").suppress() - - # Types - _srv_type= Keyword("service").suppress() - _topic_type= Keyword("message").suppress() - _act_type = Keyword("action").suppress() - - # Service Server - _service_svrs= Keyword("ServiceServers").suppress() - _service_svr= Keyword("ServiceServer").suppress() - service_svr = Group( _service_svr + OCB + _name + name("name") + _srv_type + name("service") + CCB) - service_svrs = (_service_svrs + OCB + OneOrMore(service_svr + Optional(",").suppress()) + CCB) - - # Service Client - _service_clis= Keyword("ServiceClients").suppress() - _service_cli= Keyword("ServiceClient").suppress() - service_cli = Group( _service_cli + OCB + _name + name("name") + _srv_type + name("service") + CCB) - service_clis = (_service_clis + OCB + OneOrMore(service_cli + Optional(",").suppress()) + CCB) - - # Action Server - _action_svrs= Keyword("ActionServers").suppress() - _action_svr= Keyword("ActionServer").suppress() - action_svr = Group( _action_svr + OCB + _name + name("name") + _act_type + name("action") + CCB) - action_svrs = (_action_svrs + OCB + OneOrMore(action_svr + Optional(",").suppress()) + CCB) - - # Action Client - _action_clis= Keyword("ActionClients").suppress() - _action_cli= Keyword("ActionClient").suppress() - action_cli = Group( _action_cli + OCB + _name + name("name") + _act_type + name("action") + CCB) - action_clis = (_action_clis + OCB + OneOrMore(action_cli + Optional(",").suppress()) + CCB) - - # Publisher - _pubs= Keyword("Publishers").suppress() - _pub= Keyword("Publisher").suppress() - pub = Group( _pub + OCB + _name + name("name") + _topic_type + name("message") + CCB) - pubs = (_pubs + OCB + OneOrMore(pub + Optional(",").suppress()) + CCB) - - # Subscriber - _subs= Keyword("Subscribers").suppress() - _sub= Keyword("Subscriber").suppress() - sub = Group( _sub + OCB + _name + name("name") + _topic_type + name("message") + CCB) - subs = (_subs + OCB + OneOrMore(sub + Optional(",").suppress()) + CCB) - - # Parameter - _params= Keyword("Parameters").suppress() - _param= Keyword("Parameter").suppress() - _type= Keyword("type").suppress() - param = Group( _sub + OCB + _name + name("name") + _type + name("type") + CCB) - params = (_params + OCB + OneOrMore(param + Optional(",").suppress()) + CCB) - - self.rospkg_grammar = _packageSet + \ - OCB + \ - ( _catkin_pkg | _ament_pkg | _package ) + name("pkg_name") + OCB + \ - Optional(_git_repo + name("git_repo")) + \ - _artifacts + name("artifact_name") + OCB + \ - _nodes + OCB + _name + name("node_name") + \ - Optional(service_svrs)("svr_servers") + \ - Optional(pubs)("publishers") + \ - Optional(subs)("subscribers") + \ - Optional(service_clis)("svr_clients") + \ - Optional(action_svrs)("act_servers") + \ - Optional(action_clis)("act_clients") + \ - Optional(params)("parameters") - - self._isFile = isFile - self._model = model - - - def _parse_from_string(self): - self._result = self.rospkg_grammar.parseString(self._model) - - def _parse_from_file(self): - self._result = self.rospkg_grammar.parseFile(self._model) - - def parse(self): - self._result = ParseResults() - try: - if self._isFile: - self._parse_from_file() - else: - self._parse_from_string() - except Exception as e: - print(e.args) # Should set a default 'result'? - ros_model = rosmodel.RosModel() - ros_node = rosmodel.Node(self._result.get("node_name")) - - if self._result.get("svr_servers") is not None: - [ros_node.add_service_server(srv_ser.get("name"), srv_ser.get("service")) for srv_ser in self._result.get("svr_servers")] - if self._result.get("svr_clients") is not None: - [ros_node.add_service_client(srv_cli.get("name"), srv_cli.get("service")) for srv_cli in self._result.get("svr_clients")] - if self._result.get("act_servers") is not None: - [ros_node.add_action_server(act_ser.get("name"), act_ser.get("action")) for act_ser in self._result.get("act_servers")] - if self._result.get("act_clients") is not None: - [ros_node.add_action_client(act_cli.get("name"), act_cli.get("action")) for act_cli in self._result.get("act_clients")] - if self._result.get("publishers") is not None: - [ros_node.add_publisher(pub.get("name"), pub.get("message")) for pub in self._result.get("publishers")] - if self._result.get("subscribers") is not None: - [ros_node.add_subscriber(sub.get("name"), sub.get("message")) for sub in self._result.get("subscribers")] - if self._result.get("parameters") is not None: - [ros_node.add_parameter(param.get("name"), param.get("type")) for param in self._result.get("parameters")] - - ros_artifact = rosmodel.Artifact(self._result.get("artifact_name"),ros_node) - ros_package = rosmodel.Package(self._result.get("pkg_name")) - ros_package.add_artifact(ros_artifact) - ros_model.add_package(ros_package) - - return ros_model - - -if __name__ == "__main__": - import os - my_path = os.path.abspath(os.path.dirname(__file__)) - path = os.path.join( - my_path, "../../resources/cob_light.ros") - print(path) - - parser = RosModelParser(path) - try: - print(parser.parse().dump()) - except Exception as e: - print(e.args) - diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py b/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py deleted file mode 100644 index 111c482..0000000 --- a/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py +++ /dev/null @@ -1,233 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pprint -from pyparsing import * - -# TODO: extract nodes, topics, services, etc from 'result' -# Compute Connections - - -# stateless functions -def parseActionStr(string, location, tokens): - if((len(tokens[0]) == 1) and (type(tokens[0][0]) == str)): - return tokens[0][0] - - -def parseActionDict(string, location, tokens): - dict_list = list() - for toks in tokens: - param_dict = dict() - for tok in toks: - param_dict[tok[0]] = tok[1] - dict_list.append(param_dict) - return dict_list - - -class RosSystemModelParser(object): - def __init__(self, model, isFile=True): - # OCB = Open Curly Bracket { - # CCB = Close Curly Bracket } - # ORB = Open Round Bracket ( - # CRB = Close Round Bracket ) - # SQ = Single Quotes ' - # DQ = Double Quotes " - # OSB = Open Square Bracket [ - # CSB = Close Square Bracket ] - - - OCB, CCB, ORB, CRB, SQ, DQ, OSB, CSB = map(Suppress, "{}()'\"[]") - - name = Optional(SQ) + Optional(DQ) + Word(printables, - excludeChars="{},'") + Optional(SQ) + Optional(DQ) - - real = Combine(Word(nums) + '.' + Word(nums)) - - listStr = Forward() - mapStr = Forward() - param_value = Forward() - - sglQStr = QuotedString("'", multiline=True) - string_value = Dict( - Group(sglQStr + ZeroOrMore(OCB + param_value + CCB))) - - string_value.setParseAction(parseActionStr) - values = (Combine(Optional("-") + real) | Combine(Optional("-") + Word(nums))).setParseAction( - lambda tokens: float(tokens[0])) | string_value | Word(alphanums + "/-_.") | Keyword("false") | Keyword("true") | listStr | mapStr - - _system = Keyword("RosSystem").suppress() - _name = CaselessKeyword("name").suppress() - _component = Keyword("RosComponents").suppress() - _interface = Keyword("ComponentInterface").suppress() - - # Parameter Def - _parameters = Keyword("RosParameters").suppress() - _parameter = Keyword("RosParameter").suppress() - _ref_parameter = Keyword("RefParameter").suppress() - _value = Keyword("value").suppress() - - # Subscriber Def - _subscribers = Keyword("RosSubscribers").suppress() - _subscriber = Keyword("RosSubscriber").suppress() - _ref_subscriber = Keyword("RefSubscriber").suppress() - - # Subscriber Def - _publishers = Keyword("RosPublishers").suppress() - _publisher = Keyword("RosPublisher").suppress() - _ref_publisher = Keyword("RefPublisher").suppress() - - # ServiceServers Def - _services = Keyword("RosSrvServers").suppress() - _service = Keyword("RosServiceServer").suppress() - _ref_service = Keyword("RefServer").suppress() - - # ServiceClients Def - _srv_clients = Keyword("RosSrvClients").suppress() - _srv_client = Keyword("RosServiceClient").suppress() - _ref_srv_client = Keyword("RefClient").suppress() - - # ActionServers Def - _action_servers = Keyword("RosActionServers").suppress() - _action_server = Keyword("RosActionServer").suppress( - ) | Keyword("RosServer").suppress() - _ref_server = Keyword("RefServer").suppress() - - # Actio Clients Def - _action_clients = Keyword("RosActionClients").suppress() - _action_client = Keyword("RosActionClient").suppress( - ) | Keyword("RosClient").suppress() - _ref_action_client = Keyword("RefClient").suppress() - - # Topic Connections Def - _topic_connections = Keyword("TopicConnections").suppress() - _topic_connection = Keyword("TopicConnection").suppress() - _from = Keyword("From").suppress() - _to = Keyword("To").suppress() - - # global parameters Def - _g_parameters = Keyword("Parameters").suppress() - _g_parameter = Keyword("Parameter").suppress() - _type = Keyword("type").suppress() - _value = Keyword("value").suppress() - - listStr << delimitedList(Group(OCB + delimitedList(values) + CCB)) - mapStr << (OSB + delimitedList(Group(OCB + delimitedList((Group( - sglQuotedString.setParseAction(removeQuotes) + Suppress(":") + values))) + CCB)) + CSB) - mapStr.setParseAction(parseActionDict) - - param_value << _value + (values | listStr) - - parameter = Group(_parameter + name("param_name") + - OCB + _ref_parameter + name("param_path") + Optional(param_value("param_value")) + CCB) - parameters = (_parameters + OCB + - OneOrMore(parameter + Optional(",").suppress()) + CCB) - - subscriber = Group(_subscriber + name("sub_name") + - OCB + _ref_subscriber + name("sub_path") + CCB) - subscribers = (_subscribers + OCB + - OneOrMore(subscriber + Optional(",").suppress()) + CCB) - - publisher = Group(_publisher + name("pub_name") + - OCB + _ref_publisher + name("pub_path") + CCB) - publishers = (_publishers + OCB + - OneOrMore(publisher + Optional(",").suppress()) + CCB) - - service = Group(_service + name("srv_name") + - OCB + _ref_service + name("srv_path") + CCB) - services = (_services + OCB + - OneOrMore(service + Optional(",").suppress()) + CCB) - - srv_client = Group(_srv_client + name("srv_name") + - OCB + _ref_srv_client + name("srv_path") + CCB) - srv_clients = (_srv_clients + OCB + - OneOrMore(srv_client + Optional(",").suppress()) + CCB) - - action_server = Group(_action_server + name("action_name") + - OCB + _ref_server + name("action_path") + CCB) - action_servers = (_action_servers + OCB + - OneOrMore(action_server + Optional(",").suppress()) + CCB) - - action_client = Group(_action_client + name("action_name") + - OCB + _ref_action_client + name("action_path") + CCB) - action_clients = (_action_clients + OCB + - OneOrMore(action_client + Optional(",").suppress()) + CCB) - - topic_connection = Group(_topic_connection + name("topic_name") + - OCB + _from + ORB + name("from") + CRB + _to + - ORB + name("to") + CRB + CCB) - - topic_connections = (_topic_connections + OCB + - OneOrMore(topic_connection + Optional(",").suppress()) + CCB) - - g_parameter = Group(_g_parameter + OCB + _name + name("param_name") + - _type + name("value_type") + Optional(param_value("param_value")) + CCB) - g_parameters = (_g_parameters + OCB + - OneOrMore(g_parameter + Optional(",").suppress()) + CCB) - - interface = Group( - _interface + - OCB + - _name + name("interface_name") + - Optional(parameters)("parameters") + - Optional(publishers)("publishers") + - Optional(subscribers)("subscribers") + - Optional(services)("services") + - Optional(srv_clients)("srv_clients") + - Optional(action_servers)("action_servers") + - Optional(action_clients)("action_clients") + - CCB) - - self.rossystem_grammar = _system + \ - OCB + \ - _name + name("system_name") + \ - _component + ORB + \ - OneOrMore(interface + Optional(",").suppress())("interfaces") + CRB \ - + Optional(topic_connections)("topic_connections") \ - + Optional(g_parameters)("global_parameters") + CCB - self._model = model - self._isFile = isFile - - def _parse_from_string(self): - self._result = self.rossystem_grammar.parseString(self._model) - - def _parse_from_file(self): - self._result = self.rossystem_grammar.parseFile(self._model) - - def parse(self): - self._result = ParseResults() - try: - if self._isFile: - self._parse_from_file() - else: - self._parse_from_string() - except Exception as e: - print(e.args) # Should set a default 'result'? - return self._result - - -if __name__ == "__main__": - import os - my_path = os.path.abspath(os.path.dirname(__file__)) - path = os.path.join( - my_path, "../../resources/robotino.rossystem") - print(path) - - parser = RosSystemModelParser(path) - try: - print(parser.parse().dump()) - # print(parser.parse().interfaces[2].services) - except Exception as e: - print(e.args)