-
Notifications
You must be signed in to change notification settings - Fork 0
/
cisco-send.py
executable file
·269 lines (221 loc) · 8.89 KB
/
cisco-send.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# Import required libraries
import serial
import time
import sys
import os
import re
import threading
import queue
import argparse
from datetime import datetime
# Error checking function
def check_error(condition, message):
    """Abort the script with exit status 1 unless *condition* is truthy.

    Args:
        condition: Value to test; nothing happens when it is truthy.
        message: Text printed after the ``Error: `` prefix on failure.

    Raises:
        SystemExit: With code 1 when *condition* is falsy.
    """
    if condition:
        return
    print(f"Error: {message}")
    try:
        cleanup()
    except NameError:
        # This helper is invoked (input-file check, failed port open) before
        # cleanup() and the serial handle are defined further down the
        # script. Nothing has been opened at that point, so there is nothing
        # to release; make sure we still exit with the intended status.
        pass
    sys.exit(1)
DEFAULT_DEVICE_FILE = '/dev/ttyUSB0'
DEFAULT_DELAY = 0.2

# Command-line argument parsing
parser = argparse.ArgumentParser(description='Send a local config to a device over a serial/console connection.')
# Mandatory argument
parser.add_argument('input_file', type=str, help='Path to the config file for the Cisco device (Router/Switch).')
# Optional arguments
parser.add_argument('--device-file', type=str, default=DEFAULT_DEVICE_FILE, help=f'Path to device file. Default is --device-file {DEFAULT_DEVICE_FILE}')
parser.add_argument('--delay', type=float, default=DEFAULT_DELAY, help=f'Delay between sending lines, in seconds. Default is --delay {DEFAULT_DELAY}s')

# Parse the arguments
args = parser.parse_args()
input_file = args.input_file
device_file = args.device_file
delay_between_lines = args.delay

# Check the input file exists and is readable (the message promises both)
check_error(
    os.path.isfile(input_file) and os.access(input_file, os.R_OK),
    f"File not found or not readable: {input_file}",
)
# time.sleep() rejects negative values; fail here rather than part-way
# through sending the configuration.
check_error(delay_between_lines >= 0, f"--delay must not be negative: {delay_between_lines}")

# Serial line settings: 9600 baud, "8N1"
baud_rate = 9600
data_bits = 8
parity = serial.PARITY_NONE
stop_bits = serial.STOPBITS_ONE

# Configure the serial port
print("------------")
print(f"Setting serial port baud, parity, and stop bits to '8N1' on {device_file}")
try:
    # timeout=1 lets blocking reads return after one second even when the
    # device has not sent a newline.
    ser = serial.Serial(
        device_file,
        baudrate=baud_rate,
        bytesize=data_bits,
        parity=parity,
        stopbits=stop_bits,
        rtscts=False,
        timeout=1,
    )
except (serial.SerialException, ValueError, OSError) as e:
    # pySerial documents ValueError (bad parameter) and SerialException
    # (device missing or not configurable) for the constructor.
    check_error(False, f"Failed to set serial port parameters: {str(e)}")
# Plain list collecting every non-empty line the device sends, in arrival
# order, so it can be printed or logged once the run is over
response_buffer = []
# Thread-safe queue that receives only the lines recognised as prompts
prompt_queue = queue.Queue()
# Thread-safe queue that receives every non-empty line read from the port
q = queue.Queue()
# Function to read from port
def read_from_port(serial_port, q, prompt_queue):
    """Read lines from the device forever and fan them out.

    Every non-empty line is appended to the module-level ``response_buffer``
    and put on *q*; lines for which ``is_prompt`` returns true are also put
    on *prompt_queue*.

    Args:
        serial_port: Object whose ``readline()`` returns ``bytes``.
        q: Queue receiving every non-empty line.
        prompt_queue: Queue receiving only the prompt lines.
    """
    while True:
        raw = serial_port.readline()
        # Serial consoles routinely emit bytes that are not valid UTF-8
        # (line noise, boot-time garbage). A strict decode would raise and
        # kill this thread, after which no prompt would ever be queued
        # again; substitute U+FFFD for undecodable bytes instead.
        reading = raw.decode('utf-8', errors='replace').strip()
        if not reading:
            continue
        # Buffer the reading rather than printing it
        response_buffer.append(reading)
        q.put(reading)
        if is_prompt(reading):
            prompt_queue.put(reading)
# Launch the background reader; as a daemon thread it does not keep the
# interpreter alive once the main script finishes
reader_thread = threading.Thread(
    target=read_from_port,
    args=(ser, q, prompt_queue),
    daemon=True,
)
reader_thread.start()
def cleanup():
    """Close the serial port if one was opened, then report completion.

    Safe to call when the module-level ``ser`` handle does not exist (for
    example because opening the port failed) and when the port has already
    been closed.
    """
    # Look the handle up instead of naming it directly: a bare reference
    # raises NameError on the failure paths that run before ``ser`` is bound.
    port = globals().get("ser")
    if port is not None and port.is_open:
        print("Closing serial port...")
        port.close()
    print("Cleanup complete.")
# Markers that identify a line as a device prompt or boot-time question.
# Compiled once at import time because is_prompt() runs for every line the
# reader thread receives.
_PROMPT_RE = re.compile("|".join((
    r"initial configuration dialog",
    r"terminate autoinstall",
    r"Press RETURN",
    r"User Access Verification",
    r"Password:",
    r">",
    # Any '#' counts, which covers both "host#" and "host(config)#". The
    # earlier pair of patterns (a lookahead variant plus an explicit
    # "(config)#" one) accepted exactly the same lines.
    r"#",
)))


def is_prompt(line):
    """Return True if *line* contains any known prompt marker.

    Matching is case-sensitive and un-anchored: the marker may appear
    anywhere in the line.

    Args:
        line: One decoded line of device output.

    Returns:
        bool: Whether the line looks like a prompt.
    """
    return _PROMPT_RE.search(line) is not None
def _send_and_wait(serial_port, prompt_queue, payload, settle, reply_timeout=3):
    """Write *payload*, pause *settle* seconds, return the next queued prompt or None."""
    serial_port.write(payload)
    time.sleep(settle)
    try:
        return prompt_queue.get(timeout=reply_timeout)
    except queue.Empty:
        return None


def _read_password():
    """Ask the operator for the device password without echoing it."""
    import getpass  # local import: only needed when the device asks for one
    return getpass.getpass("Please enter the password: ").strip()


def determine_device_state(serial_port, prompt_queue, MAX_RETRIES=15, WAIT_INTERVAL=4, SETTLE_DELAY=4):
    """Drive the device from its current prompt to global config mode.

    Repeatedly takes a prompt line from *prompt_queue* and reacts to it:
    answers ``no`` to the initial configuration dialog, presses return for
    the autoinstall / "Press RETURN" messages, supplies a password when
    asked, sends ``en`` at a ``>`` prompt and ``conf t`` at a ``#`` prompt.
    Returns as soon as a ``(config)#`` prompt is seen.

    After each command the next queued prompt is fetched; it is only
    inspected for a password request following ``en``. In every other case
    the following loop iteration pulls a fresh prompt.

    NOTE(review): a timeout while waiting for a prompt does not count
    towards MAX_RETRIES (the increment is commented out in the code this
    was derived from), so a completely silent device is polled
    indefinitely. Confirm that this is intended.

    Args:
        serial_port: Object with a ``write(bytes)`` method.
        prompt_queue: Queue of prompt lines filled by the reader thread.
        MAX_RETRIES: Number of handled prompts allowed before giving up.
        WAIT_INTERVAL: Seconds to wait for a prompt and to pause between
            iterations.
        SETTLE_DELAY: Seconds to pause after writing to the device before
            looking for its reply. Half of this is used after an enable
            password.

    Raises:
        SystemExit: With code 1 once MAX_RETRIES prompts have been handled
            without reaching global config mode.
    """
    attempts = 0
    while attempts < MAX_RETRIES:
        try:
            device_state = prompt_queue.get(timeout=WAIT_INTERVAL)
        except queue.Empty:
            # Nothing queued: nudge the console so it re-prints its prompt.
            serial_port.write(b"\r\n")
            print("Reading current device prompt...")
            time.sleep(SETTLE_DELAY)
            continue
        print(f"Device state: {device_state}")

        sent_enable = False
        if re.search(r"initial configuration dialog", device_state):
            payload = b"no\r\n"
        elif re.search(r"terminate autoinstall|Press RETURN", device_state):
            payload = b"\r\n"
        elif re.search(r"Password:|User Access Verification", device_state):
            payload = f"{_read_password()}\r\n".encode('utf-8')
        elif ">" in device_state:
            print("Entered User Exec Mode (>). Sending 'en'.")
            payload = b"en\r"
            sent_enable = True
        elif re.search(r"\(config\)#", device_state):
            print("Entered Global Config Mode (config)#.")
            return
        elif "#" in device_state:
            # Global config prompts were handled above, so any other '#'
            # is treated as privileged exec.
            print("Entered Privileged Exec Mode (#). Sending 'conf t'.")
            payload = b"conf t\r"
        else:
            print("Unknown device state. Retrying...")
            payload = None

        if payload is not None:
            reply = _send_and_wait(serial_port, prompt_queue, payload, SETTLE_DELAY)
            if reply is None:
                # No reaction yet; start over without counting an attempt.
                continue
            if sent_enable and ("Password:" in reply or "User Access Verification" in reply):
                # 'en' asked for the enable password.
                serial_port.write(f"{_read_password()}\r\n".encode('utf-8'))
                time.sleep(SETTLE_DELAY / 2)

        attempts += 1
        time.sleep(WAIT_INTERVAL)

    print("Max retries reached. Exiting.")
    cleanup()
    sys.exit(1)
determine_device_state(ser, prompt_queue)
print("------------")
print("Device is at hostname(config)# prompt and ready to accept config")
print("CAUTION: Is your device clean? If not, quit this script and manually run '# erase startup-config' && 'reload' first")
print("A wipe feature will be integrated into this script in a future release")
## TO-DO ask user to pull current config and write locally as backup ##
user_response = input("Confirm sending config to device? yes/no (exit): ").strip().lower()
if user_response in ('yes', 'y'):
    print(f"Reading {input_file}, sending line by line with delay of {delay_between_lines}")
elif user_response in ('no', 'n'):
    cleanup()
    print("Exiting the program.")
    sys.exit()
else:
    print("Invalid input. Exiting the program.")
    # Release the serial port on this path too, as the "no" branch does.
    cleanup()
    sys.exit()
print("..")
print("...")

# Read the file line by line and send each line with a delay
with open(input_file, 'r') as f:
    for line in f:
        line = line.strip()
        # Skip lines that start with "!"
        if line.startswith("!"):
            continue
        print(line)
        try:
            ser.write(f"{line}\r\n".encode('utf-8'))
        except (serial.SerialException, OSError) as e:
            check_error(False, f"Failed to send the command: {line}, Error: {str(e)}")
        # Pause between lines so the device can keep up with the input
        time.sleep(delay_between_lines)
print("Configuration complete.")
print("-----------------------")
# Ask user to save running-config to startup-config
def save_config_to_startup(serial_port):
    """Ask whether to persist the running config and, if confirmed, send ``write memory``.

    Both ``y`` and ``yes`` (any case) count as confirmation, matching the
    other confirmation prompts in this script.

    Args:
        serial_port: Object with a ``write(bytes)`` method.
    """
    user_input = input("Save running-config to startup-config? (y/n): ").strip().lower()
    if user_input not in ('y', 'yes'):
        print("Configuration not saved.")
        return
    print("Writing to memory...")
    serial_port.write(b"write memory\r")
    # Short pause after issuing the command; the device's reply is not
    # inspected here.
    time.sleep(1)
    print("Configuration saved.")
# Ask the operator whether the configuration that was just sent should be
# written to startup-config.
save_config_to_startup(ser)
def print_and_save_responses():
    """Optionally print the buffered device responses and/or save them to a log file.

    Reads the module-level ``response_buffer`` and ``input_file``. Asks the
    operator whether to print the responses, whether to save them, and
    whether the default log path (in the home directory, named from the
    current date/time and the input file's base name) is acceptable. ``y``
    and ``yes`` are accepted for every question; an empty custom path falls
    back to the default one.
    """
    affirmative = ('y', 'yes')

    # Ask user if they want to print the buffered responses to the console
    user_input = input("Print device responses to console? (y/n): ").strip().lower()
    if user_input in affirmative:
        # Iterate over a copy: the reader thread may still be appending.
        for response in list(response_buffer):
            print(response)

    # Ask user if they want to save the buffered responses to a log file
    user_input = input("Save responses to logfile? (y/n): ").strip().lower()
    if user_input not in affirmative:
        return

    # Construct the default log file path
    date_str = datetime.now().strftime('%d-%m-%Y-%H:%M')
    default_log_file_path = os.path.expanduser(f"~/{date_str}-{os.path.basename(input_file)}-log.txt")

    # Ask user for confirmation or modification
    user_file_input = input(f"Save to default path: {default_log_file_path}? (y/n): ").strip().lower()
    if user_file_input in affirmative:
        log_file_path = default_log_file_path
    else:
        custom_log_file_path = input("Enter the custom path and filename: ").strip()
        # open('') would fail; treat an empty answer as "use the default".
        if custom_log_file_path:
            log_file_path = os.path.expanduser(custom_log_file_path)
        else:
            log_file_path = default_log_file_path

    # Save to the log file. The responses were decoded as UTF-8, so write
    # them back as UTF-8 regardless of the locale's default encoding.
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        log_file.writelines(f"{response}\n" for response in list(response_buffer))
    print(f"Responses saved to {log_file_path}")
try:
    print_and_save_responses()
finally:
    # Release the serial port even when reporting fails or the operator
    # interrupts it; the previous try/finally wrapped only ``pass`` and so
    # protected nothing.
    if __name__ == "__main__":
        cleanup()

if __name__ == "__main__":
    sys.exit(0)