From e84ffbd43d21885a00fd32bdedd99ed3cd1cc6bc Mon Sep 17 00:00:00 2001 From: Jose Oscar Vogel Date: Sat, 26 Oct 2019 09:31:38 -0300 Subject: [PATCH 1/3] Agregado WSFCRED y modificado pyfepdf para que emita la factura de credito --- pyfepdf.py | 39 ++-- wsfecred.py | 592 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 613 insertions(+), 18 deletions(-) create mode 100644 wsfecred.py diff --git a/pyfepdf.py b/pyfepdf.py index acd518170..2ec035722 100644 --- a/pyfepdf.py +++ b/pyfepdf.py @@ -120,6 +120,9 @@ class FEPDF: (1, 6, 11, 19, 51): 'Factura', (2, 7, 12, 20, 52): 'Nota de Débito', (3, 8, 13, 21, 53): 'Nota de Crédito', + (201, 206, 211): u'Factura de Crédito MiPyMEs', + (202, 207, 212): u'Nota de Débito MiPyMEs', + (203, 208, 213): u'Nota de Crédito MiPyMEs', (4, 9, 15, 54): 'Recibo', (10, 5): 'Nota de Venta al contado', (60, 61): 'Cuenta de Venta y Líquido producto', @@ -127,13 +130,13 @@ class FEPDF: (91, ): 'Remito', (39, 40): '???? (R.G. N° 3419)'} - letras_fact = {(1, 2, 3, 4, 5, 39, 60, 63): 'A', - (6, 7, 8, 9, 10, 40, 61, 64): 'B', - (11, 12, 13, 15): 'C', + letras_fact = {(1, 2, 3, 4, 5, 39, 60, 63, 201, 202, 203): 'A', + (6, 7, 8, 9, 10, 40, 61, 64, 206, 207, 208): 'B', + (11, 12, 13, 15, 211, 212, 213): 'C', (51, 52, 53, 54): 'M', (19, 20, 21): 'E', (91, ): 'R', - } + } def __init__(self): self.Version = __version__ @@ -517,13 +520,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): fact[k] = ds.replace('
', '\n') # divido las observaciones por linea: - if fact.get('obs_generales') and 'obs' not in f and 'ObservacionesGenerales1' not in f: + if fact.get('obs_generales') and not f.has_key('obs') and not f.has_key('ObservacionesGenerales1'): obs = "\nObservaciones:\n\n" + fact['obs_generales'] # limpiar texto (campos dbf) y reemplazar saltos de linea: obs = obs.replace('\x00', '').replace('
', '\n') for ds in f.split_multicell(obs, 'Item.Descripcion01'): li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None)) - if fact.get('obs_comerciales') and 'obs_comerciales' not in f and 'ObservacionesComerciales1' not in f: + if fact.get('obs_comerciales') and not f.has_key('obs_comerciales') and not f.has_key('ObservacionesComerciales1'): obs = "\nObservaciones Comerciales:\n\n" + fact['obs_comerciales'] # limpiar texto (campos dbf) y reemplazar saltos de linea: obs = obs.replace('\x00', '').replace('
', '\n') @@ -534,13 +537,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): permisos = ['Codigo de Despacho %s - Destino de la mercadería: %s' % ( p['id_permiso'], self.paises.get(p['dst_merc'], p['dst_merc'])) for p in fact.get('permisos', [])] - #import dbg; dbg.set_trace() - if 'permiso.id1' in f and "permiso.delivery1" in f: + + if f.has_key('permiso.id1') and f.has_key("permiso.delivery1"): for i, p in enumerate(fact.get('permisos', [])): self.AgregarDato("permiso.id%d" % (i + 1), p['id_permiso']) pais_dst = self.paises.get(p['dst_merc'], p['dst_merc']) self.AgregarDato("permiso.delivery%d" % (i + 1), pais_dst) - elif 'permisos' not in f and permisos: + elif not f.has_key('permisos') and permisos: obs = "\nPermisos de Embarque:\n\n" + '\n'.join(permisos) for ds in f.split_multicell(obs, 'Item.Descripcion01'): li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None)) @@ -549,7 +552,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): # agrego comprobantes asociados cmps_asoc = ['%s %s %s' % self.fmt_fact(c['cbte_tipo'], c['cbte_punto_vta'], c['cbte_nro']) for c in fact.get('cbtes_asoc', [])] - if 'cmps_asoc' not in f and cmps_asoc: + if not f.has_key('cmps_asoc') and cmps_asoc: obs = "\nComprobantes Asociados:\n\n" + '\n'.join(cmps_asoc) for ds in f.split_multicell(obs, 'Item.Descripcion01'): li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None)) @@ -572,7 +575,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): # mostrar las validaciones no excluyentes de AFIP (observaciones) if fact.get('motivos_obs') and fact['motivos_obs'] != '00': - if 'motivos_ds.L' not in f: + if not f.has_key('motivos_ds.L'): motivos_ds = "Irregularidades observadas por AFIP (F136): %s" % fact['motivos_obs'] else: motivos_ds = "%s" % fact['motivos_obs'] @@ -583,7 +586,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): if letra_fact in ('A', 'M'): msg_no_iva = "\nEl IVA discriminado no puede computarse como Crédito Fiscal (RG2485/08 Art. 30 inc. c)." - if 'leyenda_credito_fiscal' not in f and motivos_ds: + if not f.has_key('leyenda_credito_fiscal') and motivos_ds: motivos_ds += msg_no_iva copias = {1: 'Original', 2: 'Duplicado', 3: 'Triplicado'} @@ -687,7 +690,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): if it['codigo'] is not None: f.set('Item.Codigo%02d' % li, it['codigo']) if it['umed'] is not None: - if it['umed'] and "Item.Umed_ds01" in f: + if it['umed'] and f.has_key("Item.Umed_ds01"): # recortar descripción: umed_ds = self.umeds_ds.get(int(it['umed'])) s = f.split_multicell(umed_ds, 'Item.Umed_ds01') @@ -829,9 +832,9 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): # Datos del pie de factura (obtenidos desde AFIP): f.set('motivos_ds', motivos_ds) - if 'motivos_ds1' in f and motivos_ds: + if f.has_key('motivos_ds1') and motivos_ds: if letra_fact in ('A', 'M'): - if 'leyenda_credito_fiscal' in f: + if f.has_key('leyenda_credito_fiscal'): f.set('leyenda_credito_fiscal', msg_no_iva) for i, txt in enumerate(f.split_multicell(motivos_ds, 'motivos_ds1')): f.set('motivos_ds%d' % (i + 1), txt) @@ -863,13 +866,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'): f.set('estado', "") # compatibilidad hacia atras # colocar campos de observaciones (si no van en ds) - if 'observacionesgenerales1' in f and 'obs_generales' in fact: + if f.has_key('observacionesgenerales1') and 'obs_generales' in fact: for i, txt in enumerate(f.split_multicell(fact['obs_generales'], 'ObservacionesGenerales1')): f.set('ObservacionesGenerales%d' % (i + 1), txt) - if 'observacionescomerciales1' in f and 'obs_comerciales' in fact: + if f.has_key('observacionescomerciales1') and 'obs_comerciales' in fact: for i, txt in enumerate(f.split_multicell(fact['obs_comerciales'], 'ObservacionesComerciales1')): f.set('ObservacionesComerciales%d' % (i + 1), txt) - if 'enletras1' in f and 'en_letras' in fact: + if f.has_key('enletras1') and 'en_letras' in fact: for i, txt in enumerate(f.split_multicell(fact['en_letras'], 'EnLetras1')): f.set('EnLetras%d' % (i + 1), txt) diff --git a/wsfecred.py b/wsfecred.py new file mode 100644 index 000000000..ae19f757e --- /dev/null +++ b/wsfecred.py @@ -0,0 +1,592 @@ +#!/usr/bin/python +# -*- coding: utf8 -*- +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 3, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. + +"""Módulo para la Gestión de cuentas corrientes de Facturas Electrónicas de +Crédito del servicio web FECredService versión 1.0.1-rc1 (RG4367/18) +""" + +__author__ = "Mariano Reingart " +__copyright__ = "Copyright (C) 2018-2019 Mariano Reingart" +__license__ = "LGPL 3.0" +__version__ = "1.02a" + +LICENCIA = """ +wsfecred.py: Interfaz para REGISTRO DE FACTURAS de CRÉDITO ELECTRÓNICA MiPyMEs +Resolución General 4367/2018. +Copyright (C) 2019 Mariano Reingart reingart@gmail.com +http://www.sistemasagiles.com.ar/trac/wiki/FacturaCreditoElectronica + +Este progarma es software libre, se entrega ABSOLUTAMENTE SIN GARANTIA +y es bienvenido a redistribuirlo bajo la licencia GPLv3. + +Para información adicional sobre garantía, soporte técnico comercial +e incorporación/distribución en programas propietarios ver PyAfipWs: +http://www.sistemasagiles.com.ar/trac/wiki/PyAfipWs +""" + +AYUDA = """ +Opciones: + --ayuda: este mensaje + + --debug: modo depuración (detalla y confirma las operaciones) + --prueba: genera y autoriza una rec de prueba (no usar en producción!) + --xml: almacena los requerimientos y respuestas XML (depuración) + --dummy: consulta estado de servidores + + --obligado: consultar monto obligado a recepcion (según CUIT) + --ctasctes: consultar cuentas corrientes generadas a partir de facturación + + --tipos_ajuste: tabla de parametros para tipo de ajuste + --tipos_cancelacion: tabla de parametros para formas cancelacion + --tipos_retencion: tabla de parametros para tipo de retenciones + --tipos_rechazo: tabla de parametros para tipo de motivos de rechazo + +Ver rece.ini para parámetros de configuración (URL, certificados, etc.)" +""" + +from collections import OrderedDict +import datetime +import os, sys, time, base64 +import traceback +from pysimplesoap.client import SoapFault +# import utils + +# importo funciones compartidas: +from .utils import json, BaseWS, inicializar_y_capturar_excepciones, get_install_dir, json_serializer + +# constantes de configuración (producción/homologación): + +WSDL = ["https://serviciosjava.afip.gob.ar/wsfecred/FECredService?wsdl", + "https://fwshomo.afip.gov.ar/wsfecred/FECredService?wsdl"] + +DEBUG = False +XML = False +CONFIG_FILE = "rece.ini" +HOMO = False +ENCABEZADO = [] + + +class WSFECred(BaseWS): + "Interfaz para el WebService de Factura de Crédito Electronica" + _public_methods_ = ['Conectar', 'Dummy', 'SetTicketAcceso', 'DebugLog', + 'CrearFECred', 'AgregarFormasCancelacion', 'AgregarAjustesOperacion', 'AgregarRetenciones', + 'AgregarConfirmarNotasDC', + 'ConsultarCtasCtes', 'LeerCtaCte', + 'ConsultarTiposAjustesOperacion', 'ConsultarTiposFormasCancelacion', + 'ConsultarTiposMotivosRechazo', 'ConsultarTiposRetenciones', + 'ConsultarMontoObligadoRecepcion', + 'SetParametros', 'SetParametro', 'GetParametro', 'AnalizarXml', 'ObtenerTagXml', 'LoadTestXML', + ] + _public_attrs_ = ['XmlRequest', 'XmlResponse', 'Version', 'Traceback', 'Excepcion', 'LanzarExcepciones', + 'Token', 'Sign', 'Cuit', 'AppServerStatus', 'DbServerStatus', 'AuthServerStatus', + 'CodCtaCte', 'TipoComprobante', 'PuntoVenta', + 'NroComprobante', 'CodAutorizacion', 'FechaVencimiento', 'FechaEmision', 'Estado', 'Resultado', + 'QR', + 'ErrCode', 'ErrMsg', 'Errores', 'ErroresFormato', 'Observaciones', 'Obs', 'Evento', 'Eventos', + ] + _reg_progid_ = "WSFECred" + _reg_clsid_ = "{F4B2B652-C992-4E46-9134-121F62011C46}" + + # Variables globales para BaseWS: + HOMO = HOMO + WSDL = WSDL[HOMO] + LanzarExcepciones = False + Version = "%s %s" % (__version__, HOMO and 'Homologación' or '') + + def Conectar(self, *args, **kwargs): + ret = BaseWS.Conectar(self, *args, **kwargs) + return ret + + def inicializar(self): + self.AppServerStatus = self.DbServerStatus = self.AuthServerStatus = None + self.CodCtaCte = self.TipoComprobante = self.PuntoVenta = None + self.NroComprobante = self.CUITEmisor = None + self.Resultado = None + self.Errores = [] + self.ErroresFormato = [] + self.Observaciones = [] + self.Eventos = [] + self.Evento = self.ErrCode = self.ErrMsg = self.Obs = "" + + def __analizar_errores(self, ret): + "Comprueba y extrae errores si existen en la respuesta XML" + self.Errores = [err['codigoDescripcion'] for err in ret.get('arrayErrores', [])] + self.ErroresFormato = [err['codigoDescripcionString'] for err in ret.get('arrayErroresFormato', [])] + errores = self.Errores + self.ErroresFormato + self.ErrCode = ' '.join(["%(codigo)s" % err for err in errores]) + self.ErrMsg = '\n'.join(["%(codigo)s: %(descripcion)s" % err for err in errores]) + + def __analizar_observaciones(self, ret): + "Comprueba y extrae observaciones si existen en la respuesta XML" + self.Observaciones = [obs["codigoDescripcion"] for obs in ret.get('arrayObservaciones', [])] + self.Obs = '\n'.join(["%(codigo)s: %(descripcion)s" % obs for obs in self.Observaciones]) + + def __analizar_evento(self, ret): + "Comprueba y extrae el wvento informativo si existen en la respuesta XML" + evt = ret.get('evento') + if evt: + self.Eventos = [evt] + self.Evento = "%(codigo)s: %(descripcion)s" % evt + + @inicializar_y_capturar_excepciones + def Dummy(self): + "Obtener el estado de los servidores de la AFIP" + results = self.client.dummy()['dummyReturn'] + self.AppServerStatus = str(results['appserver']) + self.DbServerStatus = str(results['dbserver']) + self.AuthServerStatus = str(results['authserver']) + + @inicializar_y_capturar_excepciones + def CrearFECred(self, cuit_emisor, tipo_cbte, punto_vta, nro_cbte, cod_moneda="PES", ctz_moneda_ult=1, + importe_cancelado=0.00, importe_embargo_pesos=0.00, importe_total_ret_pesos=0.00, + saldo_aceptado=0.00, tipo_cancelacion="TOT", + **kwargs): + "Inicializa internamente los datos de una Factura de Crédito Electrónica para aceptacion/rechazo" + self.factura = { + 'idCtaCte': { + ## "codCtaCte": 2561, + "idFactura": { + 'CUITEmisor': cuit_emisor, + 'codTipoCmp': tipo_cbte, + 'ptoVta': punto_vta, + 'nroCmp': nro_cbte, + } + }, + 'codMoneda': cod_moneda, + 'cotizacionMonedaUlt': ctz_moneda_ult, + 'importeCancelado': importe_cancelado, + 'importeEmbargoPesos': importe_embargo_pesos, + 'importeTotalRetPesos': importe_total_ret_pesos, + 'saldoAceptado': saldo_aceptado, + 'tipoCancelacion': tipo_cancelacion, + 'arrayAjustesOperacion': [], + 'arrayFormasCancelacion': [], + 'arrayRetenciones': [], + 'arrayConfirmarNotasDC': [], + } + return True + + @inicializar_y_capturar_excepciones + def AgregarAjustesOperacion(self, codigo=None, importe=0.00, **kwargs): + "Agrega la información de los ajustes a la Factura de Crédito Electrónica" + self.factura['arrayAjustesOperacion'].append({ + 'ajuste': { + 'codigo': codigo, + 'importe': importe, + } + }) + return True + + @inicializar_y_capturar_excepciones + def AgregarFormasCancelacion(self, codigo=None, descripcion=None, **kwargs): + "Agrega la información de las formas de cancelación a la Factura de Crédito Electrónica" + self.factura['arrayFormasCancelacion'].append({ + 'codigoDescripcion': { + 'codigo': codigo, + 'descripcion': descripcion, + } + }) + return True + + @inicializar_y_capturar_excepciones + def AgregarRetenciones(self, cod_tipo=None, desc_motivo=None, importe=None, porcentaje=None, **kwargs): + "Agrega la información de las retenciones a la Factura de Crédito Electrónica" + self.factura['arrayRetenciones'].append({ + 'retencion': { + 'codTipo': cod_tipo, + 'descMotivo': desc_motivo, + 'importe': importe, + 'porcentaje': porcentaje, + } + }) + return True + + @inicializar_y_capturar_excepciones + def AgregarConfirmarNotasDC(self, cuit_emisor, tipo_cbte, punto_vta, nro_cbte, **kwargs): + "Agrega la información referente al viaje del remito electrónico cárnico" + self.factura['arrayConfirmarNotasDC'].append({ + 'confirmarNota': { + 'acepta': 'acepta', + 'idNota': { + 'CUITEmisor': cuit_emisor, + 'codTipoCmp': tipo_cbte, + 'ptoVta': punto_vta, + 'nroCmp': nro_cbte, + } + } + }) + return True + + @inicializar_y_capturar_excepciones + def AceptarFECred(self): + "Aceptar el saldo actual de la Cta. Cte. de una Factura de Crédito" + # pudiendo indicar: pagos parciales, retenciones y/o embargos + params = { + 'authRequest': { + 'cuitRepresentada': self.Cuit, + 'sign': self.Sign, + 'token': self.Token + }, + } + params.update(self.factura) + response = self.client.aceptarFECred(**params) + ret = response.get("operacionFECredReturn") + if ret: + self.__analizar_errores(ret) + self.__analizar_observaciones(ret) + self.__analizar_evento(ret) + self.AnalizarFECred(ret) + return True + + @inicializar_y_capturar_excepciones + def AnalizarFECred(self, ret, archivo=None): + "Extrae el resultado de la Factura de Crédito Electrónica, si existen en la respuesta XML" + if ret: + id_cta_cte = ret.get("idCtaCte", {}) + self.CodCtaCte = id_cta_cte.get("codCtaCte") + id_factura = id_cta_cte.get("idFactura") + if id_factura: + self.CUITEmisor = ret.get("cuitEmisor") + self.TipoComprobante = ret.get("tipoComprobante") + self.PuntoVenta = ret.get("ptoVta") + self.NroComprobante = ret.get('NroComprobante') + self.Resultado = ret.get('resultado') + + @inicializar_y_capturar_excepciones + def ConsultarTiposAjustesOperacion(self, sep="||"): + "Listar los tipos de ajustes disponibles" + # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada + ret = self.client.consultarTiposAjustesOperacion( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, }, + )['codigoDescripcionReturn'] + self.__analizar_errores(ret) + array = ret.get('arrayCodigoDescripcion', []) + lista = [it['codigoDescripcion'] for it in array] + return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista] + + @inicializar_y_capturar_excepciones + def ConsultarTiposFormasCancelacion(self, sep="||"): + "Listar los tipos de formas de cancelación habilitados para una Factura Electrónica de Crédito" + # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada + ret = self.client.consultarTiposFormasCancelacion( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, }, + )['codigoDescripcionReturn'] + self.__analizar_errores(ret) + array = ret.get('arrayCodigoDescripcion', []) + lista = [it['codigoDescripcion'] for it in array] + return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista] + + @inicializar_y_capturar_excepciones + def ConsultarTiposMotivosRechazo(self, sep="||"): + "Listar los tipos de motivos de rechazo habilitados para una cta. cte." + # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada + ret = self.client.consultarTiposMotivosRechazo( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, }, + )['codigoDescripcionReturn'] + self.__analizar_errores(ret) + array = ret.get('arrayCodigoDescripcion', []) + lista = [it['codigoDescripcion'] for it in array] + return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista] + + @inicializar_y_capturar_excepciones + def ConsultarTiposRetenciones(self, sep="||"): + "Listar los tipos de retenciones habilitados para una Factura Electrónica de Crédito" + # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada + ret = self.client.consultarTiposRetenciones( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, }, + )['consultarTiposRetencionesReturn'] + self.__analizar_errores(ret) + array = ret.get('arrayTiposRetenciones', []) + lista = [it['tipoRetencion'] for it in array] + return [(u"%s {codigoJurisdiccion} %s {descripcionJurisdiccion} %s {porcentajeRetencion} %s" % + (sep, sep, sep, sep)).format(**it) if sep else it for it in lista] + + @inicializar_y_capturar_excepciones + def ConsultarMontoObligadoRecepcion(self, cuit_consultada, fecha_emision=None): + "Conocer la obligación respecto a la emisión o recepción de Facturas de Créditos" + if not fecha_emision: + fecha_emision = datetime.datetime.today().strftime("%Y-%m-%d") + response = self.client.consultarMontoObligadoRecepcion( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, + }, + cuitConsultada=cuit_consultada, + fechaEmision=fecha_emision, + ) + ret = response.get('consultarMontoObligadoRecepcionReturn') + if ret: + self.__analizar_errores(ret) + self.__analizar_observaciones(ret) + self.__analizar_evento(ret) + self.Resultado = ret['obligado'] + return ret['montoDesde'] + + @inicializar_y_capturar_excepciones + def ConsultarCtasCtes(self, cuit_contraparte=None, rol="Receptor", + fecha_desde=None, fecha_hasta=None, fecha_tipo="Emision"): + """Obtener las cuentas corrientes que fueron generadas a partir de la facturación + + Args: + cuit_contraparte (str): Cuit de la contraparte, que ocupa el rol opuesto (CUITContraparte) + rol (str): Identificar la CUIT Representada que origina la cuenta corriente (rolCUITRepresentada) + "Emisor" o "Receptor" + fecha_desde (str): Fecha Desde, si no se indica se usa "2019-01-01" + fecha_hasta (str): Fecha Hasta, si no se indica se usa la fecha de hoy + fecha_tipo (str): permite determinar sobre qué fecha vamos a hacer el filtro (TipoFechaSimpleType) + "Emision": Fecha de Emisión + "PuestaDispo": Fecha puesta a Disposición + "VenPago": Fecha vencimiento de pago + "VenAcep": Fecha vencimiento aceptación + "Acep": Fecha aceptación + "InfoAgDptoCltv": Fecha informada a Agente de Deposito + + Returns: + int: cantidad de cuentas corrientes + """ + if not fecha_desde: + fecha_desde = datetime.datetime.today().strftime("2019-01-01") + if not fecha_hasta: + fecha_hasta = datetime.datetime.today().strftime("%Y-%m-%d") + response = self.client.consultarCtasCtes( + authRequest={ + 'token': self.Token, 'sign': self.Sign, + 'cuitRepresentada': self.Cuit, + }, + CUITContraparte=cuit_contraparte, + rolCUITRepresentada=rol, + fecha={ + 'desde': fecha_desde, + 'hasta': fecha_hasta, + 'tipo': fecha_tipo + }, + ) + ret = response.get('consultarCtasCtesReturn') + self.ctas_ctes = [] + if ret: + self.__analizar_errores(ret) + self.__analizar_observaciones(ret) + self.__analizar_evento(ret) + array = ret.get('arrayInfosCtaCte', []) + for cc in [it['infoCtaCte'] for it in array]: + cc = { + 'cod_cta_cte': cc['codCtaCte'], + 'estado_cta_cte': cc['estadoCtaCte']['estado'], + 'fecha_hora_estado': cc['estadoCtaCte']['fechaHoraEstado'], + 'cuit_emisor': cc['idFacturaCredito']['CUITEmisor'], + 'tipo_cbte': cc['idFacturaCredito']['codTipoCmp'], + 'nro_cbte': cc['idFacturaCredito']['nroCmp'], + 'punto_vta': cc['idFacturaCredito']['ptoVta'], + 'cod_moneda': cc['codMoneda'], + 'importe_total_fc': cc['importeTotalFC'], + 'saldo': cc['saldo'], + 'saldo_aceptado': cc['saldoAceptado'], + } + self.ctas_ctes.append(cc) + return len(self.ctas_ctes) + + @inicializar_y_capturar_excepciones + def LeerCtaCte(self, pos=0): + """Leer la cuenta corriente generada a partir de la facturación + + Args: + pos (int): posición de la cuenta corriente (0 a n) + + Returns: + dict: elemento de la cuenta corriente: { + 'cod_cta_cte': 2561, + 'estado_cta_cte': 'Modificable', + 'fecha_hora_estado': datetime.datetime(2019, 5, 13, 9, 25, 32), + 'cuit_emisor': 20267565393, + 'tipo_cbte': 201, + 'nro_cbte': 22, + 'punto_vta': 999 + 'cod_moneda': 'PES', + 'importe_total_fc': Decimal('12850000'), + 'saldo': Decimal('12850000'), + 'saldo_aceptado': Decimal('0') + } + """ + from win32com.client import Dispatch + d = Dispatch('Scripting.Dictionary') + cc = self.ctas_ctes.pop(pos) if pos < len(self.ctas_ctes) else {} + for k, v in cc.items(): + d.Add(k, str(v)) + return d + + +# busco el directorio de instalación (global para que no cambie si usan otra dll) +if not hasattr(sys, "frozen"): + basepath = __file__ +elif sys.frozen == 'dll': + import win32api + + basepath = win32api.GetModuleFileName(sys.frozendllhandle) +else: + basepath = sys.executable +INSTALL_DIR = WSFECred.InstallDir = get_install_dir() + +if __name__ == '__main__': + if '--ayuda' in sys.argv: + print(LICENCIA) + print(AYUDA) + sys.exit(0) + + if "--register" in sys.argv or "--unregister" in sys.argv: + import win32com.server.register + + win32com.server.register.UseCommandLine(WSFECred) + sys.exit(0) + + from configparser import SafeConfigParser + + try: + + if "--version" in sys.argv: + print("Versión: ", __version__) + + for arg in sys.argv[1:]: + if arg.startswith("--"): + break + print("Usando configuración:", arg) + CONFIG_FILE = arg + + config = SafeConfigParser() + config.read(CONFIG_FILE) + CERT = config.get('WSAA', 'CERT') + PRIVATEKEY = config.get('WSAA', 'PRIVATEKEY') + CUIT = config.get('WSFECred', 'CUIT') + ENTRADA = config.get('WSFECred', 'ENTRADA') + SALIDA = config.get('WSFECred', 'SALIDA') + + if config.has_option('WSAA', 'URL') and not HOMO: + wsaa_url = config.get('WSAA', 'URL') + else: + wsaa_url = None + if config.has_option('WSFECred', 'URL') and not HOMO: + wsfecred_url = config.get('WSFECred', 'URL') + else: + wsfecred_url = WSDL[HOMO] + + if config.has_section('DBF'): + conf_dbf = dict(config.items('DBF')) + if DEBUG: print("conf_dbf", conf_dbf) + else: + conf_dbf = {} + + DEBUG = '--debug' in sys.argv + XML = '--xml' in sys.argv + + if DEBUG: + print("Usando Configuración:") + print("wsaa_url:", wsaa_url) + print("wsfecred_url:", wsfecred_url) + + # obteniendo el TA + from wsaa import WSAA + + wsaa = WSAA() + ta = wsaa.Autenticar("wsfecred", CERT, PRIVATEKEY, wsaa_url, debug=DEBUG) + if not ta: + sys.exit("Imposible autenticar con WSAA: %s" % wsaa.Excepcion) + + # cliente soap del web service + wsfecred = WSFECred() + wsfecred.Conectar(wsdl=wsfecred_url) + wsfecred.SetTicketAcceso(ta) + wsfecred.Cuit = CUIT + ok = None + + if '--dummy' in sys.argv: + ret = wsfecred.Dummy() + print("AppServerStatus", wsfecred.AppServerStatus) + print("DbServerStatus", wsfecred.DbServerStatus) + print("AuthServerStatus", wsfecred.AuthServerStatus) + sys.exit(0) + + if '--obligado' in sys.argv: + try: + cuit_consultar = int(sys.argv[sys.argv.index("--obligado") + 1]) + except IndexError as ValueError: + cuit_consultar = raw_input("Cuit a Consultar: ") + ret = wsfecred.ConsultarMontoObligadoRecepcion(cuit_consultar) + print("Obligado:", wsfecred.Resultado) + print("Monto Desde:", ret) + + if '--ctasctes' in sys.argv: + try: + cuit_contraparte = int(sys.argv[sys.argv.index("--ctasctes") + 1]) + except IndexError as ValueError: + cuit_contraparte = None + ret = wsfecred.ConsultarCtasCtes(cuit_contraparte, rol="Emisor") + print("Observaciones:", wsfecred.Obs) + import pprint + + for cc in ret: + pprint.pprint(cc) + + if '--prueba' in sys.argv: + fec = dict( + cuit_emisor=30999999999, + tipo_cbte=201, punto_vta=99, nro_cbte=22, + cod_moneda="PES", ctz_moneda_ult=1, + importe_cancelado=1000.00, importe_embargo_pesos=0.00, importe_total_ret_pesos=0.00, + saldo_aceptado=1000.00, tipo_cancelacion="TOT", + ) + + wsfecred.CrearFECred(**fec) + wsfecred.AgregarFormasCancelacion(codigo=2, descripcion="Transferencia Bancaria") + wsfecred.AceptarFECred() + + print("Resultado", wsfecred.Resultado) + print("CodCtaCte", wsfecred.CodCtaCte) + + # Recuperar parámetros: + + if '--tipos_ajuste' in sys.argv: + ret = wsfecred.ConsultarTiposAjustesOperacion() + print("\n".join(ret)) + + if '--tipos_cancelacion' in sys.argv: + ret = wsfecred.ConsultarTiposFormasCancelacion() + print("\n".join(ret)) + + if '--tipos_retencion' in sys.argv: + ret = wsfecred.ConsultarTiposRetenciones() + print("\n".join(ret)) + + if '--tipos_rechazo' in sys.argv: + ret = wsfecred.ConsultarTiposMotivosRechazo() + print("\n".join(ret)) + + if wsfecred.Errores or wsfecred.ErroresFormato: + print("Errores:", wsfecred.Errores, wsfecred.ErroresFormato) + + print("hecho.") + + except SoapFault as e: + print("Falla SOAP:", e.faultcode, e.faultstring.encode("ascii", "ignore")) + sys.exit(3) + except Exception as e: + ex = utils.exception_info() + print(ex) + if DEBUG: + raise + sys.exit(5) From 882cebf2c657335842b5965c90f2b71d42c32416 Mon Sep 17 00:00:00 2001 From: Oscar Vogel Date: Mon, 1 Jun 2020 11:29:17 -0300 Subject: [PATCH 2/3] Error con el afip_ca solucionado --- nsis.py | 2 +- tests/wsfev1.py | 2 +- wsaa.py | 109 ++++++++++++++++++++++++++++++------------------ wsfecred.py | 2 +- 4 files changed, 72 insertions(+), 43 deletions(-) diff --git a/nsis.py b/nsis.py index e752bc863..06ca07655 100644 --- a/nsis.py +++ b/nsis.py @@ -24,7 +24,7 @@ import os import sys from py2exe.build_exe import py2exe - +# import py2exe nsi_base_script = r"""\ ; base.nsi diff --git a/tests/wsfev1.py b/tests/wsfev1.py index ead3251b3..f43b2135b 100644 --- a/tests/wsfev1.py +++ b/tests/wsfev1.py @@ -9,7 +9,7 @@ # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. - +from pyafipws import wsfev1 from pyafipws.wsaa import WSAA from pyafipws.wsfev1 import WSFEv1 "Pruebas para WSFEv1 de AFIP (Factura Electrónica Mercado Interno sin detalle)" diff --git a/wsaa.py b/wsaa.py index eaa4cdc10..24a15b1d1 100644 --- a/wsaa.py +++ b/wsaa.py @@ -1,5 +1,5 @@ #!/usr/bin/python -# -*- coding: latin-1 -*- +# -*- coding: utf-8 -*- # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 3, or (at your option) any later @@ -10,11 +10,11 @@ # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. -"Módulo para obtener un ticket de autorización del web service WSAA de AFIP" +"Módulo para obtener un ticket de autorización del web service WSAA de AFIP" # Basado en wsaa-client.php de Gerardo Fisanotti - DvSHyS/DiOPIN/AFIP - 13-apr-07 # Definir WSDL, CERT, PRIVATEKEY, PASSPHRASE, SERVICE, WSAAURL -# Devuelve TA.xml (ticket de autorización de WSAA) +# Devuelve TA.xml (ticket de autorización de WSAA) __author__ = "Mariano Reingart (reingart@gmail.com)" __copyright__ = "Copyright (C) 2008-2011 Mariano Reingart" @@ -38,17 +38,18 @@ except ImportError: ex = exception_info() warnings.warn("No es posible importar M2Crypto (OpenSSL)") - warnings.warn(ex['msg']) # revisar instalación y DLLs de OpenSSL + warnings.warn(ex['msg']) # revisar instalación y DLLs de OpenSSL BIO = Rand = SMIME = SSL = None # utilizar alternativa (ejecutar proceso por separado) from subprocess import Popen, PIPE from base64 import b64encode + from tempfile import NamedTemporaryFile # Constantes (si se usa el script de linea de comandos) WSDL = "https://wsaahomo.afip.gov.ar/ws/services/LoginCms?wsdl" # El WSDL correspondiente al WSAA CERT = "reingart.crt" # El certificado X.509 obtenido de Seg. Inf. PRIVATEKEY = "reingart.key" # La clave privada del certificado CERT -PASSPHRASE = "xxxxxxx" # La contraseña para firmar (si hay) +PASSPHRASE = "xxxxxxx" # La contraseña para firmar (si hay) SERVICE = "wsfe" # El nombre del web service al que se le pide el TA # WSAAURL: la URL para acceder al WSAA, verificar http/https y wsaa/wsaahomo @@ -57,7 +58,7 @@ SOAP_ACTION = 'http://ar.gov.afip.dif.facturaelectronica/' # Revisar WSDL SOAP_NS = "http://wsaa.view.sua.dvadac.desein.afip.gov" # Revisar WSDL -# Verificación del web server remoto, necesario para verificar canal seguro +# Verificación del web server remoto, necesario para verificar canal seguro CACERT = "conf/afip_ca_info.crt" # WSAA CA Cert (Autoridades de Confiaza) HOMO = False @@ -65,7 +66,7 @@ DEFAULT_TTL = 60 * 60 * 5 # five hours DEBUG = False -# No debería ser necesario modificar nada despues de esta linea +# No debería ser necesario modificar nada despues de esta linea def create_tra(service=SERVICE, ttl=2400): @@ -82,18 +83,19 @@ def create_tra(service=SERVICE, ttl=2400): tra.header.add_child('generationTime', str(date('c', date('U') - ttl))) tra.header.add_child('expirationTime', str(date('c', date('U') + ttl))) tra.add_child('service', service) - return tra.as_xml().decode("utf8") + return tra.as_xml() def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""): "Firmar PKCS#7 el TRA y devolver CMS (recortando los headers SMIME)" if BIO: + print("pudo importar m2crypto") # Firmar el texto (tra) usando m2crypto (openssl bindings para python) buf = BIO.MemoryBuffer(tra) # Crear un buffer desde el texto - # Rand.load_file('randpool.dat', -1) # Alimentar el PRNG + #Rand.load_file('randpool.dat', -1) # Alimentar el PRNG s = SMIME.SMIME() # Instanciar un SMIME - # soporte de contraseña de encriptación (clave privada, opcional) + # soporte de contraseña de encriptación (clave privada, opcional) callback = lambda *args, **kwarg: passphrase # Cargar clave privada y certificado if not privatekey.startswith("-----BEGIN RSA PRIVATE KEY-----"): @@ -104,8 +106,8 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""): else: raise RuntimeError("Archivos no encontrados: %s, %s" % (privatekey, cert)) # crear buffers en memoria de la clave privada y certificado: - key_bio = BIO.MemoryBuffer(privatekey) - crt_bio = BIO.MemoryBuffer(cert) + key_bio = BIO.MemoryBuffer(privatekey.encode('utf8')) + crt_bio = BIO.MemoryBuffer(cert.encode('utf8')) s.load_key_bio(key_bio, crt_bio, callback) # (desde buffer) p7 = s.sign(buf, 0) # Firmar el buffer out = BIO.MemoryBuffer() # Crear un buffer para la salida @@ -113,7 +115,7 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""): # Rand.save_file('randpool.dat') # Guardar el estado del PRNG's # extraer el cuerpo del mensaje (parte firmada) - msg = email.message_from_string(out.read()) + msg = email.message_from_string(out.read().decode('utf8')) for part in msg.walk(): filename = part.get_filename() if filename == "smime.p7m": # es la parte firmada? @@ -128,10 +130,35 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""): openssl = r"c:\OpenSSL-Win32\bin\openssl.exe" else: openssl = r"c:\OpenSSL-Win64\bin\openssl.exe" - out = Popen([openssl, "smime", "-sign", - "-signer", cert, "-inkey", privatekey, - "-outform", "DER", "-nodetach"], - stdin=PIPE, stdout=PIPE, stderr=PIPE).communicate(tra.encode("utf8"))[0] + # NOTE: workaround if certificate is not already stored in a file + # SECURITY WARNING: the private key will be exposed a bit in /tmp + # (in theory only for the current user) + if cert.startswith("-----BEGIN CERTIFICATE-----"): + cert_f = NamedTemporaryFile() + cert_f.write(cert.encode('utf-8')) + cert_f.flush() + cert = cert_f.name + else: + cert_f = None + if privatekey.startswith("-----BEGIN RSA PRIVATE KEY-----"): + key_f = NamedTemporaryFile() + key_f.write(privatekey.encode('utf-8')) + key_f.flush() + privatekey = key_f.name + else: + key_f = None + try: + out = Popen([openssl, "smime", "-sign", + "-signer", cert, "-inkey", privatekey, + "-outform","DER", "-nodetach"], + stdin=PIPE, stdout=PIPE, + stderr=PIPE).communicate(tra)[0] + finally: + # close temp files to delete them (just in case): + if cert_f: + cert_f.close() + if key_f: + key_f.close() return b64encode(out).decode("utf8") except OSError as e: if e.errno == 2: @@ -140,7 +167,7 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""): def call_wsaa(cms, location=WSAAURL, proxy=None, trace=False, cacert=None): - "Llamar web service con CMS para obtener ticket de autorización (TA)" + "Llamar web service con CMS para obtener ticket de autorización (TA)" # creo la nueva clase wsaa = WSAA() @@ -156,7 +183,7 @@ def call_wsaa(cms, location=WSAAURL, proxy=None, trace=False, cacert=None): class WSAA(BaseWS): - "Interfaz para el WebService de Autenticación y Autorización" + "Interfaz para el WebService de Autenticación y Autorización" _public_methods_ = ['CreateTRA', 'SignTRA', 'CallWSAA', 'LoginCMS', 'Conectar', 'AnalizarXml', 'ObtenerTagXml', 'Expirado', 'Autenticar', 'DebugLog', 'AnalizarCertificado', @@ -180,7 +207,7 @@ class WSAA(BaseWS): # Variables globales para BaseWS: HOMO = HOMO WSDL = WSDL - Version = "%s %s" % (__version__, HOMO and 'Homologación' or '') + Version = "%s %s" % (__version__, HOMO and 'Homologación' or '') @inicializar_y_capturar_excepciones def CreateTRA(self, service="wsfe", ttl=2400): @@ -189,15 +216,17 @@ def CreateTRA(self, service="wsfe", ttl=2400): @inicializar_y_capturar_excepciones def AnalizarCertificado(self, crt, binary=False): - "Carga un certificado digital y extrae los campos más importantes" + "Carga un certificado digital y extrae los campos más importantes" from M2Crypto import BIO, EVP, RSA, X509 if binary: - bio = BIO.MemoryBuffer(cert) - x509 = X509.load_cert_bio(bio, X509.FORMAT_DER) + bio = BIO.MemoryBuffer(cert.encode('utf8')) + x509 = X511.load_cert_bio(bio, X509.FORMAT_DER) else: if not crt.startswith("-----BEGIN CERTIFICATE-----"): crt = open(crt).read() - bio = BIO.MemoryBuffer(crt) + if isinstance(crt, str): + crt = crt.encode('utf-8') + bio = BIO.MemoryBuffer(crt.encode('utf8')) x509 = X509.load_cert_bio(bio, X509.FORMAT_PEM) if x509: self.Identidad = x509.get_subject().as_text() @@ -236,7 +265,7 @@ def CrearPedidoCertificado(self, cuit="", empresa="", nombre="pyafipws", # create the certificate signing request (CSR): self.x509_req = X509.Request() - # normalizar encoding (reemplazar acentos, eñe, etc.) + # normalizar encoding (reemplazar acentos, eñe, etc.) if isinstance(empresa, str): empresa = unicodedata.normalize('NFKD', empresa).encode('ASCII', 'ignore') if isinstance(nombre, str): @@ -264,11 +293,11 @@ def CrearPedidoCertificado(self, cuit="", empresa="", nombre="pyafipws", @inicializar_y_capturar_excepciones def SignTRA(self, tra, cert, privatekey, passphrase=""): "Firmar el TRA y devolver CMS" - return sign_tra(str(tra), cert.encode('latin1'), privatekey.encode('latin1'), passphrase.encode("utf8")) + return sign_tra(tra, cert, privatekey, passphrase) @inicializar_y_capturar_excepciones def LoginCMS(self, cms): - "Obtener ticket de autorización (TA)" + "Obtener ticket de autorización (TA)" results = self.client.loginCms(in0=str(cms)) ta_xml = results['loginCmsReturn'] # .encode("utf-8") self.xml = ta = SimpleXMLElement(ta_xml) @@ -278,7 +307,7 @@ def LoginCMS(self, cms): return ta_xml def CallWSAA(self, cms, url="", proxy=None): - "Obtener ticket de autorización (TA) -version retrocompatible-" + "Obtener ticket de autorización (TA) -version retrocompatible-" self.Conectar("", url, proxy) ta_xml = self.LoginCMS(cms) if not ta_xml: @@ -287,7 +316,7 @@ def CallWSAA(self, cms, url="", proxy=None): @inicializar_y_capturar_excepciones def Expirado(self, fecha=None): - "Comprueba la fecha de expiración, devuelve si ha expirado" + "Comprueba la fecha de expiración, devuelve si ha expirado" if not fecha: fecha = self.ObtenerTagXml('expirationTime') now = datetime.datetime.now() @@ -295,7 +324,7 @@ def Expirado(self, fecha=None): return now > d def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cacert=None, cache=None, debug=False): - "Método unificado para obtener el ticket de acceso (cacheado)" + "Método unificado para obtener el ticket de acceso (cacheado)" self.LanzarExcepciones = True try: @@ -303,7 +332,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac for filename in (crt, key): if not os.access(filename, os.R_OK): raise RuntimeError("Imposible abrir %s\n" % filename) - # creo el nombre para el archivo del TA (según credenciales y ws) + # creo el nombre para el archivo del TA (según credenciales y ws) ta_src = (service + crt + key).encode("utf8") fn = "TA-%s.xml" % hashlib.md5(ta_src).hexdigest() if cache: @@ -318,7 +347,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac if DEBUG: print("Creando TRA...") tra = self.CreateTRA(service=service, ttl=DEFAULT_TTL) - # firmarlo criptográficamente + # firmarlo criptográficamente if DEBUG: print("Frimando TRA...") cms = self.SignTRA(tra, crt, key) @@ -327,8 +356,8 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac print("Conectando a WSAA...") ok = self.Conectar(cache, wsdl, proxy, wrapper, cacert) if not ok or self.Excepcion: - raise RuntimeError("Fallo la conexión: %s" % self.Excepcion) - # llamar al método remoto para solicitar el TA + raise RuntimeError("Fallo la conexión: %s" % self.Excepcion) + # llamar al método remoto para solicitar el TA if DEBUG: print("Llamando WSAA...") ta = self.LoginCMS(cms) @@ -362,7 +391,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac return ta -# busco el directorio de instalación (global para que no cambie si usan otra dll) +# busco el directorio de instalación (global para que no cambie si usan otra dll) INSTALL_DIR = WSAA.InstallDir = get_install_dir() @@ -395,21 +424,21 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac # start the server. win32com.server.localserver.serve([WSAA._reg_clsid_]) elif "--crear_pedido_cert" in sys.argv: - # instanciar el helper y revisar los parámetros + # instanciar el helper y revisar los parámetros wsaa = WSAA() args = [arg for arg in sys.argv if not arg.startswith("--")] # obtengo el CUIT y lo normalizo: cuit = len(args) > 1 and args[1] or input("Ingrese un CUIT: ") cuit = ''.join([c for c in cuit if c.isdigit()]) nombre = len(args) > 2 and args[2] or "PyAfipWs" - # consultar el padrón online de AFIP si no se especificó razón social: + # consultar el padrón online de AFIP si no se especificó razón social: empresa = len(args) > 3 and args[3] or "" if not empresa: from .padron import PadronAFIP padron = PadronAFIP() ok = padron.Consultar(cuit) if ok and padron.denominacion: - print("Denominación según AFIP:", padron.denominacion) + print("Denominación según AFIP:", padron.denominacion) empresa = padron.denominacion else: print("CUIT %s no encontrado: %s..." % (cuit, padron.Excepcion)) @@ -430,7 +459,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac print("Se crearon los archivos:") print(clave_privada) print(pedido_cert) - # convertir a terminación de linea windows y abrir con bloc de notas + # convertir a terminación de linea windows y abrir con bloc de notas if sys.platform == "win32": txt = open(pedido_cert + ".txt", "wb") for linea in open(pedido_cert, "r"): @@ -449,7 +478,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac url = len(argv) > 5 and argv[5] or WSAAURL wrapper = len(argv) > 6 and argv[6] or None cacert = len(argv) > 7 and argv[7] or CACERT - DEBUG = "--debug" in args + DEBUG = "--debug" in args or DEBUG print("Usando CRT=%s KEY=%s URL=%s SERVICE=%s TTL=%s" % (crt, key, url, service, ttl), file=sys.stderr) diff --git a/wsfecred.py b/wsfecred.py index ae19f757e..1c14d8dca 100644 --- a/wsfecred.py +++ b/wsfecred.py @@ -58,7 +58,7 @@ import os, sys, time, base64 import traceback from pysimplesoap.client import SoapFault -# import utils +from . import utils # importo funciones compartidas: from .utils import json, BaseWS, inicializar_y_capturar_excepciones, get_install_dir, json_serializer From ea108272752af76131f33da880a2428600e65efe Mon Sep 17 00:00:00 2001 From: Oscar Vogel Date: Wed, 13 Jan 2021 08:21:06 -0300 Subject: [PATCH 3/3] Factura con QR --- pyqr.py | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 19 +++--- 2 files changed, 190 insertions(+), 9 deletions(-) create mode 100644 pyqr.py diff --git a/pyqr.py b/pyqr.py new file mode 100644 index 000000000..389126be4 --- /dev/null +++ b/pyqr.py @@ -0,0 +1,180 @@ +#!/usr/bin/python +# -*- coding: latin-1 -*- +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation; either version 3, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. + +"M?dulo para generar c?digos QR" + +__author__ = "Mariano Reingart " +__copyright__ = "Copyright (C) 2011 Mariano Reingart" +__license__ = "LGPL 3.0" +__version__ = "1.03b" + +import base64 +import json +import os +import sys +import tempfile + +import qrcode + + +TEST_QR_DATA = """ +eyJ2ZXIiOjEsImZlY2hhIjoiMjAyMC0xMC0xMyIsImN1aXQiOjMwMDAwMDAwMDA3LCJwdG9WdGEiOj +EwLCJ0aXBvQ21wIjoxLCJucm9DbXAiOjk0LCJpbXBvcnRlIjoxMjEwMCwibW9uZWRhIjoiRE9MIiwi +Y3R6Ijo2NSwidGlwb0RvY1JlYyI6ODAsIm5yb0RvY1JlYyI6MjAwMDAwMDAwMDEsInRpcG9Db2RBdX +QiOiJFIiwiY29kQXV0Ijo3MDQxNzA1NDM2NzQ3Nn0=""".replace("\n", "") + + +class PyQR: + "Interfaz para generar Codigo QR de Factura Electr?nica" + _public_methods_ = ['GenerarImagen', 'CrearArchivo', + ] + _public_attrs_ = ['Version', 'Excepcion', 'Traceback', "URL", "Archivo", + 'qr_ver', 'box_size', 'border', 'error_correction', + ] + + _reg_progid_ = "PyQR" + _reg_clsid_ = "{0868A2B6-2DC7-478D-8884-A10E92C588DE}" + + URL = "https://www.afip.gob.ar/fe/qr/?p=%s" + Archivo = "qr.png" + + # qrencode default parameters: + qr_ver = 1 + box_size = 10 + border = 4 + error_correction = qrcode.constants.ERROR_CORRECT_L + + def __init__(self): + self.Version = __version__ + self.Exception = self.Traceback = "" + + def CrearArchivo(self): + """Crea un nombre de archivo temporal""" + # para evitar errores de permisos y poder generar varios qr simultaneos + tmp = tempfile.NamedTemporaryFile(prefix="qr_afip_", + suffix=".png", + delete=False) + self.Archivo = tmp.name + return self.Archivo + + def GenerarImagen(self, ver=1, + fecha="2020-10-13", + cuit=30000000007, + pto_vta=10, tipo_cmp=1, nro_cmp=94, + importe=12100, moneda="PES", ctz=1.000, + tipo_doc_rec=80, nro_doc_rec=20000000001, + tipo_cod_aut="E", cod_aut=70417054367476, + ): + "Generar una im?gen con el c?digo QR" + # basado en: https://www.afip.gob.ar/fe/qr/especificaciones.asp + datos_cmp = { + "ver": int(ver), + "fecha": fecha, + "cuit": int(cuit), + "ptoVta": int(pto_vta), + "tipoCmp": int(tipo_cmp), + "nroCmp": int(nro_cmp), + "importe": float(importe), + "moneda": moneda, + "ctz": float(ctz), + "tipoDocRec": int(tipo_doc_rec), + "nroDocRec": int(nro_doc_rec), + "tipoCodAut": tipo_cod_aut, + "codAut": int(cod_aut), + } + + # convertir a representación json y codificar en base64: + datos_cmp_json = json.dumps(datos_cmp) + url = self.URL % (base64.b64encode(datos_cmp_json)) + + qr = qrcode.QRCode( + version=self.qr_ver, + error_correction=self.error_correction, + box_size=self.box_size, + border=self.border, + ) + qr.add_data(url) + qr.make(fit=True) + + img = qr.make_image(fill_color="black", back_color="white") + + img.save(self.Archivo, "PNG") + return url + + +if __name__ == '__main__': + + if "--register" in sys.argv or "--unregister" in sys.argv: + import win32com.server.register + win32com.server.register.UseCommandLine(PyQR) + elif "/Automate" in sys.argv: + try: + # MS seems to like /automate to run the class factories. + import win32com.server.localserver + win32com.server.localserver.serve([PyQR._reg_clsid_]) + except Exception: + raise + else: + + pyqr = PyQR() + + if '--datos' in sys.argv: + args = sys.argv[sys.argv.index("--datos")+1:] + (ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp, importe, moneda, ctz, + tipo_doc_rec, nro_doc_rec, tipo_cod_aut, cod_aut) = args + else: + ver = 1 + fecha = "2020-10-13" + cuit = 30000000007 + pto_vta = 10 + tipo_cmp = 1 + nro_cmp = 94 + importe = 12100 + moneda = "DOL" + ctz = 65.000 + tipo_doc_rec = 80 + nro_doc_rec = 20000000001 + tipo_cod_aut = "E" + cod_aut = 70417054367476 + + if '--archivo' in sys.argv: + pyqr.Archivo = sys.argv[sys.argv.index("--archivo")+1] + else: + pyqr.CrearArchivo() + + if '--url' in sys.argv: + pyqr.URL = sys.argv[sys.argv.index("--url")+1] + + print("datos:", (ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp, + importe, moneda, ctz, tipo_doc_rec, nro_doc_rec, + tipo_cod_aut, cod_aut)) + print("archivo", pyqr.Archivo) + + url = pyqr.GenerarImagen(ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp, + importe, moneda, ctz, tipo_doc_rec, nro_doc_rec, + tipo_cod_aut, cod_aut) + + print("url generada:", url) + + if "--prueba" in sys.argv: + qr_data_test = json.loads(base64.b64decode(TEST_QR_DATA)) + qr_data_gen = json.loads(base64.b64decode(url[33:])) + assert url.startswith("https://www.afip.gob.ar/fe/qr/?p=") + assert qr_data_test == qr_data_gen, "Diff: %r != %r" % (qr_data_test, qr_data_gen) + print("QR data ok:", qr_data_gen) + + if not '--mostrar' in sys.argv: + pass + elif sys.platform=="linux2": + os.system("eog ""%s""" % pyqr.Archivo) + else: + os.startfile(pyqr.archivo) \ No newline at end of file diff --git a/setup.py b/setup.py index b188ef275..47cbab0ed 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ __version__ = "%s.%s.%s" % (sys.version_info[0:2] + (rev, )) -HOMO = True +HOMO = False # build a one-click-installer for windows: if 'py2exe' in sys.argv: @@ -38,14 +38,14 @@ #import pyrece from . import wsaa from . import wsfev1, rece1, rg3685 - #import wsfexv1, recex1 + import wsfexv1, recex1 #import wsbfev1, receb1 #import wsmtx, recem #import wsct, recet - #import ws_sr_padron - #import pyfepdf - #import pyemail - #import pyi25 + import ws_sr_padron + import pyfepdf + import pyemail + import pyi25 #import wsctg #import wslpg #import wsltv @@ -53,7 +53,7 @@ #import wslsp #import wsremcarne #import wscoc - #import wscdc + import wscdc #import cot #import iibb #import trazamed @@ -61,8 +61,9 @@ #import trazarenpre #import trazafito #import trazavet - #import padron - #import sired + import padron + import sired + import pyqr data_files = [ (".", ["licencia.txt", ]),