Coverage for trimesh/resolvers.py: 78%

219 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-24 04:40 +0000

1""" 

2resolvers.py 

3--------------- 

4 

5Provides a common interface to load assets referenced by name 

6like MTL files, texture images, etc. Assets can be from ZIP 

7archives, web assets, or a local file path. 

8""" 

9 

10import abc 

11import itertools 

12import os 

13from pathlib import Path 

14from typing import TypeAlias 

15 

16# URL parsing for remote resources via WebResolver 

17from urllib.parse import urlparse 

18 

19from . import caching, util 

20from .typed import HttpSessionLike, Mapping 

21 

22 

class Resolver(util.ABC):
    """
    Common interface implemented by every resolver.

    A resolver looks up assets by name. The methods `get`,
    `write`, `namespaced` and `keys` are declared abstract here;
    the mapping-style operators at the bottom of the class are
    thin wrappers that forward to them.
    """

    @abc.abstractmethod
    def __init__(self, *args, **kwargs):
        # the base class itself is never meant to be constructed
        raise NotImplementedError("Use a resolver subclass!")

    @abc.abstractmethod
    def get(self, key):
        """
        Fetch the asset stored under `key`.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write(self, name: str, data):
        """
        Store `data` under `name`.
        """
        raise NotImplementedError("`write` not implemented!")

    @abc.abstractmethod
    def namespaced(self, namespace: str):
        """
        Produce a resolver rooted at `namespace`.
        """
        raise NotImplementedError("`namespaced` not implemented!")

    @abc.abstractmethod
    def keys(self):
        """
        Enumerate the names this resolver can provide.
        """
        raise NotImplementedError("`keys` not implemented!")

    def __getitem__(self, key: str):
        # `resolver[key]` forwards to `get`
        return self.get(key)

    def __setitem__(self, key: str, value):
        # `resolver[key] = value` forwards to `write`
        return self.write(key, value)

    def __contains__(self, key: str) -> bool:
        # membership is decided by whatever `keys` reports
        return key in self.keys()

56 

57 

class FilePathResolver(Resolver):
    """
    Resolve files from a source path on the file system.
    """

    def __init__(self, source: str):
        """
        Resolve files based on a source path.

        Parameters
        ------------
        source : str
          File path where mesh was loaded from, or a directory.
          A leading `~` is expanded to the user's home directory.

        Raises
        ------------
        ValueError
          If the directory to resolve from does not exist.
        """
        # expand `~` BEFORE making the path absolute: `abspath` would
        # prefix the working directory first, after which the tilde is
        # no longer at the start of the string and is never expanded
        clean = os.path.abspath(os.path.expanduser(str(source)))

        self.clean = clean
        if os.path.isdir(clean):
            # if we were passed a directory use it
            self.parent = clean
        else:
            # otherwise use the directory containing what we were passed
            self.parent = os.path.dirname(clean)

        # exit if directory doesn't exist
        if not os.path.isdir(self.parent):
            raise ValueError(f"path `{self.parent}` not a directory!")

        # keep the original argument and its final path component
        self.file_path = source
        self.file_name = os.path.basename(source)

    def keys(self):
        """
        List all files available to be loaded.

        Yields
        -----------
        name : str
          Name of a file which can be accessed. The root directory
          is clipped off the front of each walked path, so files in
          a subdirectory keep the separator that followed the root
          (i.e. `/sub/name`) while files in the root are bare names.
        """
        root = self.parent
        for path, _, names in os.walk(root):
            # strip any leading parent key
            if path.startswith(root):
                path = path[len(root) :]
            # yield each name
            for name in names:
                yield os.path.join(path, name)

    def namespaced(self, namespace: str) -> "FilePathResolver":
        """
        Return a resolver which changes the root of the
        resolver by an added namespace.

        Parameters
        -------------
        namespace : str
          Probably a subdirectory

        Returns
        --------------
        resolver : FilePathResolver
          Resolver with root directory changed.
        """
        return FilePathResolver(os.path.join(self.parent, namespace))

    def get(self, name: str):
        """
        Get an asset, restricted to the resolver root.

        Parameters
        -------------
        name : str
          Name of the asset. Must resolve inside the resolver root.

        Returns
        ------------
        data : bytes
          Loaded data from asset.

        Raises
        ------------
        FileNotFoundError
          If no candidate for `name` is a file inside the root.
        """
        # require each candidate to resolve inside the root
        parent = Path(self.parent).resolve()
        stripped = name.strip()
        candidates = (
            # the name as passed, minus surrounding whitespace
            stripped,
            # the same with leading slashes removed, so an
            # "absolute" name is retried relative to the root
            stripped.lstrip("/"),
            # only the last path component
            os.path.split(name)[-1],
        )
        for candidate in candidates:
            path = (parent / candidate).resolve()
            if not path.is_relative_to(parent):
                continue
            # only regular files can be read: a candidate that resolves
            # to a directory (i.e. an empty name resolves to the root
            # itself) must fall through to `FileNotFoundError` rather
            # than fail inside `open`
            if path.is_file():
                with open(path, "rb") as f:
                    return f.read()
        raise FileNotFoundError(name)

    def write(self, name: str, data: str | bytes):
        """
        Write an asset to a file path, restricted to the resolver root.

        Parameters
        -----------
        name : str
          Name of the file to write. Must resolve inside the resolver root.
        data : str or bytes
          Data to write to the file.

        Raises
        -----------
        ValueError
          If `name` resolves to a location outside the root.
        """
        parent = Path(self.parent).resolve()
        path = (parent / name.strip()).resolve()
        if not path.is_relative_to(parent):
            raise ValueError(f"path escapes resolver root: {name!r}")
        with open(path, "wb") as f:
            # handle encodings correctly for str/bytes
            util.write_encoded(file_obj=f, stuff=data)

174 

175 

class ZipResolver(Resolver):
    """
    Resolve files inside a ZIP archive.
    """

    def __init__(self, archive: dict | None = None, namespace: str | None = None):
        """
        Resolve files inside a ZIP archive as loaded by
        trimesh.util.decompress

        Parameters
        -------------
        archive : dict or None
          Contains resources as file object. If None the
          resolver starts empty and the first `write` call
          creates the storage.
        namespace : None or str
          If passed will only show keys that start
          with this value and this substring must be
          removed for any get calls.
        """
        self.archive = archive
        if isinstance(namespace, str):
            # normalize to exactly one trailing slash
            self.namespace = namespace.strip().rstrip("/") + "/"
        else:
            self.namespace = None

    def keys(self):
        """
        Get the available keys in the current archive.

        Returns
        -----------
        keys : iterable
          Keys in the current archive, empty if nothing
          has been stored yet.
        """
        # `archive` defaults to None until something is written
        if self.archive is None:
            return []
        if self.namespace is None:
            return self.archive.keys()

        namespace = self.namespace
        length = len(namespace)
        # only return keys that start with the namespace
        # and strip off the namespace from the returned
        # keys, skipping the bare namespace entry itself
        return [
            k[length:]
            for k in self.archive.keys()
            if k.startswith(namespace) and len(k) > length
        ]

    def write(self, key: str, value) -> None:
        """
        Store a value in the current archive.

        Parameters
        -----------
        key : hashable
          Key to store data under.
        value : str, bytes, file-like
          Value to store.
        """
        if self.archive is None:
            self.archive = {}
        self.archive[key] = value

    def get(self, name: str) -> bytes:
        """
        Get an asset from the ZIP archive.

        Parameters
        -------------
        name : str
          Name of the asset

        Returns
        -------------
        data : bytes
          Loaded data from asset. Values stored as `str` are
          returned unchanged and a `name` of None returns None.

        Raises
        -------------
        KeyError
          If neither `name` nor any nearby variant is stored.
        """
        # not much we can do with None
        if name is None:
            return None
        # make sure name is a string
        if hasattr(name, "decode"):
            name = name.decode("utf-8")
        # an archive that was never populated behaves like an empty
        # one so a miss is reported as a `KeyError` like any other
        archive = self.archive if self.archive is not None else {}
        # requested name not identical in
        # storage so attempt to recover
        if name not in archive:
            # take the first cleaned-up variant that is stored,
            # otherwise keep the name so the lookup below raises
            name = next(
                (
                    option
                    for option in nearby_names(name, self.namespace)
                    if option in archive
                ),
                name,
            )

        # get the stored data
        obj = archive[name]
        # if the dict is storing data as bytes just return
        if isinstance(obj, (bytes, str)):
            return obj
        # otherwise treat it as a file object: read it from the
        # beginning and rewind again so it can be read repeatedly
        obj.seek(0)
        data = obj.read()
        obj.seek(0)
        return data

    def namespaced(self, namespace: str) -> "ZipResolver":
        """
        Return a "sub-resolver" with a root namespace.

        Parameters
        -------------
        namespace : str
          The root of the key to clip off, i.e. if
          this resolver has key `a/b/c` you can get
          'a/b/c' with resolver.namespaced('a/b').get('c')

        Returns
        -----------
        resolver : Resolver
          Namespaced resolver sharing this resolver's archive.
        """
        return ZipResolver(archive=self.archive, namespace=namespace)

    def export(self) -> bytes:
        """
        Export the contents of the current archive as
        a ZIP file.

        Returns
        ------------
        compressed : bytes
          Compressed data in ZIP format.
        """
        return util.compress(self.archive)

313 

314 

class WebResolver(Resolver):
    """
    Resolve assets from a remote URL.
    """

    def __init__(
        self,
        url: str,
        session: HttpSessionLike | None = None,
        timeout: float = 30.0,
    ):
        """
        Resolve assets from a base URL.

        Parameters
        --------------
        url : str
          Location where a mesh was stored or
          directory where mesh was stored.
        session : HttpSessionLike or None
          Optional HTTP session used for fetches. Accepts
          `httpx.Client` or `requests.Session`.
        timeout : float
          Per-request timeout in seconds.

        Raises
        --------------
        ValueError
          If the URL scheme is not `http` or `https`.
        """
        if hasattr(url, "decode"):
            url = url.decode("utf-8")

        # parse string into namedtuple
        parsed = urlparse(url)
        # only http(s) is supported, reject `file://`, `gopher://`, etc.
        if parsed.scheme not in ("http", "https"):
            raise ValueError(f"scheme {parsed.scheme!r} not in ('http', 'https')")

        if session is None:
            # an explicit session will be required in a future release
            import warnings

            warnings.warn(
                "`WebResolver` without a `session` is deprecated "
                + "and will require one in a future release. "
                + "pass an `httpx.Client` or `requests.Session`.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        self.session = session
        self.timeout = timeout

        # we want a base url
        split = [i for i in parsed.path.split("/") if len(i) > 0]

        # if the last item in the url path is a filename
        # move up a "directory" for the base path
        if len(split) == 0:
            path = ""
        elif "." in split[-1]:
            # a dot in the last item is taken to mean
            # it is a file name so clip it off
            path = "/".join(split[:-1])
        else:
            # recombine into string ignoring any double slashes
            path = "/".join(split)

        # save the URL we were created with, i.e.
        # `https://stuff.com/models/thing.glb`
        self.url = url
        # save the root url, i.e. `https://stuff.com/models`
        self.base_url = (
            "/".join(
                i
                for i in [parsed.scheme + ":/", parsed.netloc.strip("/"), path.strip("/")]
                if len(i) > 0
            )
            + "/"
        )

        # our string handling should have never inserted double slashes
        assert "//" not in self.base_url[len(parsed.scheme) + 3 :]
        # we should always have ended with a single slash
        assert self.base_url.endswith("/")

        self.file_name = url.split("/")[-1]

    def _client(self):
        """
        Get the object requests are made with: the session passed
        by the caller, or the bare `httpx` module. Both expose `.get`.
        """
        if self.session is not None:
            return self.session
        # only import `httpx` when it is really needed so a caller
        # supplying their own session does not need it installed
        import httpx

        return httpx

    def get(self, name: str) -> bytes:
        """
        Get a resource from the remote site.

        Parameters
        -------------
        name : str
          Asset name, i.e. 'quadknot.obj.mtl'

        Returns
        -------------
        content : bytes
          The `content` of the response.
        """
        # remove leading and trailing whitespace
        name = name.strip()

        client = self._client()
        # NOTE(review): `follow_redirects` is the httpx spelling of this
        # keyword, confirm every `HttpSessionLike` implementation accepts it
        response = client.get(
            self.base_url + name, follow_redirects=True, timeout=self.timeout
        )

        # if the request failed and the name carries a leading `./`
        # retry without it; for any other name the retry would be
        # the identical URL again so it is skipped
        if response.status_code >= 300 and name.startswith("./"):
            response = client.get(
                self.base_url + name[2:],
                follow_redirects=True,
                timeout=self.timeout,
            )

        # now raise if we still don't have a successful response
        response.raise_for_status()

        # return the bytes of the response
        return response.content

    def get_base(self) -> bytes:
        """
        Fetch the data at the full URL this resolver was
        instantiated with, i.e. `https://stuff.com/hi.glb`
        this will return the response.

        Returns
        --------
        content
          The value at `self.url`
        """
        # just fetch the url we were created with
        response = self._client().get(
            self.url, follow_redirects=True, timeout=self.timeout
        )
        response.raise_for_status()
        return response.content

    def namespaced(self, namespace: str) -> "WebResolver":
        """
        Return a namespaced version of current resolver.

        Parameters
        -------------
        namespace : str
          URL fragment

        Returns
        -----------
        resolver : WebResolver
          With sub-url: `https://example.com/{namespace}`
        """
        # propagate session/timeout so the child keeps the same posture
        return WebResolver(
            url=self.base_url + namespace,
            session=self.session,
            timeout=self.timeout,
        )

    def write(self, key, value):
        raise NotImplementedError("`WebResolver` is read-only!")

    def keys(self):
        raise NotImplementedError("`WebResolver` can't list keys")

477 

478 

class GithubResolver(Resolver):
    """
    Resolve files from the ZIP archive of a Github repository.
    """

    def __init__(
        self,
        repo: str,
        branch: str | None = None,
        commit: str | None = None,
        save: str | None = None,
        session: HttpSessionLike | None = None,
        timeout: float = 30.0,
    ):
        """
        Get files from a remote Github repository by
        downloading a zip file with the entire branch
        or a specific commit.

        Parameters
        -------------
        repo
          In the format of `owner/repo`.
        branch
          The remote branch you want to get files from.
        commit
          The full commit hash: pass either this OR branch.
          If both are passed the commit is used.
        save
          A path if you want to save results locally.
        session : HttpSessionLike or None
          Optional HTTP session used for fetches. Accepts
          `httpx.Client` or `requests.Session`.
        timeout : float
          Per-request timeout in seconds.

        Raises
        -------------
        ValueError
          If neither `commit` nor `branch` was passed.
        """

        if commit is not None:
            # just get the exact commit
            self.url = f"https://github.com/{repo}/archive/{commit}.zip"
        elif branch is not None:
            # gets the latest commit on the specified branch.
            self.url = f"https://github.com/{repo}/archive/refs/heads/{branch}.zip"
        else:
            raise ValueError("`commit` or `branch` must be passed!")

        if session is None:
            # same deprecation as `WebResolver`, see there for rationale
            import warnings

            warnings.warn(
                "`GithubResolver` without a `session` is deprecated "
                + "and will require one in a future release. "
                + "pass an `httpx.Client` or `requests.Session`.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        self.session = session
        self.timeout = timeout

        # only keep a disk cache if we were asked to save results
        if save is not None:
            self.cache = caching.DiskCache(save)
        else:
            self.cache = None

    def keys(self):
        """
        List the available files in the repository.

        Returns
        ----------
        keys : iterable
          Keys available to the resolved.
        """
        return self.zipped.keys()

    def write(self, name, data):
        raise NotImplementedError("`write` not implemented!")

    @property
    def zipped(self) -> ZipResolver:
        """
        The repository archive wrapped in a `ZipResolver`.

        The resolver is built on first access and then reused.
        The raw archive comes from the disk cache when `save`
        was passed and is otherwise fetched directly.

        Returns
        ----------
        zipped : ZipResolver
          Resolver over the archive contents with the root
          directory of the archive clipped off every key.
        """
        # reuse the resolver built by an earlier access
        if hasattr(self, "_zip"):
            return self._zip

        def fetch() -> bytes:
            """
            Fetch the remote zip file.
            """
            if self.session is not None:
                client = self.session
            else:
                # only import `httpx` when no session was supplied so a
                # caller with their own session doesn't need it installed
                import httpx

                client = httpx

            response = client.get(
                self.url, follow_redirects=True, timeout=self.timeout
            )
            response.raise_for_status()
            return response.content

        if self.cache is None:
            # no `save` path was passed so there is no cache
            # to consult: download the archive directly
            raw = fetch()
        else:
            # presumably returns the stored bytes, calling `fetch` on a
            # miss -- TODO confirm against `caching.DiskCache.get`
            raw = self.cache.get(self.url, fetch)

        # create a zip resolver for the archive
        # the root directory in the zip is the repo+commit so strip that off
        # so the keys are usable, i.e. "models" instead of "trimesh-2232323/models"
        self._zip = ZipResolver(
            {
                k.split("/", 1)[1]: v
                for k, v in util.decompress(
                    util.wrap_as_stream(raw), file_type="zip"
                ).items()
            }
        )

        return self._zip

    def get(self, key):
        """
        Get an asset from the repository archive.
        """
        return self.zipped.get(key)

    def namespaced(self, namespace):
        """
        Return a "sub-resolver" with a root namespace.

        Parameters
        -------------
        namespace : str
          The root of the key to clip off, i.e. if
          this resolver has key `a/b/c` you can get
          'a/b/c' with resolver.namespaced('a/b').get('c')

        Returns
        -----------
        resolver : Resolver
          Namespaced resolver.
        """
        return self.zipped.namespaced(namespace)

612 

613 

614def nearby_names(name, namespace=None): 

615 """ 

616 Try to find nearby variants of a specified name. 

617 

618 Parameters 

619 ------------ 

620 name : str 

621 Initial name. 

622 

623 Yields 

624 ----------- 

625 nearby : str 

626 Name that is a lightly permutated version 

627 of the initial name. 

628 """ 

629 

630 # the various operations that *might* result in a correct key 

631 def trim(prefix, item): 

632 if item.startswith(prefix): 

633 return item[len(prefix) :] 

634 return item 

635 

636 cleaners = [ 

637 lambda x: x, 

638 lambda x: x.strip(), 

639 lambda x: trim("./", x), 

640 lambda x: trim(".\\", x), 

641 lambda x: trim("\\", x), 

642 lambda x: os.path.split(x)[-1], 

643 lambda x: x.replace("%20", " "), 

644 ] 

645 

646 if namespace is None: 

647 namespace = "" 

648 

649 # make sure we don't return repeat values 

650 hit = set() 

651 for f in cleaners: 

652 # try just one cleaning function 

653 current = f(name) 

654 if current in hit: 

655 continue 

656 hit.add(current) 

657 yield namespace + current 

658 

659 for a, b in itertools.combinations(cleaners, 2): 

660 # apply both clean functions 

661 current = a(b(name)) 

662 if current in hit: 

663 continue 

664 hit.add(current) 

665 yield namespace + current 

666 

667 # try applying in reverse order 

668 current = b(a(name)) 

669 if current in hit: 

670 continue 

671 hit.add(current) 

672 yield namespace + current 

673 

674 if ".." in name and namespace is not None: 

675 # if someone specified relative paths give it one attempt 

676 strip = namespace.strip("/").split("/")[: -name.count("..")] 

677 strip.extend(name.split("..")[-1].strip("/").split("/")) 

678 yield "/".join(strip) 

679 

680 

# most loaders can use a mapping in addition to a resolver, so
# this alias names "either one": a `Resolver` or a plain `Mapping`
ResolverLike: TypeAlias = Resolver | Mapping