1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
#!/usr/bin/env python
import argparse
import json
import logging
import os
from math import floor
from sys import exit
from time import sleep
import requests
API_BASE_URL = "https://api.shapeshift3d.com"
log = logging.getLogger(__name__)
def remove_nones(iterable):
"""Filter out "None"s from an iterable
Args:
iterable (Iterable): an iterable
Returns:
Iterable: a new iterable without the None values
"""
return filter(lambda x: x is not None, iterable)
def authenticate(username, password):
"""Login a user: get a JWT from the API, authenticating using a USERNAME and PASSWORD
Args:
username (str): The email of the user to authenticate
password (str): _description_
Returns:
str: the bearer token
"""
response = requests.post(f'{API_BASE_URL}/v1/auth/login',
{
'username': username,
'password': password
})
response.raise_for_status()
auth = response.json()
token = auth['accessToken']
return token
def make_authorization_header(token):
"""Create a dict that contains the HTTP authorization header
Args:
token (str): The authorisation header
Returns:
dict: The dictionary containing the HTTP authorization header
"""
return {"Authorization": f"Bearer {token}"}
def get_product_iteration(token, product_id, product_iteration_id):
"""Get a product iteration, with all its steps
Args:
product_id (string): The GUID of the product
product_iteration_id (string): The GUID of the product iteration
Returns:
dict: Returns a product iteration
"""
url = f'{API_BASE_URL}/v1/products/{product_id}/iterations/{product_iteration_id}?steps=true'
response = requests.get(url, headers=make_authorization_header(token))
return response.json()
def get_step_global_variable_definitions(step):
"""Extract the global variables's definitions from a step
Args:
step (dict): A product iteration's step
Returns:
list(dict): a list of global variable definitions
"""
# Get the steps' component iteration
componentIteration = step["componentIteration"]
# Get the componentIteration's metadata
metadata = componentIteration["globalVariables"]
# From the metadata, get the global variable definitions
#
# For backward-compatibility, we check if the metadata is just the list of
# global variables (as opposed to a dict).
if type(metadata) == list:
gvars = metadata
else:
gvars = metadata["gvars"]
return gvars
def make_step_options(order, gvar_def_list, global_variables):
"""Create a list of options, for ONE step
Args:
order (int): The order of the step (e.g. the Nth step has the order N)
gvar_def_list (dict): The list of global varible definition for a step
global_variables (dict): A mapping of variable names to value.
Returns:
_type_: _description_
"""
step_options = []
# for each variable definition
for gvar_def in gvar_def_list:
# Ignore the "non-manual" global variables
if gvar_def['interaction'] != 'Manual':
continue
# Get the name (label) of the variable
name = gvar_def['label']
# Get the value of the variable from global_variables
value = global_variables[name]
log.info('Step %s Adding the global variable "%s" with the value "%s"', order, name, value)
# Append a new option
step_options.append({
name: value
})
return step_options
def make_options(steps, global_variables):
"""Create a list of options, for all steps
Args:
steps (list(dict)): A list of steps
global_variables (dict): A mapping of variable names to value.
Returns:
list: a list of options
"""
options = []
for step in steps:
order = step['order']
# Extract the list of global variables for this step
gvar_def_list = get_step_global_variable_definitions(step)
# Create a list of options, replacing the definitions by the values from global_variables
step_options = make_step_options(order, gvar_def_list, global_variables)
# Don't add the options (for this step) if it's empty
if len(step_options) == 0:
continue
options.append({
'stepOrder': step['order'],
'options': step_options
})
return options
def create_case(token, product_iteration, filename, global_variables, referenceId=""):
"""Create a new case
Args:
token (str): The authorisation token
product_iteration (dict): The product iteration
filename (str): The path to the file to upload as an input
global_variables (dict): A mapping of variable names to value.
referenceId (str, optional): Optional id, for the user's internal usage. Defaults to "".
Returns:
dict: The newly create case
"""
log.info('Creating a case with the file "%s"...', filename)
# Get the list of steps
steps = product_iteration['steps']
# Make the options for all the steps
options = make_options(steps, global_variables)
url = f'{API_BASE_URL}/v1/orders-tracking'
data = {
'products': product_iteration['id'],
'referenceId': referenceId,
# The library "requests", recognize form-data names endings with "[]".
# The library will take each element of the list and add one form-data
# key/value pair with the name "options[]".
# Said otherwise, we're sending multiple "options[]", each of them
# containing a string (which contains serialized JSON).
'options[]': [json.dumps(o) for o in options]
}
with open(filename, 'rb') as input:
files = {
'files': (os.path.basename(filename), input),
}
response = requests.post(
url,
data,
files=files,
headers=make_authorization_header(token))
response.raise_for_status()
cases = response.json()
# The API returns a list of cases, but we only created 1
case = cases[0]
log.info('Case created, caseId: %s', case['orderId'])
return case
def start_case(token, case_id):
"""Start an existing case
Args:
token (str): The authorisation token.
case_id (str): The caseId of the case to start.
"""
response = requests.patch(f'{API_BASE_URL}/v1/orders-tracking/{case_id}',
data={"stepOrder": 0, "status": "Start Process"},
headers=make_authorization_header(token))
response.raise_for_status()
log.info('Case started')
def get_case_status(token, case_id):
"""Get a case's current status
Args:
token (str): The authorisation token.
case_id (str): The caseId of the case to start.
Returns:
str: The status of the case as a string.
"""
response = requests.get(f'{API_BASE_URL}/v1/orders-tracking/{case_id}/status',
headers=make_authorization_header(token))
response.raise_for_status()
return response.json()['status']
def wait_for_case_to_be_completed(token, case_id, interval=5):
"""Poll the API until the case is completed or failed.
Args:
token (str): The authorisation token
case_id (str): The id of the case (order)
interval (int, optional): Time to wait before each poll (in seconds). Defaults to 5 seconds.
"""
status = get_case_status(token, case_id)
log.info(status)
while not (status == 'Completed' or status.endswith('Error')):
sleep(interval) # wait a few seconds
status = get_case_status(token, case_id)
log.info(status)
if status.endswith('Error'):
log.error("An error occured while processing this case.")
exit(1)
def download(url, filename):
"""Download the url and save it on disk
Args:
url (str): The url to download
filename (str): The path where to save the downloaded file.
"""
with requests.get(url, stream=True) as response:
response.raise_for_status()
total_size = int(response.headers['Content-Length'])
number_of_bytes_downloaded = 0
progress = 0
with open(filename, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
number_of_bytes_downloaded += len(chunk)
progress = floor(number_of_bytes_downloaded / total_size * 100)
log.info('Downloading... %s %%', progress)
log.info('File downloaded as "%s".', filename)
def download_case_output(token, case_id):
"""Download a case's final output
Args:
token (str): The authorisation token
case_id (str): The id of the case
"""
response = requests.get(f'{API_BASE_URL}/v1/orders-tracking/{case_id}/download',
headers=make_authorization_header(token))
response.raise_for_status()
file_info = response.json()
extension = file_info['file']['extension']
filename = f'{case_id}.{extension}'
link = file_info['link']
download(link, filename)
def configure_logging():
"""
Configure python's logging module.
"""
format = '%(levelname)s\t%(message)s'
level = logging.INFO
logging.basicConfig(format=format, level=level)
def parse_arguments():
"""
Parse the command-line arguments.
"""
parser = argparse.ArgumentParser(
description=f'Create a case')
parser.add_argument('--username', help='Username used to login', required=True)
parser.add_argument('--password', help='Password used for login', required=True)
parser.add_argument('--input', help='Path of the file to use as input', required=True)
parser.add_argument('global_variables',
nargs=argparse.REMAINDER,
help="Parameters to pass to the case")
args = parser.parse_args()
return args
if __name__ == '__main__':
configure_logging()
# Handle command line arguments
args = parse_arguments()
# For simplicity, we hardcored some information
PRODUCT_ID = "5a990f65-eff5-4178-a39e-e0d35e6e8f94"
PRODUCT_ITERATION_ID = "9cf833c0-a476-4dab-8887-15f5705051e0"
global_variables = {
'Left': True,
'Negative': True,
}
referenceId = "Test"
############################################################################
# Login
token = authenticate(args.username, args.password)
############################################################################
# Create
product_iteration = get_product_iteration(token, PRODUCT_ID, PRODUCT_ITERATION_ID)
case = create_case(token, product_iteration, args.input,
global_variables, referenceId)
case_id = case['orderId']
############################################################################
# Process
start_case(token, case_id)
wait_for_case_to_be_completed(token, case_id)
############################################################################
# Download result
download_case_output(token, case_id)
|