mirror of https://github.com/apache/cloudstack.git
Merge 070b956078 into 9bbd32a8ef
This commit is contained in:
commit
32f515a5da
|
|
@ -44,6 +44,7 @@ import com.vmware.nsx_policy.infra.tier_0s.LocaleServices;
|
|||
import com.vmware.nsx_policy.infra.tier_1s.nat.NatRules;
|
||||
import com.vmware.nsx_policy.model.ApiError;
|
||||
import com.vmware.nsx_policy.model.DhcpRelayConfig;
|
||||
import com.vmware.nsx_policy.model.EnforcementPoint;
|
||||
import com.vmware.nsx_policy.model.EnforcementPointListResult;
|
||||
import com.vmware.nsx_policy.model.Group;
|
||||
import com.vmware.nsx_policy.model.GroupListResult;
|
||||
|
|
@ -64,12 +65,13 @@ import com.vmware.nsx_policy.model.PathExpression;
|
|||
import com.vmware.nsx_policy.model.PolicyGroupMembersListResult;
|
||||
import com.vmware.nsx_policy.model.PolicyNatRule;
|
||||
import com.vmware.nsx_policy.model.PolicyNatRuleListResult;
|
||||
import com.vmware.nsx_policy.model.PolicyGroupMemberDetails;
|
||||
import com.vmware.nsx_policy.model.Rule;
|
||||
import com.vmware.nsx_policy.model.SecurityPolicy;
|
||||
import com.vmware.nsx_policy.model.Segment;
|
||||
import com.vmware.nsx_policy.model.SegmentSubnet;
|
||||
import com.vmware.nsx_policy.model.ServiceListResult;
|
||||
import com.vmware.nsx_policy.model.SiteListResult;
|
||||
import com.vmware.nsx_policy.model.Site;
|
||||
import com.vmware.nsx_policy.model.Tier1;
|
||||
import com.vmware.vapi.bindings.Service;
|
||||
import com.vmware.vapi.bindings.Structure;
|
||||
|
|
@ -97,6 +99,7 @@ import java.util.Locale;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.cloudstack.utils.NsxControllerUtils.getServerPoolMemberName;
|
||||
|
|
@ -282,16 +285,18 @@ public class NsxApiClient {
|
|||
Tier1s tier1service = (Tier1s) nsxService.apply(Tier1s.class);
|
||||
return tier1service.get(tier1GatewayId);
|
||||
} catch (Exception e) {
|
||||
logger.debug(String.format("NSX Tier-1 gateway with name: %s not found", tier1GatewayId));
|
||||
logger.debug("NSX Tier-1 gateway with name: {} not found", tier1GatewayId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<com.vmware.nsx_policy.model.LocaleServices> getTier0LocalServices(String tier0Gateway) {
|
||||
private Optional<com.vmware.nsx_policy.model.LocaleServices> findTier0LocalServices(String tier0Gateway) {
|
||||
try {
|
||||
LocaleServices tier0LocaleServices = (LocaleServices) nsxService.apply(LocaleServices.class);
|
||||
LocaleServicesListResult result = tier0LocaleServices.list(tier0Gateway, null, false, null, null, null, null);
|
||||
return result.getResults();
|
||||
LocaleServicesListResult result = tier0LocaleServices.list(tier0Gateway, null, false, null, 1L, null, null);
|
||||
return Optional.ofNullable(result.getResults())
|
||||
.filter(Predicate.not(List::isEmpty))
|
||||
.map(l -> l.get(0));
|
||||
} catch (Exception e) {
|
||||
throw new CloudRuntimeException(String.format("Failed to fetch locale services for tier gateway %s due to %s", tier0Gateway, e.getMessage()));
|
||||
}
|
||||
|
|
@ -302,10 +307,13 @@ public class NsxApiClient {
|
|||
*/
|
||||
private void createTier1LocaleServices(String tier1Id, String edgeCluster, String tier0Gateway) {
|
||||
try {
|
||||
List<com.vmware.nsx_policy.model.LocaleServices> localeServices = getTier0LocalServices(tier0Gateway);
|
||||
Optional<com.vmware.nsx_policy.model.LocaleServices> localeServices = findTier0LocalServices(tier0Gateway);
|
||||
if (localeServices.isEmpty()) {
|
||||
throw new CloudRuntimeException(String.format("Failed to find locale services for tier-0 gateway %s", tier0Gateway));
|
||||
}
|
||||
com.vmware.nsx_policy.infra.tier_1s.LocaleServices tier1LocalService = (com.vmware.nsx_policy.infra.tier_1s.LocaleServices) nsxService.apply(com.vmware.nsx_policy.infra.tier_1s.LocaleServices.class);
|
||||
com.vmware.nsx_policy.model.LocaleServices localeService = new com.vmware.nsx_policy.model.LocaleServices.Builder()
|
||||
.setEdgeClusterPath(localeServices.get(0).getEdgeClusterPath()).build();
|
||||
.setEdgeClusterPath(localeServices.get().getEdgeClusterPath()).build();
|
||||
tier1LocalService.patch(tier1Id, TIER_1_LOCALE_SERVICE_ID, localeService);
|
||||
} catch (Error error) {
|
||||
throw new CloudRuntimeException(String.format("Failed to instantiate tier-1 gateway %s in edge cluster %s", tier1Id, edgeCluster));
|
||||
|
|
@ -327,7 +335,7 @@ public class NsxApiClient {
|
|||
String tier0GatewayPath = TIER_0_GATEWAY_PATH_PREFIX + tier0Gateway;
|
||||
Tier1 tier1 = getTier1Gateway(name);
|
||||
if (tier1 != null) {
|
||||
logger.info(String.format("VPC network with name %s exists in NSX zone", name));
|
||||
logger.info("VPC network with name {} exists in NSX zone", name);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -359,7 +367,7 @@ public class NsxApiClient {
|
|||
com.vmware.nsx_policy.infra.tier_1s.LocaleServices localeService = (com.vmware.nsx_policy.infra.tier_1s.LocaleServices)
|
||||
nsxService.apply(com.vmware.nsx_policy.infra.tier_1s.LocaleServices.class);
|
||||
if (getTier1Gateway(tier1Id) == null) {
|
||||
logger.warn(String.format("The Tier 1 Gateway %s does not exist, cannot be removed", tier1Id));
|
||||
logger.warn("The Tier 1 Gateway {} does not exist, cannot be removed", tier1Id);
|
||||
return;
|
||||
}
|
||||
removeTier1GatewayNatRules(tier1Id);
|
||||
|
|
@ -370,13 +378,21 @@ public class NsxApiClient {
|
|||
|
||||
private void removeTier1GatewayNatRules(String tier1Id) {
|
||||
NatRules natRulesService = (NatRules) nsxService.apply(NatRules.class);
|
||||
PolicyNatRuleListResult result = natRulesService.list(tier1Id, NAT_ID, null, false, null, null, null, null);
|
||||
List<PolicyNatRule> natRules = result.getResults();
|
||||
List<PolicyNatRule> natRules = PagedFetcher.<PolicyNatRuleListResult, PolicyNatRule>withPageFetcher(
|
||||
cursor -> natRulesService.list(tier1Id, NAT_ID, cursor, false, null, null, null, null)
|
||||
).cursorExtractor(PolicyNatRuleListResult::getCursor)
|
||||
.itemsExtractor(PolicyNatRuleListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll()
|
||||
.getResults();
|
||||
if (CollectionUtils.isEmpty(natRules)) {
|
||||
logger.debug(String.format("Didn't find any NAT rule to remove on the Tier 1 Gateway %s", tier1Id));
|
||||
logger.debug("Didn't find any NAT rule to remove on the Tier 1 Gateway {}", tier1Id);
|
||||
} else {
|
||||
for (PolicyNatRule natRule : natRules) {
|
||||
logger.debug(String.format("Removing NAT rule %s from Tier 1 Gateway %s", natRule.getId(), tier1Id));
|
||||
logger.debug("Removing NAT rule {} from Tier 1 Gateway {}", natRule.getId(), tier1Id);
|
||||
natRulesService.delete(tier1Id, NAT_ID, natRule.getId());
|
||||
}
|
||||
}
|
||||
|
|
@ -384,38 +400,45 @@ public class NsxApiClient {
|
|||
}
|
||||
|
||||
public String getDefaultSiteId() {
|
||||
SiteListResult sites = getSites();
|
||||
if (CollectionUtils.isEmpty(sites.getResults())) {
|
||||
Optional<Site> site = findFirstSite();
|
||||
if (site.isEmpty()) {
|
||||
String errorMsg = "No sites are found in the linked NSX infrastructure";
|
||||
logger.error(errorMsg);
|
||||
throw new CloudRuntimeException(errorMsg);
|
||||
}
|
||||
return sites.getResults().get(0).getId();
|
||||
return site.get().getId();
|
||||
}
|
||||
|
||||
protected SiteListResult getSites() {
|
||||
protected Optional<Site> findFirstSite() {
|
||||
try {
|
||||
Sites sites = (Sites) nsxService.apply(Sites.class);
|
||||
return sites.list(null, false, null, null, null, null);
|
||||
List<Site> siteList = sites.list(null, false, null, 1L, null, null)
|
||||
.getResults();
|
||||
return Optional.ofNullable(siteList)
|
||||
.filter(Predicate.not(List::isEmpty))
|
||||
.map(l -> l.get(0));
|
||||
} catch (Exception e) {
|
||||
throw new CloudRuntimeException(String.format("Failed to fetch sites list due to %s", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
public String getDefaultEnforcementPointPath(String siteId) {
|
||||
EnforcementPointListResult epList = getEnforcementPoints(siteId);
|
||||
if (CollectionUtils.isEmpty(epList.getResults())) {
|
||||
Optional<EnforcementPoint> ep = findFirstEnforcementPoint(siteId);
|
||||
if (ep.isEmpty()) {
|
||||
String errorMsg = String.format("No enforcement points are found in the linked NSX infrastructure for site ID %s", siteId);
|
||||
logger.error(errorMsg);
|
||||
throw new CloudRuntimeException(errorMsg);
|
||||
}
|
||||
return epList.getResults().get(0).getPath();
|
||||
return ep.get().getPath();
|
||||
}
|
||||
|
||||
protected EnforcementPointListResult getEnforcementPoints(String siteId) {
|
||||
protected Optional<EnforcementPoint> findFirstEnforcementPoint(String siteId) {
|
||||
try {
|
||||
EnforcementPoints enforcementPoints = (EnforcementPoints) nsxService.apply(EnforcementPoints.class);
|
||||
return enforcementPoints.list(siteId, null, false, null, null, null, null);
|
||||
EnforcementPointListResult result = enforcementPoints.list(siteId, null, false, null, 1L, null, null);
|
||||
return Optional.ofNullable(result.getResults())
|
||||
.filter(Predicate.not(List::isEmpty))
|
||||
.map(l -> l.get(0));
|
||||
} catch (Exception e) {
|
||||
throw new CloudRuntimeException(String.format("Failed to fetch enforcement points due to %s", e.getMessage()));
|
||||
}
|
||||
|
|
@ -424,7 +447,15 @@ public class NsxApiClient {
|
|||
public TransportZoneListResult getTransportZones() {
|
||||
try {
|
||||
com.vmware.nsx.TransportZones transportZones = (com.vmware.nsx.TransportZones) nsxService.apply(com.vmware.nsx.TransportZones.class);
|
||||
return transportZones.list(null, null, true, null, null, null, null, null, TransportType.OVERLAY.name(), null);
|
||||
return PagedFetcher.<TransportZoneListResult, TransportZone>withPageFetcher(
|
||||
cursor -> transportZones.list(cursor, null, true, null, null, null, null, null, TransportType.OVERLAY.name(), null)
|
||||
).cursorExtractor(TransportZoneListResult::getCursor)
|
||||
.itemsExtractor(TransportZoneListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
} catch (Exception e) {
|
||||
throw new CloudRuntimeException(String.format("Failed to fetch transport zones due to %s", e.getMessage()));
|
||||
}
|
||||
|
|
@ -465,7 +496,7 @@ public class NsxApiClient {
|
|||
removeSegment(segmentName, zoneId);
|
||||
DhcpRelayConfigs dhcpRelayConfig = (DhcpRelayConfigs) nsxService.apply(DhcpRelayConfigs.class);
|
||||
String dhcpRelayConfigId = NsxControllerUtils.getNsxDhcpRelayConfigId(zoneId, domainId, accountId, vpcId, networkId);
|
||||
logger.debug(String.format("Removing the DHCP relay config with ID %s", dhcpRelayConfigId));
|
||||
logger.debug("Removing the DHCP relay config with ID {}", dhcpRelayConfigId);
|
||||
dhcpRelayConfig.delete(dhcpRelayConfigId);
|
||||
} catch (Error error) {
|
||||
ApiError ae = error.getData()._convertTo(ApiError.class);
|
||||
|
|
@ -476,7 +507,7 @@ public class NsxApiClient {
|
|||
}
|
||||
|
||||
protected void removeSegment(String segmentName, long zoneId) {
|
||||
logger.debug(String.format("Removing the segment with ID %s", segmentName));
|
||||
logger.debug("Removing the segment with ID {}", segmentName);
|
||||
Segments segmentService = (Segments) nsxService.apply(Segments.class);
|
||||
String errMsg = String.format("The segment with ID %s is not found, skipping removal", segmentName);
|
||||
try {
|
||||
|
|
@ -498,7 +529,7 @@ public class NsxApiClient {
|
|||
portCount = retrySegmentDeletion(segmentPortsService, segmentName, enforcementPointPath, zoneId);
|
||||
}
|
||||
if (portCount == 0L) {
|
||||
logger.debug(String.format("Removing the segment with ID %s", segmentName));
|
||||
logger.debug("Removing the segment with ID {}", segmentName);
|
||||
removeGroupForSegment(segmentName);
|
||||
segmentService.delete(segmentName);
|
||||
} else {
|
||||
|
|
@ -509,8 +540,18 @@ public class NsxApiClient {
|
|||
}
|
||||
|
||||
private PolicyGroupMembersListResult getSegmentPortList(SegmentPorts segmentPortsService, String segmentName, String enforcementPointPath) {
|
||||
return segmentPortsService.list(DEFAULT_DOMAIN, segmentName, null, enforcementPointPath,
|
||||
false, null, 50L, false, null);
|
||||
return PagedFetcher.
|
||||
<PolicyGroupMembersListResult, PolicyGroupMemberDetails>withPageFetcher(
|
||||
cursor -> segmentPortsService.list(DEFAULT_DOMAIN, segmentName, cursor, enforcementPointPath,
|
||||
false, null, 50L, false, null)
|
||||
)
|
||||
.cursorExtractor(PolicyGroupMembersListResult::getCursor)
|
||||
.itemsExtractor(PolicyGroupMembersListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
}
|
||||
|
||||
private Long retrySegmentDeletion(SegmentPorts segmentPortsService, String segmentName, String enforcementPointPath, long zoneId) {
|
||||
|
|
@ -546,7 +587,7 @@ public class NsxApiClient {
|
|||
.setEnabled(true)
|
||||
.build();
|
||||
|
||||
logger.debug(String.format("Creating NSX static NAT rule %s for tier-1 gateway %s (VPC: %s)", ruleName, tier1GatewayName, vpcName));
|
||||
logger.debug("Creating NSX static NAT rule {} for tier-1 gateway {} (VPC: {})", ruleName, tier1GatewayName, vpcName);
|
||||
natService.patch(tier1GatewayName, NatId.USER.name(), ruleName, rule);
|
||||
} catch (Error error) {
|
||||
ApiError ae = error.getData()._convertTo(ApiError.class);
|
||||
|
|
@ -582,8 +623,7 @@ public class NsxApiClient {
|
|||
natService.delete(tier1GatewayName, NatId.USER.name(), ruleName);
|
||||
}
|
||||
} catch (Error error) {
|
||||
String msg = String.format("Cannot find NAT rule with name %s: %s, skipping deletion", ruleName, error.getMessage());
|
||||
logger.debug(msg);
|
||||
logger.debug("Cannot find NAT rule with name {}: {}, skipping deletion", ruleName, error.getMessage());
|
||||
}
|
||||
|
||||
if (service == Network.Service.PortForwarding) {
|
||||
|
|
@ -595,7 +635,7 @@ public class NsxApiClient {
|
|||
String vmIp, String publicPort, String service) {
|
||||
try {
|
||||
NatRules natService = (NatRules) nsxService.apply(NatRules.class);
|
||||
logger.debug(String.format("Creating NSX Port-Forwarding NAT %s for network %s", ruleName, networkName));
|
||||
logger.debug("Creating NSX Port-Forwarding NAT {} for network {}", ruleName, networkName);
|
||||
PolicyNatRule rule = new PolicyNatRule.Builder()
|
||||
.setId(ruleName)
|
||||
.setDisplayName(ruleName)
|
||||
|
|
@ -698,7 +738,15 @@ public class NsxApiClient {
|
|||
}
|
||||
|
||||
LBMonitorProfileListResult listLBActiveMonitors(LbMonitorProfiles lbActiveMonitor) {
|
||||
return lbActiveMonitor.list(null, false, null, null, null, null);
|
||||
return PagedFetcher.<LBMonitorProfileListResult, Structure>withPageFetcher(
|
||||
cursor -> lbActiveMonitor.list(cursor, false, null, null, null, null)
|
||||
).cursorExtractor(LBMonitorProfileListResult::getCursor)
|
||||
.itemsExtractor(LBMonitorProfileListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
}
|
||||
|
||||
public void createNsxLoadBalancer(String tier1GatewayName) {
|
||||
|
|
@ -763,7 +811,7 @@ public class NsxApiClient {
|
|||
return lbVirtualServer;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug(String.format("Found an LB virtual server named: %s on NSX", lbVSName));
|
||||
logger.debug("Found an LB virtual server named: {} on NSX", lbVSName);
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
|
|
@ -851,8 +899,15 @@ public class NsxApiClient {
|
|||
private String getLbProfileForProtocol(String protocol) {
|
||||
try {
|
||||
LbAppProfiles lbAppProfiles = (LbAppProfiles) nsxService.apply(LbAppProfiles.class);
|
||||
LBAppProfileListResult lbAppProfileListResults = lbAppProfiles.list(null, null,
|
||||
null, null, null, null);
|
||||
LBAppProfileListResult lbAppProfileListResults = PagedFetcher.<LBAppProfileListResult, Structure>withPageFetcher(
|
||||
cursor -> lbAppProfiles.list(cursor, null, null, null, null, null)
|
||||
).cursorExtractor(LBAppProfileListResult::getCursor)
|
||||
.itemsExtractor(LBAppProfileListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
Optional<Structure> appProfile = lbAppProfileListResults.getResults().stream().filter(profile -> profile._getDataValue().getField("path").toString().contains(protocol.toLowerCase(Locale.ROOT))).findFirst();
|
||||
return appProfile.map(structure -> structure._getDataValue().getField("path").toString()).orElse(null);
|
||||
} catch (Error error) {
|
||||
|
|
@ -868,7 +923,15 @@ public class NsxApiClient {
|
|||
Services service = (Services) nsxService.apply(Services.class);
|
||||
|
||||
// Find default service if present
|
||||
ServiceListResult serviceList = service.list(null, true, false, null, null, null, null);
|
||||
ServiceListResult serviceList = PagedFetcher.<ServiceListResult, com.vmware.nsx_policy.model.Service>withPageFetcher(
|
||||
cursor -> service.list(cursor, true, false, null, null, null, null)
|
||||
).cursorExtractor(ServiceListResult::getCursor)
|
||||
.itemsExtractor(ServiceListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
|
||||
List<com.vmware.nsx_policy.model.Service> services = serviceList.getResults();
|
||||
List<String> matchedDefaultSvc = services.parallelStream().filter(svc ->
|
||||
|
|
@ -1095,9 +1158,17 @@ public class NsxApiClient {
|
|||
|
||||
private List<Group> listNsxGroups() {
|
||||
try {
|
||||
Groups groups = (Groups) nsxService.apply(Groups.class);
|
||||
GroupListResult result = groups.list(DEFAULT_DOMAIN, null, false, null, null, null, null, null);
|
||||
return result.getResults();
|
||||
Groups groups = (Groups) nsxService.apply(Groups.class);
|
||||
GroupListResult result = PagedFetcher.<GroupListResult, Group>withPageFetcher(
|
||||
cursor -> groups.list(DEFAULT_DOMAIN, cursor, false, null, null, null, null, null)
|
||||
).cursorExtractor(GroupListResult::getCursor)
|
||||
.itemsExtractor(GroupListResult::getResults)
|
||||
.itemsSetter((page, allItems) -> {
|
||||
page.setResults(allItems);
|
||||
page.setResultCount((long) allItems.size());
|
||||
})
|
||||
.fetchAll();
|
||||
return result.getResults();
|
||||
} catch (Error error) {
|
||||
ApiError ae = error.getData()._convertTo(ApiError.class);
|
||||
String msg = String.format("Failed to list NSX groups, due to: %s", ae.getErrorMessage());
|
||||
|
|
|
|||
|
|
@ -0,0 +1,82 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package org.apache.cloudstack.service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
class PagedFetcher<R, T> {
|
||||
|
||||
private final Function<String, R> fetchPage;
|
||||
private Function<R, String> cursorExtractor;
|
||||
private Function<R, List<T>> itemsExtractor;
|
||||
private BiConsumer<R, List<T>> itemsSetter;
|
||||
|
||||
static <R, T> PagedFetcher<R, T> withPageFetcher(Function<String, R> pageFetcher) {
|
||||
return new PagedFetcher<>(pageFetcher);
|
||||
}
|
||||
|
||||
PagedFetcher<R, T> cursorExtractor(Function<R, String> cursorProvider) {
|
||||
this.cursorExtractor = cursorProvider;
|
||||
return this;
|
||||
}
|
||||
|
||||
PagedFetcher<R, T> itemsExtractor(Function<R, List<T>> resultsProvider) {
|
||||
this.itemsExtractor = resultsProvider;
|
||||
return this;
|
||||
}
|
||||
|
||||
PagedFetcher<R, T> itemsSetter(BiConsumer<R, List<T>> resultsSetter) {
|
||||
this.itemsSetter = resultsSetter;
|
||||
return this;
|
||||
}
|
||||
|
||||
private PagedFetcher(Function<String, R> pageFetcher) {
|
||||
this.fetchPage = pageFetcher;
|
||||
}
|
||||
|
||||
R fetchAll() {
|
||||
Objects.requireNonNull(cursorExtractor, "Cursor extractor must be set");
|
||||
Objects.requireNonNull(itemsExtractor, "Items extractor must be set");
|
||||
Objects.requireNonNull(itemsSetter, "Items setter must be set");
|
||||
|
||||
R firstPage = fetchPage.apply(null);
|
||||
String cursor = cursorExtractor.apply(firstPage);
|
||||
if (cursor == null || cursor.isEmpty()) {
|
||||
return firstPage;
|
||||
}
|
||||
|
||||
List<T> firstResults = itemsExtractor.apply(firstPage);
|
||||
List<T> allItems = firstResults != null
|
||||
? new ArrayList<>(firstResults)
|
||||
: new ArrayList<>();
|
||||
while (cursor != null && !cursor.isEmpty()) {
|
||||
R nextPage = fetchPage.apply(cursor);
|
||||
List<T> nextItems = itemsExtractor.apply(nextPage);
|
||||
if (nextItems != null && !nextItems.isEmpty()) {
|
||||
allItems.addAll(nextItems);
|
||||
}
|
||||
cursor = cursorExtractor.apply(nextPage);
|
||||
}
|
||||
|
||||
itemsSetter.accept(firstPage, allItems);
|
||||
return firstPage;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,156 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package org.apache.cloudstack.service;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
public class PagedFetcherTest {
|
||||
|
||||
private static class Page {
|
||||
private String cursor;
|
||||
private List<String> items;
|
||||
|
||||
Page(String cursor, List<String> items) {
|
||||
this.cursor = cursor;
|
||||
this.items = items;
|
||||
}
|
||||
|
||||
String getCursor() {
|
||||
return cursor;
|
||||
}
|
||||
|
||||
List<String> getItems() {
|
||||
return items;
|
||||
}
|
||||
|
||||
void setItems(List<String> items) {
|
||||
this.items = items;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchAllWhenThereIsNoPagination() {
|
||||
// given
|
||||
Page firstPage = new Page(null, new ArrayList<>(List.of("a", "b")));
|
||||
AtomicBoolean itemsSetterCalled = new AtomicBoolean(false);
|
||||
PagedFetcher<Page, String> fetcher = PagedFetcher.<Page, String>withPageFetcher(
|
||||
cursor -> {
|
||||
assertNull(cursor);
|
||||
return firstPage;
|
||||
})
|
||||
.cursorExtractor(Page::getCursor)
|
||||
.itemsExtractor(Page::getItems)
|
||||
.itemsSetter((page, items) -> itemsSetterCalled.set(true));
|
||||
|
||||
// when
|
||||
Page result = fetcher.fetchAll();
|
||||
|
||||
// then
|
||||
assertSame(firstPage, result);
|
||||
assertEquals(List.of("a", "b"), result.getItems());
|
||||
assertFalse("itemsSetter must not be called when there is no next page", itemsSetterCalled.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchAllWhenThereIsNoPaginationAndEmptyCursor() {
|
||||
// given
|
||||
Page firstPage = new Page("", new ArrayList<>(List.of("x")));
|
||||
|
||||
AtomicBoolean itemsSetterCalled = new AtomicBoolean(false);
|
||||
|
||||
PagedFetcher<Page, String> fetcher = PagedFetcher
|
||||
.<Page, String>withPageFetcher(cursor -> {
|
||||
assertNull(cursor);
|
||||
return firstPage;
|
||||
})
|
||||
.cursorExtractor(Page::getCursor)
|
||||
.itemsExtractor(Page::getItems)
|
||||
.itemsSetter((page, items) -> itemsSetterCalled.set(true));
|
||||
|
||||
// when
|
||||
Page result = fetcher.fetchAll();
|
||||
|
||||
// then
|
||||
assertSame(firstPage, result);
|
||||
assertEquals(List.of("x"), result.getItems());
|
||||
assertFalse("itemsSetter must not be called when there is no next page", itemsSetterCalled.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchAllWhenMultiPages() {
|
||||
// given
|
||||
Page page1 = new Page("c1", new ArrayList<>(List.of("p1a", "p1b")));
|
||||
Page page2 = new Page("c2", new ArrayList<>(List.of("p2a")));
|
||||
Page page3 = new Page(null, new ArrayList<>(List.of("p3a", "p3b")));
|
||||
|
||||
Map<String, Page> pagesByCursor = new HashMap<>();
|
||||
pagesByCursor.put(null, page1);
|
||||
pagesByCursor.put("c1", page2);
|
||||
pagesByCursor.put("c2", page3);
|
||||
|
||||
PagedFetcher<Page, String> fetcher = PagedFetcher
|
||||
.<Page, String>withPageFetcher(pagesByCursor::get)
|
||||
.cursorExtractor(Page::getCursor)
|
||||
.itemsExtractor(Page::getItems)
|
||||
.itemsSetter((page, items) -> {
|
||||
assertSame(page1, page);
|
||||
page.setItems(items);
|
||||
});
|
||||
|
||||
// when
|
||||
Page result = fetcher.fetchAll();
|
||||
|
||||
// then
|
||||
assertSame("Result must be the first page object", page1, result);
|
||||
assertEquals(List.of("p1a", "p1b", "p2a", "p3a", "p3b"), result.getItems());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchAllFirstPageItemsNullSecondWithItems() {
|
||||
// given
|
||||
Page page1 = new Page("next", null);
|
||||
Page page2 = new Page(null, new ArrayList<>(List.of("x", "y")));
|
||||
|
||||
Map<String, Page> pages = new HashMap<>();
|
||||
pages.put(null, page1);
|
||||
pages.put("next", page2);
|
||||
|
||||
PagedFetcher<Page, String> fetcher = PagedFetcher
|
||||
.<Page, String>withPageFetcher(pages::get)
|
||||
.cursorExtractor(Page::getCursor)
|
||||
.itemsExtractor(Page::getItems)
|
||||
.itemsSetter(Page::setItems);
|
||||
|
||||
// when
|
||||
Page result = fetcher.fetchAll();
|
||||
|
||||
// then
|
||||
assertSame(page1, result);
|
||||
assertEquals(List.of("x", "y"), result.getItems());
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue