package org.terracotta.management.resource.services.events;

import java.io.IOException;
import java.security.Principal;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;
import org.apache.cxf.staxutils.PropertiesExpandingStreamReader;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.management.ServiceLocator;
import org.terracotta.management.resource.events.EventEntityV2;
import org.terracotta.management.resource.services.events.EventServiceV2;

@Singleton
@Path("/v2/events")
/* loaded from: input_file:BOOT-INF/lib/ehcache-2.10.9.2.jar:rest-management-private-classpath/org/terracotta/management/resource/services/events/AllEventsResourceServiceImplV2.class_terracotta */
public class AllEventsResourceServiceImplV2 {
    /** Logger shared by this resource and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(AllEventsResourceServiceImplV2.class);

    /**
     * Unflushed-event count at which an output is flushed from {@code write}.
     * Read from the {@code TerracottaEventOutput.batch_size} system property, default 32.
     */
    public static final int BATCH_SIZE = Integer.getInteger("TerracottaEventOutput.batch_size", 32);

    /**
     * Delay and period, in milliseconds, of the periodic flush task.
     * Read from the {@code TerracottaEventOutput.timer_interval} system property, default 917.
     */
    public static final long TIMER_INTERVAL = Long.getLong("TerracottaEventOutput.timer_interval", 917L);

    /**
     * Accumulated idle time (same unit as {@link #TIMER_INTERVAL}) after which an output is closed.
     * Read from the {@code TerracottaEventOutput.max_idle_keepalive} system property, default 57917.
     */
    public static final long MAX_IDLE_KEEPALIVE = Long.getLong("TerracottaEventOutput.max_idle_keepalive", 57917L);

    /** Daemon timer, shared by all instances, that runs the periodic flush task. */
    private static final Timer flushTimer = new Timer("sse-flush-timer", true);

    /** Source of management events; looked up through the {@link ServiceLocator}. */
    private final EventServiceV2 eventService = (EventServiceV2) ServiceLocator.locate(EventServiceV2.class);

    /** Registry of the currently open SSE outputs and their flush bookkeeping. */
    private final Broadcaster broadcaster = new Broadcaster();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/ehcache-2.10.9.2.jar:rest-management-private-classpath/org/terracotta/management/resource/services/events/AllEventsResourceServiceImplV2$Broadcaster.class_terracotta */
    public class Broadcaster extends SseBroadcaster {
        private final Map<TerracottaEventOutput, TerracottaEventOutputFlushingMetadata> outputs;

        private Broadcaster() {
            this.outputs = new ConcurrentHashMap();
        }

        @Override // org.glassfish.jersey.server.Broadcaster, org.glassfish.jersey.server.BroadcasterListener
        public void onException(ChunkedOutput<OutboundEvent> chunkedOutput, Exception exc) {
            AllEventsResourceServiceImplV2.LOG.debug("Error writing to OutputEvent", (Throwable) exc);
            close(chunkedOutput);
        }

        @Override // org.glassfish.jersey.server.Broadcaster
        public <OUT extends ChunkedOutput<OutboundEvent>> boolean add(OUT out) {
            this.outputs.put((TerracottaEventOutput) out, new TerracottaEventOutputFlushingMetadata());
            return super.add((Broadcaster) out);
        }

        @Override // org.glassfish.jersey.server.Broadcaster, org.glassfish.jersey.server.BroadcasterListener
        public void onClose(ChunkedOutput<OutboundEvent> chunkedOutput) {
            this.outputs.remove(chunkedOutput);
            AllEventsResourceServiceImplV2.this.eventService.unregisterEventListener((EventServiceListener) chunkedOutput);
        }

        public void close(ChunkedOutput<OutboundEvent> chunkedOutput) {
            try {
                if (!chunkedOutput.isClosed()) {
                    chunkedOutput.close();
                }
            } catch (Exception e) {
                AllEventsResourceServiceImplV2.LOG.debug("Error closing SSE event output from timer", (Throwable) e);
            } finally {
                onClose(chunkedOutput);
                remove((Broadcaster) chunkedOutput);
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/ehcache-2.10.9.2.jar:rest-management-private-classpath/org/terracotta/management/resource/services/events/AllEventsResourceServiceImplV2$EventServiceListener.class_terracotta */
    public class EventServiceListener extends TerracottaEventOutput implements EventServiceV2.EventListener {
        private final String userName;

        public EventServiceListener(String str) {
            this.userName = str;
        }

        @Override // org.glassfish.jersey.server.ChunkedOutput
        public synchronized void write(OutboundEvent outboundEvent) throws IOException {
            if (isClosed()) {
                throw new IOException("closed");
            }
            TerracottaEventOutputFlushingMetadata terracottaEventOutputFlushingMetadata = (TerracottaEventOutputFlushingMetadata) AllEventsResourceServiceImplV2.this.broadcaster.outputs.get(this);
            terracottaEventOutputFlushingMetadata.accumulatedIdleTime.set(0L);
            int incrementAndGet = terracottaEventOutputFlushingMetadata.unflushedCount.incrementAndGet();
            try {
                super.write((EventServiceListener) outboundEvent);
                if (incrementAndGet != AllEventsResourceServiceImplV2.BATCH_SIZE) {
                    AllEventsResourceServiceImplV2.LOG.debug("A SSE event output accumulating {} unflushed events", Integer.valueOf(incrementAndGet));
                    return;
                }
                AllEventsResourceServiceImplV2.LOG.debug("A SSE event output reached {} unflushed events, flushing it", Integer.valueOf(incrementAndGet));
                terracottaEventOutputFlushingMetadata.unflushedCount.addAndGet(-incrementAndGet);
                super.flush();
            } catch (Throwable th) {
                if (incrementAndGet == AllEventsResourceServiceImplV2.BATCH_SIZE) {
                    AllEventsResourceServiceImplV2.LOG.debug("A SSE event output reached {} unflushed events, flushing it", Integer.valueOf(incrementAndGet));
                    terracottaEventOutputFlushingMetadata.unflushedCount.addAndGet(-incrementAndGet);
                    super.flush();
                } else {
                    AllEventsResourceServiceImplV2.LOG.debug("A SSE event output accumulating {} unflushed events", Integer.valueOf(incrementAndGet));
                }
                throw th;
            }
        }

        @Override // org.terracotta.management.resource.services.events.EventServiceV2.EventListener
        public void onEvent(EventEntityV2 eventEntityV2) {
            OutboundEvent.Builder builder = new OutboundEvent.Builder();
            builder.reconnectDelay(100L);
            builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
            builder.name(EventEntityV2.class.getSimpleName());
            builder.data(EventEntityV2.class, (Object) eventEntityV2);
            try {
                write(builder.build());
            } catch (Exception e) {
                onError(e);
            }
            if (AllEventsResourceServiceImplV2.LOG.isDebugEnabled()) {
                AllEventsResourceServiceImplV2.LOG.debug(String.format("Event dispatched: {AgentId: %s, Type: %s, ApiVersion: %s, Representables: %s}", eventEntityV2.getAgentId(), eventEntityV2.getType(), eventEntityV2.getApiVersion(), eventEntityV2.getRootRepresentables()));
            }
        }

        @Override // org.terracotta.management.resource.services.events.EventServiceV2.EventListener
        public void onError(Throwable th) {
            AllEventsResourceServiceImplV2.LOG.debug("Error when waiting for management events.", th);
            try {
                AllEventsResourceServiceImplV2.this.broadcaster.close(this);
            } catch (Exception e) {
                AllEventsResourceServiceImplV2.LOG.debug("Error closing SSE event output", (Throwable) e);
            }
        }

        @Override // org.terracotta.management.resource.services.events.EventServiceV2.EventListener
        public String getUsername() {
            return this.userName;
        }

        @Override // org.glassfish.jersey.server.ChunkedOutput, javax.ws.rs.core.GenericType
        public String toString() {
            return getClass().getName() + PropertiesExpandingStreamReader.DELIMITER + Integer.toHexString(hashCode());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/ehcache-2.10.9.2.jar:rest-management-private-classpath/org/terracotta/management/resource/services/events/AllEventsResourceServiceImplV2$TerracottaEventOutputFlushingMetadata.class_terracotta */
    /** Per-output flush bookkeeping: both counters start at zero. */
    public static class TerracottaEventOutputFlushingMetadata {
        /** Number of events written since the last flush. */
        final AtomicInteger unflushedCount = new AtomicInteger();
        /** Idle time accumulated by the periodic flush task since the last write. */
        final AtomicLong accumulatedIdleTime = new AtomicLong();

        private TerracottaEventOutputFlushingMetadata() {
        }
    }

    /**
     * Creates the resource and schedules, on the shared flush timer, a task that
     * runs every {@link #TIMER_INTERVAL} milliseconds and checks every registered
     * output: outputs with unflushed events are flushed, outputs idle for at least
     * {@link #MAX_IDLE_KEEPALIVE} are closed.
     */
    public AllEventsResourceServiceImplV2() {
        LOG.debug("sse-flush-timer being used: {}", flushTimer);
        flushTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                LOG.debug("There are {} registered SSE event output(s), checking them", broadcaster.outputs.size());
                for (Map.Entry<TerracottaEventOutput, TerracottaEventOutputFlushingMetadata> entry : broadcaster.outputs.entrySet()) {
                    try {
                        flushOrExpire(entry.getKey(), entry.getValue());
                    } catch (RuntimeException e) {
                        // An exception escaping run() terminates the timer thread, which
                        // would stop flushing and expiry for every output. Isolate failures
                        // to the output that caused them.
                        LOG.warn("Unexpected error while checking a SSE event output", e);
                    }
                }
            }
        }, TIMER_INTERVAL, TIMER_INTERVAL);
    }

    /**
     * Performs one periodic check of a single output: adds one timer interval to
     * its idle time, then flushes it if it has unflushed events, or closes it if
     * it has been idle for at least {@link #MAX_IDLE_KEEPALIVE}.
     *
     * @param output the output to check
     * @param metadata the flush bookkeeping registered for {@code output}
     */
    private void flushOrExpire(TerracottaEventOutput output, TerracottaEventOutputFlushingMetadata metadata) {
        long idleTime = metadata.accumulatedIdleTime.addAndGet(TIMER_INTERVAL);
        int pending = metadata.unflushedCount.get();
        if (pending > 0) {
            LOG.debug("A SSE event output accumulated {} unflushed events during max interval, flushing it", pending);
            try {
                output.flush();
            } catch (Exception e) {
                LOG.debug("Error flushing SSE from timer, closing event output", e);
                broadcaster.close(output);
            } finally {
                // Subtract only what was observed, so events written meanwhile stay counted.
                metadata.unflushedCount.addAndGet(-pending);
            }
        } else if (idleTime >= MAX_IDLE_KEEPALIVE) {
            LOG.debug("A SSE event output has been idle for too long {}, closing it", idleTime);
            broadcaster.close(output);
        } else {
            LOG.debug("A SSE event output accumulated 0 event during flush interval");
        }
    }

    /**
     * Opens a server-sent-events stream of management events for the caller.
     * A new listener is added to the broadcaster and then registered with the
     * event service; the listener itself is returned as the response output.
     *
     * @param uriInfo request URI information, used for logging only
     * @param z value of the {@code localOnly} query parameter, passed through to the event service
     * @param httpServletRequest the request, used to resolve the caller's principal
     * @param httpServletResponse the response; not used by this method
     * @return the event output the events are written to
     */
    @GET
    @Produces({"text/event-stream"})
    public TerracottaEventOutput getServerSentEvents(@Context UriInfo uriInfo, @QueryParam("localOnly") boolean z, @Context HttpServletRequest httpServletRequest, @Context HttpServletResponse httpServletResponse) {
        Principal userPrincipal = httpServletRequest.getUserPrincipal();
        // Without an authenticated principal, fall back to a fixed placeholder user name.
        String name = userPrincipal != null ? userPrincipal.getName() : "tc_no_security_ctxt";
        EventServiceListener eventServiceListener = new EventServiceListener(name);
        LOG.debug("Invoking AllEventsResourceServiceImplV2.getServerSentEvents: info={}, localOnly={}, user={}", uriInfo.getRequestUri(), Boolean.valueOf(z), name);
        // Add to the broadcaster first so the flush bookkeeping exists before any event is written.
        this.broadcaster.add(eventServiceListener);
        this.eventService.registerEventListener(eventServiceListener, z);
        return eventServiceListener;
    }
}
