Skip to content

Commit

Permalink
Merge pull request #74 from azavea/feature/handle_unicode_better
Browse files Browse the repository at this point in the history
Feature/handle unicode better
  • Loading branch information
Steve Lamb committed Jul 27, 2015
2 parents 48df5fd + ac7d2db commit 7dc24fe
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 49 deletions.
29 changes: 16 additions & 13 deletions djqscsv/djqscsv.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def write_csv(queryset, file_obj, **kwargs):
if key not in DJQSCSV_KWARGS:
csv_kwargs[key] = val

# add BOM to suppor CSVs in MS Excel
file_obj.write(u'\ufeff'.encode('utf8'))
# add BOM to support CSVs in MS Excel (for Windows only)
file_obj.write(_safe_utf8_stringify(u'\ufeff'))

# the CSV must always be built from a values queryset
# in order to introspect the necessary fields.
Expand Down Expand Up @@ -110,7 +110,7 @@ def write_csv(queryset, file_obj, **kwargs):
name_map = dict((field, field) for field in field_names)
if use_verbose_names:
name_map.update(
dict((field.name, field.verbose_name.encode('utf-8'))
dict((field.name, field.verbose_name)
for field in queryset.model._meta.fields
if field.name in field_names))

Expand All @@ -119,6 +119,9 @@ def write_csv(queryset, file_obj, **kwargs):
merged_header_map.update(field_header_map)
if extra_columns:
merged_header_map.update(dict((k, k) for k in extra_columns))

merged_header_map = dict((k, _safe_utf8_stringify(v))
for (k, v) in merged_header_map.items())
writer.writerow(merged_header_map)

for record in values_qs:
Expand Down Expand Up @@ -155,6 +158,15 @@ def _validate_and_clean_filename(filename):
return filename


def _safe_utf8_stringify(value):
if isinstance(value, str):
return value
elif isinstance(value, unicode):
return value.encode('utf-8')
else:
return unicode(value).encode('utf-8')


def _sanitize_unicode_record(field_serializer_map, record):

def _serialize_value(value):
Expand All @@ -165,21 +177,12 @@ def _serialize_value(value):
else:
return unicode(value)

def _sanitize_text(value):
# make sure every text value is of type 'str', coercing unicode
if isinstance(value, unicode):
return value.encode("utf-8")
elif isinstance(value, str):
return value
else:
return str(value).encode("utf-8")

obj = {}
for key, val in six.iteritems(record):
if val is not None:
serializer = field_serializer_map.get(key, _serialize_value)
newval = serializer(val)
obj[_sanitize_text(key)] = _sanitize_text(newval)
obj[_safe_utf8_stringify(key)] = _safe_utf8_stringify(newval)

return obj

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='django-queryset-csv',
version='0.3.0',
version='0.3.1',
description='A simple python module for writing querysets to csv',
long_description=open('README.rst').read(),
author=author,
Expand Down
11 changes: 10 additions & 1 deletion test_app/djqscsv_tests/models.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
from django.db import models

from django.utils.translation import ugettext as _

from datetime import datetime

SOME_TIME = datetime(2001, 01, 01, 01, 01)


class Activity(models.Model):
    # Simple lookup model: a named activity referenced by Person.hobby
    # (see the ForeignKey below in this file).
    name = models.CharField(max_length=50, verbose_name="Name of Activity")


class Person(models.Model):
    # NOTE(review): the diff view renders both the old and the new
    # ``name`` line; at class-definition time the second assignment
    # (with the translatable ``_(...)`` verbose_name) wins.  The merged
    # file presumably keeps only the translated one — confirm upstream.
    name = models.CharField(max_length=50, verbose_name="Person's name")
    name = models.CharField(max_length=50, verbose_name=_("Person's name"))
    address = models.CharField(max_length=255)
    info = models.TextField(verbose_name="Info on Person")
    hobby = models.ForeignKey(Activity)
    # Deterministic default so CSV fixtures in the test suite can assert
    # an exact '2001-01-01T01:01:00' value.
    born = models.DateTimeField(default=SOME_TIME)

    def __unicode__(self):
        # Python 2 text representation; used by unicode()/str coercion.
        return self.name
70 changes: 39 additions & 31 deletions test_app/djqscsv_tests/tests/test_csv_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

from djqscsv_tests.context import SELECT, EXCLUDE, AS, CONSTANT

from djqscsv_tests.models import Person

from djqscsv_tests.util import create_people_and_get_queryset

from django.utils import six
Expand All @@ -23,6 +21,7 @@
else:
from StringIO import StringIO


class CSVTestCase(TestCase):

def setUp(self):
Expand All @@ -38,7 +37,8 @@ def csv_match(self, csv_file, expected_data, **csv_kwargs):
for csv_row, expected_row in test_pairs:
if is_first:
# add the BOM to the data
expected_row = ['\xef\xbb\xbf' + expected_row[0]] + expected_row[1:]
expected_row = (['\xef\xbb\xbf' + expected_row[0]] +
expected_row[1:])
is_first = False
iteration_happened = True
assertion_results.append(csv_row == expected_row)
Expand All @@ -55,7 +55,6 @@ def assertNotMatchesCsv(self, *args, **kwargs):
assertion_results = self.csv_match(*args, **kwargs)
self.assertFalse(all(assertion_results))


def assertQuerySetBecomesCsv(self, qs, expected_data, **kwargs):
obj = StringIO()
djqscsv.write_csv(qs, obj, **kwargs)
Expand All @@ -68,27 +67,30 @@ def assertEmptyQuerySetMatches(self, expected_data, **kwargs):
if DJANGO_VERSION[:2] == (1, 5):
with self.assertRaises(djqscsv.CSVException):
djqscsv.write_csv(qs, obj)
elif DJANGO_VERSION[:2] == (1, 6):
else:
djqscsv.write_csv(qs, obj,
**kwargs)
self.assertEqual(obj.getvalue(), expected_data)


# the csv data that is returned by the most inclusive query under test.
# use this data structure to build smaller data sets
BASE_CSV = [
['id', 'name', 'address',
'info', 'hobby_id', 'hobby__name', 'Most Powerful'],
['1', 'vetch', 'iffish', 'wizard', '1', 'Doing Magic', '0'],
['2', 'nemmerle', 'roke', 'deceased arch mage', '2', 'Resting', '1'],
['3', 'ged', 'gont', 'former arch mage', '2', 'Resting', '1']]
'info', 'hobby_id', 'born', 'hobby__name', 'Most Powerful'],
['1', 'vetch', 'iffish',
'wizard', '1', '2001-01-01T01:01:00', 'Doing Magic', '0'],
['2', 'nemmerle', 'roke',
'deceased arch mage', '2', '2001-01-01T01:01:00', 'Resting', '1'],
['3', 'ged', 'gont',
'former arch mage', '2', '2001-01-01T01:01:00', 'Resting', '1']]

FULL_PERSON_CSV_WITH_RELATED = SELECT(BASE_CSV,
AS('id', 'ID'),
AS('name', 'Person\'s name'),
'address',
AS('info', 'Info on Person'),
'hobby_id',
'born',
'hobby__name')

FULL_PERSON_CSV = EXCLUDE(FULL_PERSON_CSV_WITH_RELATED,
Expand All @@ -115,11 +117,11 @@ def test_write_csv_full_no_verbose(self):
def test_write_csv_limited_no_verbose(self):
qs = self.qs.values('name', 'address', 'info')
self.assertQuerySetBecomesCsv(qs, self.LIMITED_PERSON_CSV_NO_VERBOSE,
use_verbose_names=False)
use_verbose_names=False)

def test_empty_queryset_no_verbose(self):
self.assertEmptyQuerySetMatches(
'\xef\xbb\xbfid,name,address,info,hobby_id\r\n',
'\xef\xbb\xbfid,name,address,info,hobby_id,born\r\n',
use_verbose_names=False)


Expand All @@ -135,13 +137,18 @@ def test_write_csv_limited(self):
def test_empty_queryset(self):
self.assertEmptyQuerySetMatches(
'\xef\xbb\xbfID,Person\'s name,address,'
'Info on Person,hobby_id\r\n')
'Info on Person,hobby_id,born\r\n')


class FieldHeaderMapTests(CSVTestCase):
def test_write_csv_full_custom_headers(self):
overridden_info_csv = ([['ID', "Person's name", 'address',
'INFORMATION', 'hobby_id']] +
self.FULL_PERSON_CSV[1:])
overridden_info_csv = SELECT(self.FULL_PERSON_CSV,
'ID',
"Person's name",
'address',
AS('Info on Person', 'INFORMATION'),
'hobby_id',
'born')

self.assertQuerySetBecomesCsv(
self.qs, overridden_info_csv,
Expand All @@ -155,8 +162,7 @@ def test_write_csv_limited_custom_headers(self):

self.assertQuerySetBecomesCsv(
qs, overridden_info_csv,
field_header_map={ 'info': 'INFORMATION' })

field_header_map={'info': 'INFORMATION'})

def test_write_csv_with_related_custom_headers(self):
overridden_csv = SELECT(self.FULL_PERSON_CSV_WITH_RELATED,
Expand All @@ -166,23 +172,25 @@ def test_write_csv_with_related_custom_headers(self):

self.assertQuerySetBecomesCsv(
qs, overridden_csv,
field_header_map={ 'hobby__name': 'Name of Activity' })
field_header_map={'hobby__name': 'Name of Activity'})

def test_empty_queryset_custom_headers(self):
self.assertEmptyQuerySetMatches(
'\xef\xbb\xbfID,Person\'s name,address,INFORMATION,hobby_id\r\n',
field_header_map={ 'info': 'INFORMATION' })
'\xef\xbb\xbfID,Person\'s name,'
'address,INFORMATION,hobby_id,born\r\n',
field_header_map={'info': 'INFORMATION'})


class WalkRelationshipTests(CSVTestCase):

def test_with_related(self):

qs = self.qs.values('id', 'name', 'address', 'info',
'hobby_id', 'hobby__name')
'hobby_id', 'born', 'hobby__name')

self.assertQuerySetBecomesCsv(qs, self.FULL_PERSON_CSV_WITH_RELATED)


class ColumnOrderingTests(CSVTestCase):
def setUp(self):
self.qs = create_people_and_get_queryset()
Expand All @@ -208,16 +216,17 @@ def test_no_values_matches_models_file(self):
'name',
'address',
'info',
'hobby_id')

'hobby_id',
'born')
self.assertQuerySetBecomesCsv(self.qs, csv,
use_verbose_names=False)


class AggregateTests(CSVTestCase):

def setUp(self):
self.qs = create_people_and_get_queryset().annotate(num_hobbies=Count('hobby'))
self.qs = (create_people_and_get_queryset()
.annotate(num_hobbies=Count('hobby')))

def test_aggregate(self):
csv_with_aggregate = SELECT(self.FULL_PERSON_CSV,
Expand All @@ -226,6 +235,7 @@ def test_aggregate(self):
'address',
"Info on Person",
'hobby_id',
'born',
CONSTANT('1', 'num_hobbies'))
self.assertQuerySetBecomesCsv(self.qs, csv_with_aggregate)

Expand All @@ -234,7 +244,7 @@ class ExtraOrderingTests(CSVTestCase):

def setUp(self):
self.qs = create_people_and_get_queryset().extra(
select={'Most Powerful':"info LIKE '%arch mage%'"})
select={'Most Powerful': "info LIKE '%arch mage%'"})

def test_extra_select(self):
csv_with_extra = SELECT(self.BASE_CSV,
Expand All @@ -243,19 +253,20 @@ def test_extra_select(self):
'address',
AS('info', 'Info on Person'),
'hobby_id',
'born',
'Most Powerful')

self.assertQuerySetBecomesCsv(self.qs, csv_with_extra)


def test_extra_select_ordering(self):
custom_order_csv = SELECT(self.BASE_CSV,
AS('id', 'ID'),
'Most Powerful',
AS('name', "Person's name"),
'address',
AS('info', 'Info on Person'),
'hobby_id')
'hobby_id',
'born')

self.assertQuerySetBecomesCsv(self.qs, custom_order_csv,
field_order=['id', 'Most Powerful'])
Expand Down Expand Up @@ -283,7 +294,6 @@ def test_render_to_csv_response_no_filename(self):
self.assertRegexpMatches(response['Content-Disposition'],
r'attachment; filename=person_export.csv;')


def test_render_to_csv_response(self):
response = djqscsv.render_to_csv_response(self.qs,
filename="test_csv",
Expand All @@ -292,7 +302,6 @@ def test_render_to_csv_response(self):
self.assertMatchesCsv(response.content.split('\n'),
self.FULL_PERSON_CSV_NO_VERBOSE)


def test_render_to_csv_response_other_delimiter(self):
response = djqscsv.render_to_csv_response(self.qs,
filename="test_csv",
Expand All @@ -304,7 +313,6 @@ def test_render_to_csv_response_other_delimiter(self):
self.FULL_PERSON_CSV_NO_VERBOSE,
delimiter="|")


def test_render_to_csv_fails_on_delimiter_mismatch(self):
response = djqscsv.render_to_csv_response(self.qs,
filename="test_csv",
Expand Down
26 changes: 24 additions & 2 deletions test_app/djqscsv_tests/tests/test_utilities.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
import datetime

from django.test import TestCase
Expand All @@ -11,6 +12,7 @@
# csv creation process, but don't participate in it
# directly.


class ValidateCleanFilenameTests(TestCase):

def assertValidatedEquals(self, filename, expected_value):
Expand Down Expand Up @@ -63,14 +65,14 @@ def test_sanitize_date_with_non_string_formatter(self):
this practice.
"""
record = {'name': 'Tenar'}
serializer = {'name': lambda d: len(d) }
serializer = {'name': lambda d: len(d)}
sanitized = djqscsv._sanitize_unicode_record(serializer, record)
self.assertEqual(sanitized, {'name': '5'})

def test_sanitize_date_with_formatter(self):
record = {'name': 'Tenar',
'created': datetime.datetime(1973, 5, 13)}
serializer = {'created': lambda d: d.strftime('%Y-%m-%d') }
serializer = {'created': lambda d: d.strftime('%Y-%m-%d')}
sanitized = djqscsv._sanitize_unicode_record(serializer, record)
self.assertEqual(sanitized,
{'name': 'Tenar',
Expand Down Expand Up @@ -115,3 +117,23 @@ def test_generate_filename(self):
r'person_export_[0-9]{8}.csv')


class SafeUtf8EncodeTest(TestCase):
def test_safe_utf8_encode(self):

class Foo(object):
def __unicode__(self):
return u'¯\_(ツ)_/¯'
def __str_(self):
return self.__unicode__().encode('utf-8')

for val in (u'¯\_(ツ)_/¯', 'plain', r'raw',
b'123', 11312312312313L, False,
datetime.datetime(2001, 01, 01),
4, None, [], set(), Foo):

first_pass = djqscsv._safe_utf8_stringify(val)
second_pass = djqscsv._safe_utf8_stringify(first_pass)
third_pass = djqscsv._safe_utf8_stringify(second_pass)
self.assertEqual(first_pass, second_pass)
self.assertEqual(second_pass, third_pass)
self.assertEqual(type(first_pass), type(third_pass))
1 change: 0 additions & 1 deletion test_app/djqscsv_tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ def create_people_and_get_queryset():
info='former arch mage', hobby=resting)

return Person.objects.all()

0 comments on commit 7dc24fe

Please sign in to comment.