[Fix] Fix cpu inference UT failure #4430
Changes from all commits
Changes to the CCL communication backend (`CCLBackend`):

```diff
@@ -61,7 +61,8 @@ def is_initialized(self):
 
     def run_collective(self, name, **kwargs):
         if name in self.available_coll:
-            kwargs['group'] = self.get_all_ranks_from_group(kwargs['group'])
+            if 'group' in kwargs:
+                kwargs['group'] = self.get_all_ranks_from_group(kwargs['group'])
             if 'dst' in kwargs:
                 kwargs['dst'] = kwargs['group'].index(kwargs['dst'])
             if 'src' in kwargs:
```
```diff
@@ -71,23 +72,38 @@ def run_collective(self, name, **kwargs):
             return CCLHandler(self.ccl_comm_op)
         else:
             func = "super(CCLBackend, self)." + name
-            return eval(func)(*(kwargs.values()))
+            eval(func)(*(kwargs.values()))
+            return CCLHandler(self.ccl_comm_op)
 
     def all_reduce(self, tensor, op=ReduceOp.SUM, group=None, async_op=False):
         use_caching = False
         if use_caching:
             match_id = f"{tensor.size()}-{op}"
-            return self.run_collective(name="all_reduce_caching",
-                                       tensor=tensor,
-                                       op=op,
-                                       match_id=match_id,
-                                       group=group,
-                                       async_op=async_op)
+            name = "all_reduce_caching"
+            if name in self.available_coll:
+                group = self.get_all_ranks_from_group(group)
+                return self.ccl_comm_op.all_reduce_caching(tensor, op, match_id, group, async_op)
+            else:
+                return self.run_collective(name=name,
+                                           tensor=tensor,
+                                           op=op,
+                                           match_id=match_id,
+                                           group=group,
+                                           async_op=async_op)
         else:
-            return self.run_collective(name="all_reduce", tensor=tensor, op=op, group=group, async_op=async_op)
+            name = "all_reduce"
+            if name in self.available_coll:
+                group = self.get_all_ranks_from_group(group)
+                return self.ccl_comm_op.all_reduce(tensor, op, group, async_op)
+            else:
+                return self.run_collective(name=name, tensor=tensor, op=op, group=group, async_op=async_op)
 
     def inference_all_reduce(self, tensor, op=ReduceOp.SUM, group=None, async_op=False):
-        return self.run_collective(name="inference_all_reduce", tensor=tensor, op=op, group=group, async_op=async_op)
+        name = "inference_all_reduce"
+        if name in self.available_coll:
+            return self.ccl_comm_op.inference_all_reduce(tensor, op, async_op)
+        else:
+            return self.run_collective(name=name, tensor=tensor, op=op, group=None, async_op=async_op)
 
     def broadcast(self, tensor, src, group=None, async_op=False):
         return self.run_collective(name="broadcast", tensor=tensor, src=src, group=group, async_op=async_op)
```
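The same dispatch pattern now appears in each rewritten collective: when the collective's name is registered in `available_coll`, call the compiled CCL op (`ccl_comm_op`) directly; otherwise fall back through `run_collective` to the torch-based parent implementation. Below is a minimal, self-contained sketch of that pattern; apart from the names `available_coll` and `all_reduce`, everything in it is hypothetical (the real binding is a compiled op, faked here so the snippet runs anywhere).

```python
# Illustrative sketch only: the "native" op is a plain Python stand-in for the
# compiled CCL binding, and the fallback path is a no-op instead of the real
# torch-based parent class.
import torch


class FakeNativeOp:
    def all_reduce(self, tensor, op, group, async_op):
        # Stand-in for ccl_comm_op.all_reduce in the diff above.
        return tensor


class BackendSketch:
    def __init__(self, native_op, available_coll):
        self.native_op = native_op
        self.available_coll = available_coll

    def all_reduce(self, tensor, op="sum", group=None, async_op=False):
        if "all_reduce" in self.available_coll:
            # Native path: hand the call straight to the compiled binding.
            return self.native_op.all_reduce(tensor, op, group, async_op)
        # Fallback path: in CCLBackend this is the torch-based parent method.
        return tensor


backend = BackendSketch(FakeNativeOp(), available_coll={"all_reduce"})
backend.all_reduce(torch.ones(4))
```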
```diff
@@ -120,11 +136,11 @@ def all_to_all_single(self, output, input, output_split_sizes, input_split_sizes
                                    input_split_sizes=input_split_sizes,
                                    group=group)
 
-    def send(self, tensor, dst, group=None, async_op=False):
-        return self.run_collective(name="send", tensor=tensor, dst=dst, group=group, async_op=async_op)
+    def send(self, tensor, dst, group=None, tag=0):
+        return self.run_collective(name="send", tensor=tensor, dst=dst, group=group, tag=tag)
 
-    def recv(self, tensor, src, group=None, async_op=False):
-        return self.run_collective(name="recv", tensor=tensor, src=src, group=group, async_op=async_op)
+    def recv(self, tensor, src, group=None, tag=0):
+        return self.run_collective(name="recv", tensor=tensor, src=src, group=group, tag=tag)
 
     def gather(self, tensor, gather_list, dst, group=None, async_op=False):
         return self.run_collective(name="gather", tensor=tensor, gather_list=gather_list, dst=dst, group=group)
```
```diff
@@ -170,7 +186,7 @@ def get_all_ranks_from_group(self, group):
             while True:
                 results.append(super(CCLBackend, self).get_global_rank(group, rank))
                 rank += 1
-        except ValueError:
+        except (ValueError, RuntimeError):
             pass
         if tuple(results) not in self.groups:
             self._new_group(results, group)
```

Collaborator: What is the runtime error that we can hit here?
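On the review question above: `get_all_ranks_from_group` discovers a group's membership by looking up global ranks one by one until the lookup fails, so the change only widens which exception ends that loop. Here is a sketch of the loop, under the assumption that the parent-class `get_global_rank` ultimately resolves to `torch.distributed.get_global_rank`; which exception type the failed lookup raises is treated as version-dependent, which is what catching both `ValueError` and `RuntimeError` accounts for.

```python
# Sketch of the membership-discovery loop (assumes an initialized process group
# and that the lookup is torch.distributed.get_global_rank).
import torch.distributed as dist


def ranks_in_group(group):
    results = []
    rank = 0
    try:
        while True:
            results.append(dist.get_global_rank(group, rank))
            rank += 1
    except (ValueError, RuntimeError):
        # Depending on the torch version, the out-of-range lookup surfaces as
        # either exception type; both are treated as "no more ranks".
        pass
    return results
```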
Changes to the inference unit tests:

```diff
@@ -5,6 +5,7 @@
 import os
 import time
+import pickle
 import torch
 import pytest
 import itertools
```
```diff
@@ -65,7 +66,13 @@
 ]
 
 # Get a list of all models and mapping from task to supported models
-_hf_models = list(HfApi().list_models())
+try:
+    with open("hf_models.pkl", "rb") as fp:
+        _hf_models = pickle.load(fp)
+except FileNotFoundError:
+    _hf_models = list(HfApi().list_models())
+    with open("hf_models.pkl", "wb") as fp:
+        pickle.dump(_hf_models, fp)
 _hf_model_names = [m.modelId for m in _hf_models]
 _hf_task_to_models = {task: [m.modelId for m in _hf_models if m.pipeline_tag == task] for task in _test_tasks}
```

Contributor, on lines +69 to +75: I think that caching the model list can be a good idea for the tests, but we need to save it to blob storage so that it is persistent. Additionally, I think the cache should have a timestamp connected to it, such that we update it every hour/day/week. See how we do this in MII:
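A minimal sketch of the timestamp-based refresh the reviewer suggests, assuming a local pickle file stands in for the blob-storage location and using the cache file's modification time as the timestamp; the path, TTL, and function name are illustrative and not part of the PR.

```python
# Hypothetical refinement of the caching above: refresh the cached model list
# once it is older than a fixed TTL instead of keeping it forever.
import os
import pickle
import time

from huggingface_hub import HfApi

CACHE_PATH = "hf_models.pkl"
CACHE_TTL_SECONDS = 7 * 24 * 3600  # refresh weekly (reviewer suggests hour/day/week)


def load_hf_model_list():
    fresh = (os.path.exists(CACHE_PATH)
             and time.time() - os.path.getmtime(CACHE_PATH) < CACHE_TTL_SECONDS)
    if fresh:
        with open(CACHE_PATH, "rb") as fp:
            return pickle.load(fp)
    models = list(HfApi().list_models())
    with open(CACHE_PATH, "wb") as fp:
        pickle.dump(models, fp)
    return models
```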
```diff
@@ -280,6 +287,12 @@ def test(
         if invalid_test_msg:
             pytest.skip(invalid_test_msg)
 
+        if dtype not in get_accelerator().supported_dtypes():
+            pytest.skip(f"Acceleraor {get_accelerator().device_name()} does not support {dtype}.")
+
+        if not deepspeed.ops.__compatible_ops__[InferenceBuilder.NAME]:
+            pytest.skip("This op had not been implemented on this system.", allow_module_level=True)
+
         model, task = model_w_task
         local_rank = int(os.getenv("LOCAL_RANK", "0"))
```
```diff
@@ -536,9 +549,8 @@ def test(
         if dtype not in get_accelerator().supported_dtypes():
             pytest.skip(f"Acceleraor {get_accelerator().device_name()} does not support {dtype}.")
 
-        # TODO: enable this test after torch 2.1 stable release
         if dtype == torch.bfloat16 and model_w_task[0] == "Salesforce/codegen-350M-mono":
-            pytest.skip("Codegen model(bf16) need to use torch version > 2.0.")
+            pytest.skip("Disable Codegen model(bf16) due to slight result difference")
 
         model, task = model_w_task
         local_rank = int(os.getenv("LOCAL_RANK", "0"))
```