package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.CommandRouterConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/amqp/ProtonBasedCommandRouterClient.class */
public class ProtonBasedCommandRouterClient extends AbstractRequestResponseServiceClient<JsonObject, RequestResponseResult<JsonObject>> implements CommandRouterClient {
    protected static final long SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS = 400;
    protected static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_ENTRIES = 100;
    protected static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_PARALLEL_REQ = 50;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProtonBasedCommandRouterClient.class);
    private final LinkedHashMap<Pair<String, String>, String> lastKnownGatewaysWorkQueue;
    private Long lastKnownGatewaysUpdateTimerId;
    private boolean stopped;
    private Clock clock;

    public ProtonBasedCommandRouterClient(HonoConnection honoConnection, SendMessageSampler.Factory factory) {
        super(honoConnection, factory, new CachingClientFactory(honoConnection.getVertx(), (v0) -> {
            return v0.isOpen();
        }), null);
        this.lastKnownGatewaysWorkQueue = new LinkedHashMap<>();
        this.stopped = false;
        this.clock = Clock.systemUTC();
        honoConnection.getVertx().eventBus().consumer(Constants.EVENT_BUS_ADDRESS_TENANT_TIMED_OUT, message -> {
            this.handleTenantTimeout(message);
        });
    }

    void setClock(Clock clock) {
        this.clock = (Clock) Objects.requireNonNull(clock);
    }

    @Override // org.eclipse.hono.client.amqp.AbstractServiceClient, org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        this.stopped = true;
        Optional.ofNullable(this.lastKnownGatewaysUpdateTimerId).ifPresent(l -> {
            this.connection.getVertx().cancelTimer(l.longValue());
        });
        return disconnectOnStop();
    }

    @Override // org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient
    protected String getKey(String str) {
        return String.format("%s-%s", CommandRouterConstants.COMMAND_ROUTER_ENDPOINT, str);
    }

    private Future<RequestResponseClient<RequestResponseResult<JsonObject>>> getOrCreateClient(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.clientFactory.getOrCreateClient(getKey(str), () -> {
                    return RequestResponseClient.forEndpoint(this.connection, CommandRouterConstants.COMMAND_ROUTER_ENDPOINT, str, this.samplerFactory.create(CommandRouterConstants.COMMAND_ROUTER_ENDPOINT), str2 -> {
                        this.removeClient(str2);
                    }, str3 -> {
                        this.removeClient(str3);
                    });
                }, promise);
            });
        });
    }

    @Override // org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient
    protected final RequestResponseResult<JsonObject> getResult(int i, String str, Buffer buffer, CacheDirective cacheDirective, ApplicationProperties applicationProperties) {
        Map map = (Map) Optional.ofNullable(applicationProperties).map((v0) -> {
            return v0.getValue();
        }).orElse(null);
        if (buffer == null) {
            return new RequestResponseResult<>(i, null, null, map);
        }
        try {
            return new RequestResponseResult<>(i, new JsonObject(buffer), CacheDirective.noCacheDirective(), map);
        } catch (DecodeException e) {
            LOG.warn("received malformed payload from Command Router service", (Throwable) e);
            return new RequestResponseResult<>(500, null, null, map);
        }
    }

    @Override // org.eclipse.hono.client.command.CommandRouterClient
    public Future<Void> setLastKnownGatewayForDevice(String str, String str2, String str3, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        this.lastKnownGatewaysWorkQueue.put(Pair.of(str, str2), str3);
        if (this.lastKnownGatewaysUpdateTimerId == null && !this.stopped) {
            this.lastKnownGatewaysUpdateTimerId = Long.valueOf(this.connection.getVertx().setTimer(SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS, l -> {
                processLastKnownGatewaysWorkQueue(null, null, null);
            }));
        }
        return Future.succeededFuture();
    }

    private void processLastKnownGatewaysWorkQueue(Instant instant, Set<String> set, Span span) {
        this.log.debug("processLastKnownGatewaysWorkQueue; queue size: {}", Integer.valueOf(this.lastKnownGatewaysWorkQueue.size()));
        Instant instant2 = (Instant) Optional.ofNullable(instant).orElseGet(() -> {
            return Instant.now(this.clock);
        });
        Span span2 = (Span) Optional.ofNullable(span).orElseGet(() -> {
            return newFollowingSpan(null, "set last known gateways");
        });
        if (set == null) {
            span2.log(Map.of("no_of_device_entries_to_set", Integer.valueOf(this.lastKnownGatewaysWorkQueue.size())));
        }
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        HashSet hashSet = new HashSet();
        Iterator<Map.Entry<Pair<String, String>, String>> it = this.lastKnownGatewaysWorkQueue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Pair<String, String>, String> next = it.next();
            String one = next.getKey().one();
            if (set == null || set.contains(one)) {
                if (linkedHashMap.containsKey(one) || linkedHashMap.size() < 50) {
                    linkedHashMap.putIfAbsent(one, new HashMap());
                    Map map = (Map) linkedHashMap.get(one);
                    if (map.size() < 100) {
                        map.put(next.getKey().two(), next.getValue());
                        it.remove();
                    } else {
                        hashSet.add(one);
                    }
                } else {
                    hashSet.add(one);
                }
            }
        }
        ArrayList arrayList = new ArrayList();
        linkedHashMap.forEach((str, map2) -> {
            arrayList.add(setLastKnownGateways(str, map2, span2.context()));
        });
        CompositeFuture.join(arrayList).onComplete2(asyncResult -> {
            if (asyncResult.failed()) {
                TracingHelper.logError(span2, asyncResult.cause());
            }
            if (this.stopped) {
                span2.finish();
                return;
            }
            if (this.lastKnownGatewaysWorkQueue.isEmpty()) {
                span2.finish();
                this.lastKnownGatewaysUpdateTimerId = null;
                return;
            }
            long millis = SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS - Duration.between(instant2, Instant.now(this.clock)).toMillis();
            if (millis < 1) {
                if (!hashSet.isEmpty()) {
                    span2.log(String.format("still remaining entries to be set for %d tenants - will be handled in next overall run", Integer.valueOf(hashSet.size())));
                    this.log.info("processLastKnownGatewaysWorkQueue: not all entries could be set during update interval; current queue size: {}", Integer.valueOf(this.lastKnownGatewaysWorkQueue.size()));
                }
                span2.finish();
                processLastKnownGatewaysWorkQueue(null, null, null);
                return;
            }
            if (!hashSet.isEmpty()) {
                span2.log(String.format("starting another round of requests for %d tenants (request size/count limit was reached)", Integer.valueOf(hashSet.size())));
                processLastKnownGatewaysWorkQueue(instant2, hashSet, span2);
            } else {
                this.log.debug("schedule next processLastKnownGatewaysWorkQueue invocation in {}ms", Long.valueOf(millis));
                span2.finish();
                this.lastKnownGatewaysUpdateTimerId = Long.valueOf(this.connection.getVertx().setTimer(millis, l -> {
                    processLastKnownGatewaysWorkQueue(null, null, null);
                }));
            }
        });
    }

    protected Future<Void> setLastKnownGateways(String str, Map<String, String> map, SpanContext spanContext) {
        Span newChildSpan;
        Object compose;
        if (map.size() == 1) {
            Map.Entry<String, String> next = map.entrySet().iterator().next();
            String key = next.getKey();
            String value = next.getValue();
            Map<String, Object> createDeviceIdProperties = createDeviceIdProperties(key);
            createDeviceIdProperties.put(MessageHelper.APP_PROPERTY_GATEWAY_ID, value);
            newChildSpan = newChildSpan(spanContext, "set last known gateway for device");
            TracingHelper.setDeviceTags(newChildSpan, str, key);
            newChildSpan.setTag(MessageHelper.APP_PROPERTY_GATEWAY_ID, value);
            compose = getOrCreateClient(str).compose(requestResponseClient -> {
                return requestResponseClient.createAndSendRequest(CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(), createDeviceIdProperties, null, null, message -> {
                    return this.getRequestResponseResult(message);
                }, newChildSpan);
            });
        } else {
            newChildSpan = newChildSpan(spanContext, "set last known gateways for tenant devices");
            TracingHelper.setDeviceTags(newChildSpan, str, null);
            newChildSpan.log(Map.of("no_of_entries", Integer.valueOf(map.size())));
            JsonObject jsonObject = new JsonObject();
            Objects.requireNonNull(jsonObject);
            map.forEach((v1, v2) -> {
                r1.put(v1, v2);
            });
            compose = getOrCreateClient(str).compose(requestResponseClient2 -> {
                return requestResponseClient2.createAndSendRequest(CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(), null, jsonObject.toBuffer(), "application/json", message -> {
                    return this.getRequestResponseResult(message);
                }, newChildSpan);
            });
        }
        return mapResultAndFinishSpan(compose, requestResponseResult -> {
            switch (requestResponseResult.getStatus()) {
                case 204:
                    return null;
                default:
                    throw StatusCodeMapper.from((RequestResponseResult<?>) requestResponseResult);
            }
        }, newChildSpan).onFailure(th -> {
            this.log.debug("failed to set last known gateway(s) for tenant [{}]", str, th);
        }).mapEmpty();
    }

    @Override // org.eclipse.hono.client.command.CommandRouterClient
    public Future<Void> registerCommandConsumer(String str, String str2, boolean z, String str3, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        int seconds = (duration == null || duration.getSeconds() > 2147483647L) ? -1 : (int) duration.getSeconds();
        Map<String, Object> createDeviceIdProperties = createDeviceIdProperties(str2);
        createDeviceIdProperties.put(CommandConstants.MSG_PROPERTY_ADAPTER_INSTANCE_ID, str3);
        createDeviceIdProperties.put(MessageHelper.APP_PROPERTY_LIFESPAN, Integer.valueOf(seconds));
        Span newChildSpan = newChildSpan(spanContext, "register command consumer");
        TracingHelper.setDeviceTags(newChildSpan, str, str2);
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(newChildSpan, str3);
        newChildSpan.setTag(MessageHelper.APP_PROPERTY_LIFESPAN, Integer.valueOf(seconds));
        createDeviceIdProperties.put(MessageHelper.APP_PROPERTY_SEND_EVENT, Boolean.valueOf(z));
        return mapResultAndFinishSpan(getOrCreateClient(str).compose(requestResponseClient -> {
            return requestResponseClient.createAndSendRequest(CommandRouterConstants.CommandRouterAction.REGISTER_COMMAND_CONSUMER.getSubject(), createDeviceIdProperties, null, null, message -> {
                return this.getRequestResponseResult(message);
            }, newChildSpan);
        }), requestResponseResult -> {
            switch (requestResponseResult.getStatus()) {
                case 204:
                    return null;
                default:
                    throw StatusCodeMapper.from((RequestResponseResult<?>) requestResponseResult);
            }
        }, newChildSpan).mapEmpty();
    }

    @Override // org.eclipse.hono.client.command.CommandRouterClient
    public Future<Void> unregisterCommandConsumer(String str, String str2, boolean z, String str3, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Map<String, Object> createDeviceIdProperties = createDeviceIdProperties(str2);
        createDeviceIdProperties.put(CommandConstants.MSG_PROPERTY_ADAPTER_INSTANCE_ID, str3);
        Span newChildSpan = newChildSpan(spanContext, "unregister command consumer");
        TracingHelper.setDeviceTags(newChildSpan, str, str2);
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(newChildSpan, str3);
        createDeviceIdProperties.put(MessageHelper.APP_PROPERTY_SEND_EVENT, Boolean.valueOf(z));
        return getOrCreateClient(str).compose(requestResponseClient -> {
            return requestResponseClient.createAndSendRequest(CommandRouterConstants.CommandRouterAction.UNREGISTER_COMMAND_CONSUMER.getSubject(), createDeviceIdProperties, null, null, message -> {
                return this.getRequestResponseResult(message);
            }, newChildSpan);
        }).recover(th -> {
            Tags.HTTP_STATUS.set(newChildSpan, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
            TracingHelper.logError(newChildSpan, th);
            return Future.failedFuture(th);
        }).map(requestResponseResult -> {
            Tags.HTTP_STATUS.set(newChildSpan, Integer.valueOf(requestResponseResult.getStatus()));
            if (requestResponseResult.isError() && requestResponseResult.getStatus() != 412) {
                Tags.ERROR.set(newChildSpan, Boolean.TRUE);
            }
            switch (requestResponseResult.getStatus()) {
                case 204:
                    return null;
                default:
                    throw StatusCodeMapper.from((RequestResponseResult<?>) requestResponseResult);
            }
        }).onComplete2(asyncResult -> {
            newChildSpan.finish();
        }).mapEmpty();
    }

    @Override // org.eclipse.hono.client.command.CommandRouterClient
    public Future<Void> enableCommandRouting(List<String> list, SpanContext spanContext) {
        Objects.requireNonNull(list);
        if (list.isEmpty()) {
            return Future.succeededFuture();
        }
        Span newChildSpan = newChildSpan(spanContext, "enable command routing");
        newChildSpan.log(Map.of("no_of_tenants", Integer.valueOf(list.size())));
        JsonArray jsonArray = new JsonArray(list);
        return mapResultAndFinishSpan(getOrCreateClient(list.get(0)).compose(requestResponseClient -> {
            return requestResponseClient.createAndSendRequest(CommandRouterConstants.CommandRouterAction.ENABLE_COMMAND_ROUTING.getSubject(), null, jsonArray.toBuffer(), "application/json", message -> {
                return this.getRequestResponseResult(message);
            }, newChildSpan);
        }), requestResponseResult -> {
            switch (requestResponseResult.getStatus()) {
                case 204:
                    this.log.info("successfully enabled routing of commands for {} tenants in Command Router", Integer.valueOf(list.size()));
                    return null;
                default:
                    ServiceInvocationException from = StatusCodeMapper.from((RequestResponseResult<?>) requestResponseResult);
                    this.log.info("failed to enable routing of commands in Command Router", (Throwable) from);
                    throw from;
            }
        }, newChildSpan).mapEmpty();
    }
}
