Coverage for trimesh/resolvers.py: 78%
219 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-24 04:40 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-24 04:40 +0000
1"""
2resolvers.py
3---------------
5Provides a common interface to load assets referenced by name
6like MTL files, texture images, etc. Assets can be from ZIP
7archives, web assets, or a local file path.
8"""
10import abc
11import itertools
12import os
13from pathlib import Path
14from typing import TypeAlias
16# URL parsing for remote resources via WebResolver
17from urllib.parse import urlparse
19from . import caching, util
20from .typed import HttpSessionLike, Mapping
class Resolver(util.ABC):
    """
    The base class for resolvers.

    Subclasses supply `get`, `write`, `namespaced` and `keys`;
    the item access and membership operators defined here are
    thin wrappers around those methods.
    """

    def __contains__(self, key: str) -> bool:
        # membership is answered from whatever `keys` produces
        available = self.keys()
        return key in available

    def __getitem__(self, key: str):
        # `resolver[key]` is the same as `resolver.get(key)`
        return self.get(key)

    def __setitem__(self, key: str, value):
        # `resolver[key] = value` is the same as `resolver.write(key, value)`
        return self.write(key, value)

    @abc.abstractmethod
    def __init__(self, *args, **kwargs):
        """
        Not usable directly: a subclass must provide a constructor.
        """
        raise NotImplementedError("Use a resolver subclass!")

    @abc.abstractmethod
    def keys(self):
        """
        List the names which are available from this resolver.
        """
        raise NotImplementedError("`keys` not implemented!")

    @abc.abstractmethod
    def get(self, key):
        """
        Load the asset stored under `key`.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write(self, name: str, data):
        """
        Store `data` under `name`.
        """
        raise NotImplementedError("`write` not implemented!")

    @abc.abstractmethod
    def namespaced(self, namespace: str):
        """
        Produce a resolver rooted at `namespace`.
        """
        raise NotImplementedError("`namespaced` not implemented!")
class FilePathResolver(Resolver):
    """
    Resolve files from a source path on the file system.
    """

    def __init__(self, source: str):
        """
        Resolve files based on a source path.

        Parameters
        ------------
        source : str
          File path where mesh was loaded from, or a
          directory to use as the root.

        Raises
        ------------
        ValueError
          If the resolved root is not an existing directory.
        """
        # expand `~` BEFORE making the path absolute: `abspath`
        # prefixes the working directory, after which the value no
        # longer starts with `~` and `expanduser` leaves it alone
        clean = os.path.abspath(os.path.expanduser(str(source)))

        self.clean = clean
        if os.path.isdir(clean):
            # if we were passed a directory use it
            self.parent = clean
        else:
            # otherwise use the directory containing the file
            self.parent = os.path.dirname(clean)

        # exit if directory doesn't exist
        if not os.path.isdir(self.parent):
            raise ValueError(f"path `{self.parent}` not a directory!")

        # keep the value we were passed, unmodified
        self.file_path = source
        self.file_name = os.path.basename(source)

    def keys(self):
        """
        List all files available to be loaded.

        Yields
        -----------
        name : str
          Name of a file which can be accessed. Only the root
          prefix is removed, so a file in a subdirectory generally
          keeps the separator that followed the root (`/sub/name`);
          `get` accepts names in that form.
        """
        parent = self.parent
        for path, _, names in os.walk(parent):
            # strip any leading parent key
            if path.startswith(parent):
                path = path[len(parent) :]
            # yield each name
            for name in names:
                yield os.path.join(path, name)

    def namespaced(self, namespace: str) -> "FilePathResolver":
        """
        Return a resolver which changes the root of the
        resolver by an added namespace.

        Parameters
        -------------
        namespace : str
          Probably a subdirectory

        Returns
        --------------
        resolver : FilePathResolver
          Resolver with root directory changed.
        """
        return FilePathResolver(os.path.join(self.parent, namespace))

    def get(self, name: str):
        """
        Get an asset, restricted to the resolver root.

        Parameters
        -------------
        name : str
          Name of the asset. Must resolve inside the resolver root.

        Returns
        ------------
        data : bytes
          Loaded data from asset.

        Raises
        ------------
        FileNotFoundError
          If no candidate exists inside the resolver root.
        """
        # require each candidate to resolve inside the root
        parent = Path(self.parent).resolve()
        stripped = name.strip()
        candidates = (
            # the name as passed
            stripped,
            # an "absolute" name re-interpreted relative to the root
            stripped.lstrip("/"),
            # just the file name with any directories dropped
            os.path.split(name)[-1],
        )
        for candidate in candidates:
            # resolving collapses `..` and follows symlinks, so the
            # containment check sees the location actually opened
            path = (parent / candidate).resolve()
            if not path.is_relative_to(parent):
                continue
            if path.exists():
                with open(path, "rb") as f:
                    return f.read()
        raise FileNotFoundError(name)

    def write(self, name: str, data: str | bytes):
        """
        Write an asset to a file path, restricted to the resolver root.

        Parameters
        -----------
        name : str
          Name of the file to write. Must resolve inside the resolver root.
        data : str or bytes
          Data to write to the file.

        Raises
        -----------
        ValueError
          If `name` resolves to a location outside the resolver root.
        """
        parent = Path(self.parent).resolve()
        path = (parent / name.strip()).resolve()
        if not path.is_relative_to(parent):
            raise ValueError(f"path escapes resolver root: {name!r}")
        with open(path, "wb") as f:
            # handle encodings correctly for str/bytes
            util.write_encoded(file_obj=f, stuff=data)
class ZipResolver(Resolver):
    """
    Resolve files inside a ZIP archive.
    """

    def __init__(self, archive: dict | None = None, namespace: str | None = None):
        """
        Resolve files inside a ZIP archive as loaded by
        trimesh.util.decompress

        Parameters
        -------------
        archive : dict
          Contains resources as file object. If not
          passed an empty archive is used.
        namespace : None or str
          If passed will only show keys that start
          with this value and this substring must be
          removed for any get calls.
        """
        # default to an empty archive so `keys`, `get` and `in` work on
        # a resolver constructed without one instead of failing on `None`
        self.archive = {} if archive is None else archive
        if isinstance(namespace, str):
            # normalize so the namespace always ends in exactly one slash
            self.namespace = namespace.strip().rstrip("/") + "/"
        else:
            self.namespace = None

    def keys(self):
        """
        Get the available keys in the current archive.

        Returns
        -----------
        keys : iterable
          Keys in the current archive.
        """
        if self.namespace is not None:
            namespace = self.namespace
            length = len(namespace)
            # only return keys that start with the namespace
            # and strip off the namespace from the returned
            # keys, skipping the bare namespace entry itself
            return [
                k[length:]
                for k in self.archive.keys()
                if k.startswith(namespace) and len(k) > length
            ]
        return self.archive.keys()

    def write(self, key: str, value) -> None:
        """
        Store a value in the current archive.

        Parameters
        -----------
        key : hashable
          Key to store data under. It is used as-is: the
          namespace of this resolver is not prepended.
        value : str, bytes, file-like
          Value to store.
        """
        # the attribute may have been reset to None after construction
        if self.archive is None:
            self.archive = {}
        self.archive[key] = value

    def get(self, name: str) -> bytes:
        """
        Get an asset from the ZIP archive.

        Parameters
        -------------
        name : str
          Name of the asset

        Returns
        -------------
        data : bytes
          Loaded data from asset. A value stored as `str` is
          returned unchanged, and `None` is returned if
          `name` is None.

        Raises
        -------------
        KeyError
          If neither `name` nor any nearby variant is stored.
        """
        # not much we can do with None
        if name is None:
            return
        # make sure name is a string
        if hasattr(name, "decode"):
            name = name.decode("utf-8")
        # store reference to archive inside this function
        archive = self.archive
        # requested name not identical in
        # storage so attempt to recover
        if name not in archive:
            # loop through unique results
            for option in nearby_names(name, self.namespace):
                if option in archive:
                    # cleaned option is in archive
                    # so store value and exit
                    name = option
                    break

        # get the stored data
        obj = archive[name]
        # if the dict is storing data as bytes just return
        if isinstance(obj, (bytes, str)):
            return obj
        # otherwise it is a file object: read it from the
        # beginning and rewind so it can be read again later
        obj.seek(0)
        data = obj.read()
        obj.seek(0)
        return data

    def namespaced(self, namespace: str) -> "ZipResolver":
        """
        Return a "sub-resolver" with a root namespace.

        Parameters
        -------------
        namespace : str
          The root of the key to clip off, i.e. if
          this resolver has key `a/b/c` you can get
          'a/b/c' with resolver.namespaced('a/b').get('c')

        Returns
        -----------
        resolver : Resolver
          Namespaced resolver sharing this archive.
        """
        return ZipResolver(archive=self.archive, namespace=namespace)

    def export(self) -> bytes:
        """
        Export the contents of the current archive as
        a ZIP file.

        Returns
        ------------
        compressed : bytes
          Compressed data in ZIP format.
        """
        return util.compress(self.archive)
class WebResolver(Resolver):
    """
    Resolve assets from a remote URL.
    """

    def __init__(
        self,
        url: str,
        session: HttpSessionLike | None = None,
        timeout: float = 30.0,
    ):
        """
        Resolve assets from a base URL.

        Parameters
        --------------
        url : str
          Location where a mesh was stored or
          directory where mesh was stored.
        session : HttpSessionLike or None
          Optional HTTP session used for fetches. Accepts
          `httpx.Client` or `requests.Session`.
        timeout : float
          Per-request timeout in seconds.

        Raises
        --------------
        ValueError
          If the URL scheme is not `http` or `https`.
        """
        if hasattr(url, "decode"):
            url = url.decode("utf-8")

        # parse string into namedtuple
        parsed = urlparse(url)
        # only http(s) is supported, reject `file://`, `gopher://`, etc.
        if parsed.scheme not in ("http", "https"):
            raise ValueError(f"scheme {parsed.scheme!r} not in ('http', 'https')")

        if session is None:
            # an explicit session will be required in a future release
            import warnings

            warnings.warn(
                "`WebResolver` without a `session` is deprecated "
                + "and will require one in a future release. "
                + "pass an `httpx.Client` or `requests.Session`.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        self.session = session
        self.timeout = timeout

        # we want a base url: split the path into its
        # non-empty parts which also drops double slashes
        split = [i for i in parsed.path.split("/") if len(i) > 0]

        # if the last item in the url path looks like a file name
        # (contains a dot) move up a "directory" for the base path
        if split and "." in split[-1]:
            split = split[:-1]
        path = "/".join(split)

        # save the URL we were created with, i.e.
        # `https://stuff.com/models/thing.glb`
        self.url = url
        # save the root url, i.e. `https://stuff.com/models`
        # `scheme:/` joined to the host with `/` produces `scheme://host`
        pieces = [parsed.scheme + ":/", parsed.netloc.strip("/"), path.strip("/")]
        self.base_url = "/".join(i for i in pieces if len(i) > 0) + "/"

        # our string handling should have never inserted double slashes
        assert "//" not in self.base_url[len(parsed.scheme) + 3 :]
        # we should always have ended with a single slash
        assert self.base_url.endswith("/")

        self.file_name = url.split("/")[-1]

    def get(self, name: str) -> bytes:
        """
        Get a resource from the remote site.

        Parameters
        -------------
        name : str
          Asset name, i.e. 'quadknot.obj.mtl'

        Returns
        -------------
        data : bytes
          The content of the response.

        Raises
        -------------
        Whatever the client's `raise_for_status` raises
        for an unsuccessful response.
        """
        import httpx

        # remove leading and trailing whitespace
        name = name.strip()

        # the caller's session or the bare httpx module, both expose `.get`
        # NOTE(review): `follow_redirects` is the httpx keyword while
        # `requests` names it `allow_redirects` -- confirm a plain
        # `requests.Session` is accepted here as the docstring states.
        client = self.session or httpx
        response = client.get(
            self.base_url + name, follow_redirects=True, timeout=self.timeout
        )

        # only retry when stripping the prefix produces a different
        # URL: repeating the identical request just doubles the
        # cost of every failed lookup
        if response.status_code >= 300 and name.startswith("./"):
            # try to strip off the relative filesystem prefix
            response = client.get(
                self.base_url + name[2:],
                follow_redirects=True,
                timeout=self.timeout,
            )

        # now raise if we don't have a usable response
        response.raise_for_status()

        # return the bytes of the response
        return response.content

    def get_base(self) -> bytes:
        """
        Fetch the data at the full URL this resolver was
        instantiated with, i.e. `https://stuff.com/hi.glb`
        this will return the response.

        Returns
        --------
        content
          The value at `self.url`
        """
        import httpx

        # just fetch the url we were created with
        response = (self.session or httpx).get(
            self.url, follow_redirects=True, timeout=self.timeout
        )
        response.raise_for_status()
        return response.content

    def namespaced(self, namespace: str) -> "WebResolver":
        """
        Return a namespaced version of current resolver.

        Parameters
        -------------
        namespace : str
          URL fragment

        Returns
        -----------
        resolver : WebResolver
          With sub-url: `https://example.com/{namespace}`
        """
        # propagate session/timeout so the child keeps the same posture
        return WebResolver(
            url=self.base_url + namespace,
            session=self.session,
            timeout=self.timeout,
        )

    def write(self, key, value):
        """
        Not supported: remote assets can only be read.
        """
        raise NotImplementedError("`WebResolver` is read-only!")

    def keys(self):
        """
        Not supported: the available names are not listed.
        """
        raise NotImplementedError("`WebResolver` can't list keys")
class GithubResolver(Resolver):
    """
    Resolve files from the zip archive of a Github repository.
    """

    def __init__(
        self,
        repo: str,
        branch: str | None = None,
        commit: str | None = None,
        save: str | None = None,
        session: HttpSessionLike | None = None,
        timeout: float = 30.0,
    ):
        """
        Get files from a remote Github repository by
        downloading a zip file with the entire branch
        or a specific commit.

        Parameters
        -------------
        repo
          In the format of `owner/repo`.
        branch
          The remote branch you want to get files from.
        commit
          The full commit hash: pass either this OR branch.
          If both are passed the commit is used.
        save
          A path if you want to save results locally.
        session : HttpSessionLike or None
          Optional HTTP session used for fetches. Accepts
          `httpx.Client` or `requests.Session`.
        timeout : float
          Per-request timeout in seconds.

        Raises
        -------------
        ValueError
          If neither `commit` nor `branch` was passed.
        """

        if commit is not None:
            # just get the exact commit
            self.url = f"https://github.com/{repo}/archive/{commit}.zip"
        elif branch is not None:
            # gets the latest commit on the specified branch.
            self.url = f"https://github.com/{repo}/archive/refs/heads/{branch}.zip"
        else:
            raise ValueError("`commit` or `branch` must be passed!")

        if session is None:
            # same deprecation as `WebResolver`, see there for rationale
            import warnings

            warnings.warn(
                "`GithubResolver` without a `session` is deprecated "
                + "and will require one in a future release. "
                + "pass an `httpx.Client` or `requests.Session`.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        self.session = session
        self.timeout = timeout

        # only keep a disk cache when a save location was requested
        if save is not None:
            self.cache = caching.DiskCache(save)
        else:
            self.cache = None

    def keys(self):
        """
        List the available files in the repository.

        Returns
        ----------
        keys : iterable
          Keys available to the resolved.
        """
        return self.zipped.keys()

    def write(self, name, data):
        """
        Not supported for this resolver.
        """
        raise NotImplementedError("`write` not implemented!")

    @property
    def zipped(self) -> ZipResolver:
        """
        The repository archive as a `ZipResolver`, from the
        first of:

        - opened zip file
        - locally saved zip file
        - retrieve zip file and saved

        Returns
        ----------
        zipped : ZipResolver
          Resolver over the files in the archive with the
          top-level directory removed from every key.
        """

        def fetch() -> bytes:
            """
            Fetch the remote zip file.
            """
            import httpx

            response = (self.session or httpx).get(
                self.url, follow_redirects=True, timeout=self.timeout
            )
            response.raise_for_status()
            return response.content

        # reuse the archive if it was already opened
        if hasattr(self, "_zip"):
            return self._zip

        if self.cache is None:
            # no `save` location was passed so there is no disk
            # cache to consult: download the archive directly
            raw = fetch()
        else:
            # download the archive or get from disc
            raw = self.cache.get(self.url, fetch)

        # create a zip resolver for the archive
        # the root directory in the zip is the repo+commit so strip that off
        # so the keys are usable, i.e. "models" instead of "trimesh-2232323/models"
        self._zip = ZipResolver(
            {
                k.split("/", 1)[1]: v
                for k, v in util.decompress(
                    util.wrap_as_stream(raw), file_type="zip"
                ).items()
            }
        )

        return self._zip

    def get(self, key):
        """
        Get an asset from the repository archive.

        Parameters
        -------------
        key : str
          Name of the asset relative to the repository root.

        Returns
        -------------
        data
          Whatever the underlying `ZipResolver.get` returns.
        """
        return self.zipped.get(key)

    def namespaced(self, namespace):
        """
        Return a "sub-resolver" with a root namespace.

        Parameters
        -------------
        namespace : str
          The root of the key to clip off, i.e. if
          this resolver has key `a/b/c` you can get
          'a/b/c' with resolver.namespaced('a/b').get('c')

        Returns
        -----------
        resolver : Resolver
          Namespaced resolver.
        """
        return self.zipped.namespaced(namespace)
def nearby_names(name, namespace=None):
    """
    Try to find nearby variants of a specified name.

    Parameters
    ------------
    name : str
      Initial name.
    namespace : None or str
      If passed it is prepended to every cleaned variant.

    Yields
    -----------
    nearby : str
      Name that is a lightly permutated version
      of the initial name.
    """

    def trim(prefix, item):
        # drop `prefix` from the front of `item` if it is there
        if item.startswith(prefix):
            return item[len(prefix) :]
        return item

    # the various operations that *might* result in a correct key
    cleaners = [
        lambda x: x,
        lambda x: x.strip(),
        lambda x: trim("./", x),
        lambda x: trim(".\\", x),
        lambda x: trim("\\", x),
        lambda x: os.path.split(x)[-1],
        lambda x: x.replace("%20", " "),
    ]

    root = "" if namespace is None else namespace

    # make sure we don't return repeat values
    hit = set()

    def unseen(candidates):
        # yield each candidate that was not produced before
        for current in candidates:
            if current not in hit:
                hit.add(current)
                yield root + current

    # try just one cleaning function
    yield from unseen(f(name) for f in cleaners)

    for a, b in itertools.combinations(cleaners, 2):
        # apply both clean functions in both orders. Each order is
        # checked on its own: a repeat from the first order must not
        # cause the second order to be skipped.
        yield from unseen((a(b(name)), b(a(name))))

    if ".." in name:
        # if someone specified relative paths give it one attempt:
        # drop one trailing namespace component per `..` and then
        # append whatever follows the last `..`
        strip = root.strip("/").split("/")[: -name.count("..")]
        strip.extend(name.split("..")[-1].strip("/").split("/"))
        yield "/".join(strip)
# most loaders can use a mapping in addition to a resolver,
# so this alias names "either a `Resolver` or a `Mapping`"
ResolverLike: TypeAlias = Resolver | Mapping