diff --git a/CHANGES/7617.feature b/CHANGES/7617.feature new file mode 100644 index 00000000000..887235a1229 --- /dev/null +++ b/CHANGES/7617.feature @@ -0,0 +1 @@ +Added support for passing ``q_select`` as a parameter to the replicate action, allowing users to selectively sync a subset of upstream distributions without modifying the stored upstream-pulp configuration. diff --git a/pulpcore/app/serializers/__init__.py b/pulpcore/app/serializers/__init__.py index 5a1dc1ada78..a691500ebc5 100644 --- a/pulpcore/app/serializers/__init__.py +++ b/pulpcore/app/serializers/__init__.py @@ -125,7 +125,7 @@ UserRoleSerializer, UserSerializer, ) -from .replica import UpstreamPulpSerializer +from .replica import UpstreamPulpReplicateSerializer, UpstreamPulpSerializer from .vulnerability_report import VulnerabilityReportSerializer from .openpgp import ( OpenPGPDistributionSerializer, diff --git a/pulpcore/app/serializers/replica.py b/pulpcore/app/serializers/replica.py index 9ee611c1c09..7d13425865f 100644 --- a/pulpcore/app/serializers/replica.py +++ b/pulpcore/app/serializers/replica.py @@ -184,3 +184,22 @@ class Meta: "last_replication", "policy", ) + + +class UpstreamPulpReplicateSerializer(serializers.Serializer): + q_select = serializers.CharField( + help_text=_( + "Filter distributions on the upstream Pulp using complex filtering. " + "When specified, overrides the stored q_select for this replication run only. " + 'E.g. pulp_label_select="foo" OR pulp_label_select="key=val"', + ), + allow_null=True, + allow_blank=True, + required=False, + ) + + def validate_q_select(self, value): + from pulpcore.app.viewsets import DistributionFilter + + DistributionFilter().filters["q"].field.clean(value) + return value diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 9ce07cf74eb..df2b14c8eb0 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -28,7 +28,7 @@ def user_agent(): return f"pulpcore/{pulp_version} ({python}, {system}) (pulp-glue {pulp_glue_version})" -def replicate_distributions(server_pk): +def replicate_distributions(server_pk, q_select=None): server = UpstreamPulp.objects.get(pk=server_pk) # Write out temporary files related to SSL @@ -88,10 +88,11 @@ def replicate_distributions(server_pk): replicator = replicator_class(ctx, task_group, remote_settings, server) supported_replicators.append(replicator) + effective_q_select = q_select if q_select is not None else server.q_select distro_repo_pairs = [] for replicator in supported_replicators: distro_names = [] - distros = replicator.upstream_distributions(q=server.q_select) + distros = replicator.upstream_distributions(q=effective_q_select) for distro in distros: # Create remote remote = replicator.create_or_update_remote(upstream_distribution=distro) @@ -117,7 +118,13 @@ def replicate_distributions(server_pk): distro_names.append(distro["name"]) distro_repo_pairs.append((distro["name"], str(repository.pk))) - replicator.remove_missing(distro_names) + # When a per-request q_select override is used, this is a selective sync + # of a subset of distributions. Skipping remove_missing avoids deleting + # distributions that simply weren't included in the filter — but it also + # means that distributions removed from upstream won't be cleaned up until + # a full (non-overridden) replication runs. + if q_select is None: + replicator.remove_missing(distro_names) except GluePulpException as e: raise ExternalServiceError(service_name=server.base_url, details=str(e)) diff --git a/pulpcore/app/viewsets/replica.py b/pulpcore/app/viewsets/replica.py index 3d1302d281b..d644ee40f17 100644 --- a/pulpcore/app/viewsets/replica.py +++ b/pulpcore/app/viewsets/replica.py @@ -8,7 +8,11 @@ from pulpcore.app.models import TaskGroup, UpstreamPulp from pulpcore.app.response import TaskGroupOperationResponse -from pulpcore.app.serializers import TaskGroupOperationResponseSerializer, UpstreamPulpSerializer +from pulpcore.app.serializers import ( + TaskGroupOperationResponseSerializer, + UpstreamPulpReplicateSerializer, + UpstreamPulpSerializer, +) from pulpcore.app.tasks import replicate_distributions from pulpcore.app.viewsets import NamedModelViewSet, RolesMixin from pulpcore.app.viewsets.base import DATETIME_FILTER_OPTIONS, NAME_FILTER_OPTIONS @@ -118,24 +122,31 @@ class UpstreamPulpViewSet( summary="Replicate", description="Trigger an asynchronous repository replication task group. This API is " "provided as a tech preview.", - request=None, + request=UpstreamPulpReplicateSerializer, responses={202: TaskGroupOperationResponseSerializer}, ) - @action(detail=True, methods=["post"]) + @action(detail=True, methods=["post"], serializer_class=UpstreamPulpReplicateSerializer) def replicate(self, request, pk): """ Triggers an asynchronous repository replication operation. """ + serializer = UpstreamPulpReplicateSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + server = UpstreamPulp.objects.get(pk=pk) task_group = TaskGroup.objects.create(description=f"Replication of {server.name}") exclusive_resources = [f"pdrn:{request.pulp_domain.pulp_id}:servers"] + task_kwargs = {"server_pk": pk} + if q_select := serializer.validated_data.get("q_select"): + task_kwargs["q_select"] = q_select + dispatch( replicate_distributions, exclusive_resources=exclusive_resources, shared_resources=[server], - kwargs={"server_pk": pk}, + kwargs=task_kwargs, task_group=task_group, ) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index 94d7a269ba5..56cb8286798 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -36,7 +36,9 @@ def test_replication( pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=non_default_domain.name ) # Run the replicate task and assert that all tasks successfully complete. - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) task_group = monitor_task_group(response.task_group) for task in task_group.tasks: assert task.state == "completed" @@ -101,7 +103,9 @@ def test_replication_idempotence( pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=replica_domain.name ) # Run the replicate task and assert that all tasks successfully complete. - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) for api_client in ( @@ -140,7 +144,9 @@ def test_replication_idempotence( pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=source_domain.name ) # Run the replicate task and assert that all tasks successfully complete. - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp2.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp2.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) # Replicating backwards will create a new repository (deleting the old) + new remote, # but use the same distribution @@ -232,7 +238,9 @@ def test_replication_remote_settings_propagation( pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=replica_domain.name ) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = file_bindings.RemotesFileApi.list(pulp_domain=replica_domain.name) @@ -257,7 +265,9 @@ def test_replication_remote_settings_propagation( "max_retries": 2, }, ) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = file_bindings.RemotesFileApi.list(pulp_domain=replica_domain.name) @@ -315,7 +325,9 @@ def test_replication_with_repo_based_distribution( }, pulp_domain=replica_domain.name, ) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) # Verify replica distribution uses repository_version, not repository or publication @@ -393,7 +405,9 @@ def test_replication_multi_distribution_content_update( }, pulp_domain=replica_domain.name, ) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) # Record initial versions @@ -432,7 +446,9 @@ def test_replication_multi_distribution_content_update( ) # Re-replicate - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) # Verify all distributions were updated to new versions with new content @@ -513,7 +529,9 @@ def test_replication_with_wrong_ca_cert( ) # Run the replicate task and assert that it fails with SSLError with pytest.raises(PulpTaskGroupError) as e: - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) task = pulpcore_bindings.TasksApi.read(e.value.task_group.tasks[0].pulp_href) @@ -526,7 +544,9 @@ def test_replication_with_wrong_ca_cert( ) # Run the replicate task again and assert that all tasks successfully complete. - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) task_group = monitor_task_group(response.task_group) for task in task_group.tasks: assert task.state == "completed" @@ -637,7 +657,9 @@ def _check_replication( old_replication, should_run_sync_task=True, ): - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream_pulp.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) # check if the replication succeeded task_group = monitor_task_group(response.task_group) add_domain_objects_to_cleanup(local_domain) @@ -760,18 +782,45 @@ def test_replicate_rbac( ) # Assert that Alice (upstream pulp viewer) gets a 403 - try_action(alice, pulpcore_bindings.UpstreamPulpsApi, "replicate", 403, upstream_pulp.pulp_href) + replicate_body = pulpcore_bindings.module.UpstreamPulpReplicate() + try_action( + alice, + pulpcore_bindings.UpstreamPulpsApi, + "replicate", + 403, + upstream_pulp.pulp_href, + replicate_body, + ) # Assert that B (upstream pulp owner) gets a 202 - try_action(bob, pulpcore_bindings.UpstreamPulpsApi, "replicate", 202, upstream_pulp.pulp_href) + try_action( + bob, + pulpcore_bindings.UpstreamPulpsApi, + "replicate", + 202, + upstream_pulp.pulp_href, + replicate_body, + ) # Assert that Charlie (no role) get a 404 try_action( - charlie, pulpcore_bindings.UpstreamPulpsApi, "replicate", 404, upstream_pulp.pulp_href + charlie, + pulpcore_bindings.UpstreamPulpsApi, + "replicate", + 404, + upstream_pulp.pulp_href, + replicate_body, ) # Assert that Dean can run replication - try_action(dean, pulpcore_bindings.UpstreamPulpsApi, "replicate", 202, upstream_pulp.pulp_href) + try_action( + dean, + pulpcore_bindings.UpstreamPulpsApi, + "replicate", + 202, + upstream_pulp.pulp_href, + replicate_body, + ) # Assert that Dean can view the upstream pulp try_action(dean, pulpcore_bindings.UpstreamPulpsApi, "read", 200, upstream_pulp.pulp_href) @@ -841,7 +890,9 @@ def test_replicate_with_basic_q_select( pulpcore_bindings.UpstreamPulpsApi, upstream_body, pulp_domain=dest_domain.name ) # Run the replicate task and assert that all 6 repos got synced - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) add_domain_objects_to_cleanup(dest_domain) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) @@ -850,7 +901,9 @@ def test_replicate_with_basic_q_select( # Update q_select to sync only 'even' repos body = {"q_select": "pulp_label_select='even'"} pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) assert result.count == 3 @@ -859,7 +912,9 @@ def test_replicate_with_basic_q_select( # Update q_select to sync one 'upstream' repo body["q_select"] = "pulp_label_select='upstream=4'" pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) assert result.count == 1 @@ -868,13 +923,78 @@ def test_replicate_with_basic_q_select( # Show that basic label select is ANDed together body["q_select"] = "pulp_label_select='even,upstream=0'" pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) assert result.count == 1 assert result.results[0].name == "0" +@pytest.mark.parallel +def test_replicate_with_per_request_q_select( + domain_factory, + populate_upstream, + bindings_cfg, + pulpcore_bindings, + monitor_task_group, + pulp_settings, + gen_object_with_cleanup, + add_domain_objects_to_cleanup, +): + """Test that q_select can be passed per-request to the replicate action.""" + source_domain = populate_upstream(6) + dest_domain = domain_factory() + add_domain_objects_to_cleanup(dest_domain) + + # Create upstream pulp with NO stored q_select + upstream = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, + { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + }, + pulp_domain=dest_domain.name, + ) + + # Selective replicate: only sync 'even' distributions + replicate_body = pulpcore_bindings.module.UpstreamPulpReplicate( + q_select="pulp_label_select='even'" + ) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, upstream_pulp_replicate=replicate_body + ) + monitor_task_group(response.task_group) + result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) + assert result.count == 3 + assert {d.name for d in result.results} == {"0", "2", "4"} + + # Selective replicate of 'odd' should NOT delete the 'even' ones (remove_missing skipped) + replicate_body = pulpcore_bindings.module.UpstreamPulpReplicate( + q_select="pulp_label_select='odd'" + ) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, upstream_pulp_replicate=replicate_body + ) + monitor_task_group(response.task_group) + result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) + assert result.count == 6 + assert {d.name for d in result.results} == {"0", "1", "2", "3", "4", "5"} + + # Full replicate (no per-request q_select) should still work and run remove_missing + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) + monitor_task_group(response.task_group) + result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) + assert result.count == 6 + + @pytest.mark.parallel def test_replicate_with_complex_q_select( domain_factory, @@ -903,7 +1023,9 @@ def test_replicate_with_complex_q_select( pulpcore_bindings.UpstreamPulpsApi, upstream_body, pulp_domain=dest_domain.name ) # Run the replicate task and assert that two repos got synced - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) assert result.count == 2 @@ -912,7 +1034,9 @@ def test_replicate_with_complex_q_select( # Test odds but not five body = {"q_select": "pulp_label_select='odd' AND NOT pulp_label_select='upstream=5'"} pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name) assert result.count == 2 @@ -990,7 +1114,9 @@ def test_replicate_policy( pulpcore_bindings.UpstreamPulpsApi, upstream_body, pulp_domain=b_domain.name ) - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) add_domain_objects_to_cleanup(b_domain) @@ -1007,7 +1133,9 @@ def test_replicate_policy( assert result.results[0].name == "a0" # Perform second replicate and check the correct distros were deleted - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=b_domain.name) assert result.count == len(results[1]) @@ -1024,7 +1152,9 @@ def test_replicate_policy( ) # Replicate again and check that it was managed correctly by policy - response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href) + response = pulpcore_bindings.UpstreamPulpsApi.replicate( + upstream.pulp_href, pulpcore_bindings.module.UpstreamPulpReplicate() + ) monitor_task_group(response.task_group) result = pulpcore_bindings.DistributionsApi.list(pulp_domain=b_domain.name) if policy in ("nodelete", "labeled"):