-
-
Notifications
You must be signed in to change notification settings - Fork 75
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[feature] Added command line utility to export users #355
Closes #355
- Loading branch information
Showing
6 changed files
with
262 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import csv | ||
|
||
from django.contrib.auth import get_user_model | ||
from django.core.exceptions import ObjectDoesNotExist | ||
from django.core.management.base import BaseCommand | ||
|
||
from ... import settings as app_settings | ||
|
||
User = get_user_model() | ||
|
||
|
||
class Command(BaseCommand): | ||
help = 'Exports user data to a CSV file' | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument( | ||
'--exclude-fields', | ||
dest='exclude_fields', | ||
default='', | ||
help='Comma-separated list of fields to exclude from export', | ||
) | ||
parser.add_argument( | ||
'--filename', | ||
dest='filename', | ||
default='openwisp_exported_users.csv', | ||
help=( | ||
'Filename for the exported CSV, defaults to' | ||
' "openwisp_exported_users.csv"' | ||
), | ||
) | ||
|
||
def handle(self, *args, **options): | ||
fields = app_settings.EXPORT_USERS_COMMAND_CONFIG.get('fields', []).copy() | ||
# Get the fields to be excluded from the command-line argument | ||
exclude_fields = options.get('exclude_fields').split(',') | ||
# Remove excluded fields from the export fields | ||
fields = [field for field in fields if field not in exclude_fields] | ||
# Fetch all user data in a single query using select_related for related models | ||
queryset = User.objects.select_related( | ||
*app_settings.EXPORT_USERS_COMMAND_CONFIG.get('select_related', []), | ||
).order_by('date_joined') | ||
|
||
# Prepare a CSV writer | ||
filename = options.get('filename') | ||
csv_file = open(filename, 'w', newline='') | ||
csv_writer = csv.writer(csv_file) | ||
|
||
# Write header row | ||
csv_writer.writerow(fields) | ||
|
||
# Write data rows | ||
for user in queryset.iterator(): | ||
data_row = [] | ||
for field in fields: | ||
# Extract the value from related models | ||
if '.' in field: | ||
related_model, related_field = field.split('.') | ||
try: | ||
related_value = getattr( | ||
getattr(user, related_model), related_field | ||
) | ||
except ObjectDoesNotExist: | ||
data_row.append('') | ||
else: | ||
data_row.append(related_value) | ||
elif field == 'organizations': | ||
organizations = [] | ||
for org_id, user_perm in user.organizations_dict.items(): | ||
organizations.append(f'({org_id},{user_perm["is_admin"]})') | ||
data_row.append('\n'.join(organizations)) | ||
else: | ||
data_row.append(getattr(user, field)) | ||
csv_writer.writerow(data_row) | ||
|
||
# Close the CSV file | ||
csv_file.close() | ||
self.stdout.write( | ||
self.style.SUCCESS(f'User data exported successfully to {filename}!') | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import csv | ||
from io import StringIO | ||
from unittest.mock import patch | ||
|
||
from django.core.files.temp import NamedTemporaryFile | ||
from django.core.management import call_command | ||
from django.test import TestCase | ||
from openwisp_utils.tests import capture_stdout | ||
from rest_framework.authtoken.models import Token | ||
|
||
from .. import settings as app_settings | ||
from .utils import TestOrganizationMixin | ||
|
||
|
||
class TestManagementCommands(TestOrganizationMixin, TestCase): | ||
def setUp(self): | ||
super().setUp() | ||
self.temp_file = NamedTemporaryFile(mode='wt', delete=False) | ||
|
||
def tearDown(self): | ||
super().tearDown() | ||
self.temp_file.close() | ||
|
||
def test_export_users(self): | ||
org1 = self._create_org(name='org1') | ||
org2 = self._create_org(name='org2') | ||
user = self._create_user() | ||
operator = self._create_operator() | ||
admin = self._create_admin() | ||
self._create_org_user(organization=org1, user=user, is_admin=True) | ||
self._create_org_user(organization=org2, user=user, is_admin=False) | ||
self._create_org_user(organization=org2, user=operator, is_admin=False) | ||
stdout = StringIO() | ||
with self.assertNumQueries(2): | ||
call_command('export_users', filename=self.temp_file.name, stdout=stdout) | ||
self.assertIn( | ||
f'User data exported successfully to {self.temp_file.name}', | ||
stdout.getvalue(), | ||
) | ||
|
||
# Read the content of the temporary file | ||
with open(self.temp_file.name, 'r') as temp_file: | ||
csv_reader = csv.reader(temp_file) | ||
csv_data = list(csv_reader) | ||
|
||
# 3 user and 1 header | ||
self.assertEqual(len(csv_data), 4) | ||
self.assertEqual( | ||
csv_data[0], app_settings.EXPORT_USERS_COMMAND_CONFIG['fields'] | ||
) | ||
# Ensuring ordering | ||
self.assertEqual(csv_data[1][0], str(user.id)) | ||
self.assertEqual(csv_data[2][0], str(operator.id)) | ||
self.assertEqual(csv_data[3][0], str(admin.id)) | ||
# Check organizations formatting | ||
self.assertEqual(csv_data[1][-1], f'({org1.id},True)\n({org2.id},False)') | ||
self.assertEqual(csv_data[2][-1], f'({org2.id},False)') | ||
self.assertEqual(csv_data[3][-1], '') | ||
|
||
@capture_stdout() | ||
def test_exclude_fields(self): | ||
self._create_user() | ||
call_command( | ||
'export_users', | ||
filename=self.temp_file.name, | ||
exclude_fields=','.join( | ||
app_settings.EXPORT_USERS_COMMAND_CONFIG['fields'][1:] | ||
), | ||
) | ||
with open(self.temp_file.name, 'r') as temp_file: | ||
csv_reader = csv.reader(temp_file) | ||
csv_data = list(csv_reader) | ||
|
||
# 1 user and 1 header | ||
self.assertEqual(len(csv_data), 2) | ||
self.assertEqual(csv_data[0], ['id']) | ||
|
||
@patch.object( | ||
app_settings, | ||
'EXPORT_USERS_COMMAND_CONFIG', | ||
{'fields': ['id', 'auth_token.key']}, | ||
) | ||
def test_related_fields(self): | ||
user = self._create_user() | ||
token = Token.objects.create(user=user) | ||
stdout = StringIO() | ||
with self.assertNumQueries(2): | ||
call_command('export_users', filename=self.temp_file.name, stdout=stdout) | ||
self.assertIn( | ||
f'User data exported successfully to {self.temp_file.name}', | ||
stdout.getvalue(), | ||
) | ||
|
||
# Read the content of the temporary file | ||
with open(self.temp_file.name, 'r') as temp_file: | ||
csv_reader = csv.reader(temp_file) | ||
csv_data = list(csv_reader) | ||
|
||
# 3 user and 1 header | ||
self.assertEqual(len(csv_data), 2) | ||
self.assertEqual( | ||
csv_data[1], app_settings.EXPORT_USERS_COMMAND_CONFIG['fields'] | ||
) | ||
self.assertEqual(csv_data[0][0], str(user.id)) | ||
self.assertEqual(csv_data[0][1], str(token.key)) |