-
Notifications
You must be signed in to change notification settings - Fork 1
/
mpl_process_shapefile.py
232 lines (179 loc) · 7.07 KB
/
mpl_process_shapefile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
"""
MAPLE Workflow
(4) Post process the inferences into a standard shape file that can be processed further (Final Product)
Will create .shp / .dbf / .shx and .prj files in the data/projected_shp directory. Code reads the final_shp
created by the inferencing step
Project: Permafrost Discovery Gateway: Mapping Application for Arctic Permafrost Land Environment(MAPLE)
PI : Chandi Witharana
Author : Rajitha Udwalpola / Amal Perera
"""
import os
import shutil
import shapefile
from mpl_config import MPL_Config
from osgeo import gdal, osr
from shapely.geometry import Polygon, Point
def get_coordinate_system_info(filepath):
try:
# Open the dataset
dataset = gdal.Open(filepath)
# Check if the dataset is valid
if dataset is None:
raise Exception("Error: Unable to open the dataset.")
# Get the spatial reference
spatial_ref = osr.SpatialReference()
# Try to import the coordinate system from WKT
if spatial_ref.ImportFromWkt(dataset.GetProjection()) != gdal.CE_None:
raise Exception("Error: Unable to import coordinate system from WKT.")
# Check if the spatial reference is valid
if spatial_ref.Validate() != gdal.CE_None:
raise Exception("Error: Invalid spatial reference.")
# Export the spatial reference to WKT
return spatial_ref.ExportToWkt()
except Exception as e:
print(f"Error: {e}")
return None
def write_prj_file(geotiff_path, prj_file_path):
"""
Will create the prj file by getting the geo cordinate system from the input tiff file
Parameters
----------
geotiff_path : Path to the geo tiff used for processing
prj_file_path : Path to the location to create the prj files
"""
try:
# Get the coordinate system information
wkt = get_coordinate_system_info(geotiff_path)
print(wkt)
if wkt is not None:
# Write the WKT to a .prj file
with open(prj_file_path, "w") as prj_file:
prj_file.write(wkt)
print(f"Coordinate system information written to {prj_file_path}")
else:
print("Failed to get coordinate system information.")
except Exception as e:
print(f"Error: {e}")
def copy_to_project_dir(source_path, new_name):
try:
# Ensure the source directory exists
if not os.path.exists(source_path):
print(f"Source directory '{source_path}' does not exist.")
return
# Get the parent directory of the source directory
parent_directory = os.path.dirname(source_path)
# Create the new directory path by combining the parent directory and the new name
new_directory_path = os.path.join(parent_directory, new_name)
# Copy the source directory to the same location and rename it
shutil.copytree(source_path, new_directory_path)
print(
f"Directory copied to '{new_directory_path}' and renamed to '{new_name}'."
)
except Exception as e:
print(f"Error: {e}")
def process_shapefile(config: MPL_Config, image_name: str):
data_dir = config.WORKER_ROOT
image_file_name = (image_name).split(".tif")[0]
shp_dir = os.path.join(data_dir, "final_shp", image_file_name)
# projected_dir = os.path.join(data_dir, 'projected_shp', image_file_name)
projected_dir = os.path.join(config.PROJECTED_SHP_DIR, image_file_name)
temp_dir = os.path.join(data_dir, "temp_shp", image_file_name)
shape_file = os.path.join(shp_dir, f"{image_file_name}.shp")
output_shape_file = os.path.join(temp_dir, f"{image_file_name}.shp")
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Error deleting directory {temp_dir}: {e}")
try:
os.makedirs(temp_dir, exist_ok=True)
print(f"Created directory {temp_dir}")
except Exception as e:
print(f"Error creating directory {temp_dir}: {e}")
w = shapefile.Writer(output_shape_file)
try:
r = shapefile.Reader(shape_file)
print(f"Reading shaper file {r.fields[1:]}")
except Exception as e:
print(f"Error reading shapefile {shape_file}: {e}")
return
try:
shutil.rmtree(projected_dir)
print(f"removed directory {projected_dir}")
except Exception as e:
print(f"Error deleting directory {projected_dir}: {e}")
try:
os.makedirs(projected_dir, exist_ok=True)
print(f"Created directory {projected_dir}")
except Exception as e:
print(f"Error creating directory {projected_dir}: {e}")
w.fields = r.fields[1:]
w.field("Sensor", "C", "10")
w.field("Date", "C", "10")
w.field("Time", "C", "10")
w.field("CatalogID", "C", "20")
w.field("Area", "N", decimal=3)
w.field("CentroidX", "N", decimal=3)
w.field("CentroidY", "N", decimal=3)
w.field("Perimeter", "N", decimal=3)
w.field("Length", "N", decimal=3)
w.field("Width", "N", decimal=3)
for shaperec in r.iterShapeRecords():
rec = shaperec.record
rec.extend(
[
image_file_name[0:4],
image_file_name[5:13],
image_file_name[13:19],
image_file_name[20:36],
]
)
poly_vtx = shaperec.shape.points
poly = Polygon(poly_vtx)
area = poly.area
perimeter = poly.length
box = poly.minimum_rotated_rectangle
x, y = box.exterior.coords.xy
centroid = poly.centroid
p0 = Point(x[0], y[0])
p1 = Point(x[1], y[1])
p2 = Point(x[2], y[2])
edge_length = (p0.distance(p1), p1.distance(p2))
length = max(edge_length)
width = min(edge_length)
rec.extend([area, centroid.x, centroid.y, perimeter, length, width])
w.record(*rec)
w.shape(shaperec.shape)
w.close()
try:
shutil.rmtree(projected_dir)
except Exception as e:
print(f"Error deleting directory {projected_dir}: {e}")
os.chmod(temp_dir, 0o777)
temp_dir_prj = os.path.join(temp_dir, f"{image_file_name}.prj")
input_image = os.path.join(config.INPUT_IMAGE_DIR, image_name)
write_prj_file(input_image, temp_dir_prj)
try:
shutil.copytree(temp_dir, projected_dir)
except Exception as e:
print(f"Error creating projected directory {projected_dir}: {e}")
# Example usage:
def get_tif_file_names(directory_path):
tif_file_names = []
try:
# List all files in the specified directory
files = os.listdir(directory_path)
# Add each *.tif file name to the list
for file in files:
if file.endswith(".tif"):
tif_file_names.append(file)
except Exception as e:
print(f"Error: {e}")
return tif_file_names
# Unit TEST CODE
#
# files_to_process = get_tif_file_names(MPL_Config.INPUT_IMAGE_DIR)
# for image_name in files_to_process[0:3]:
# print("##################################### PROCESSING:", image_name, "###########################")
# process_shapefile(image_name)
# get_coordinate_system_info_rio(image_name)