001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.common;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.GenericKubernetesApi;
027import io.kubernetes.client.util.generic.options.ListOptions;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.Optional;
031import java.util.function.BiConsumer;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034import org.jgrapes.core.Components;
035
036/**
037 * An observer that watches namespaced resources in a given context and
038 * invokes a handler on changes.
039 *
040 * @param <O> the object type for the context
041 * @param <L> the object list type for the context
042 */
043public class K8sObserver<O extends KubernetesObject,
044        L extends KubernetesListObject> {
045
046    /**
047     * The type of change reported by {@link Response} as enum.
048     */
049    public enum ResponseType {
050        ADDED, MODIFIED, DELETED
051    }
052
053    @SuppressWarnings("PMD.FieldNamingConventions")
054    protected final Logger logger = Logger.getLogger(getClass().getName());
055
056    protected final K8sClient client;
057    protected final GenericKubernetesApi<O, L> api;
058    protected final APIResource context;
059    protected final String namespace;
060    protected final ListOptions options;
061    protected final Thread thread;
062    protected BiConsumer<K8sClient, Response<O>> handler;
063    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;
064
065    /**
066     * Create and start a new observer for objects in the given context 
067     * (using preferred version) and namespace with the given options.
068     *
069     * @param objectClass the object class
070     * @param objectListClass the object list class
071     * @param client the client
072     * @param context the context
073     * @param namespace the namespace
074     * @param options the options
075     */
076    @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop",
077        "PMD.UseObjectForClearerAPI", "PMD.AvoidCatchingThrowable",
078        "PMD.CognitiveComplexity", "PMD.AvoidCatchingGenericException" })
079    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
080            K8sClient client, APIResource context, String namespace,
081            ListOptions options) {
082        this.client = client;
083        this.context = context;
084        this.namespace = namespace;
085        this.options = options;
086
087        api = new GenericKubernetesApi<>(objectClass, objectListClass,
088            context.getGroup(), context.getPreferredVersion(),
089            context.getResourcePlural(), client);
090        thread = (Components.useVirtualThreads() ? Thread.ofVirtual()
091            : Thread.ofPlatform()).unstarted(() -> {
092                try {
093                    logger.fine(() -> "Observing " + context.getResourcePlural()
094                        + " (" + context.getPreferredVersion() + ")"
095                        + Optional.ofNullable(options.getLabelSelector())
096                            .map(ls -> " with labels " + ls).orElse("")
097                        + " in " + namespace);
098
099                    // Watch sometimes terminates without apparent reason.
100                    while (!Thread.currentThread().isInterrupted()) {
101                        Instant startedAt = Instant.now();
102                        try {
103                            @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
104                            var changed
105                                = api.watch(namespace, options).iterator();
106                            while (changed.hasNext()) {
107                                var response = changed.next();
108                                logger.fine(() -> "Resource "
109                                    + context.getKind() + "/"
110                                    + response.object.getMetadata().getName()
111                                    + " " + response.type);
112                                handler.accept(client, response);
113                            }
114                        } catch (ApiException | RuntimeException e) {
115                            logger.log(Level.FINE, e, () -> "Problem watching"
116                                + " resource " + context.getKind()
117                                + " (will retry): " + e.getMessage());
118                            delayRestart(startedAt);
119                        }
120                    }
121                    if (onTerminated != null) {
122                        onTerminated.accept(this, null);
123                    }
124                } catch (Throwable e) {
125                    logger.log(Level.SEVERE, e, () -> "Probem watching: "
126                        + e.getMessage());
127                    if (onTerminated != null) {
128                        onTerminated.accept(this, e);
129                    }
130                }
131            });
132    }
133
134    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
135    private void delayRestart(Instant started) {
136        var runningFor = Duration
137            .between(started, Instant.now()).toMillis();
138        if (runningFor < 5000) {
139            logger.log(Level.FINE, () -> "Waiting... ");
140            try {
141                Thread.sleep(5000 - runningFor);
142            } catch (InterruptedException e1) { // NOPMD
143                // Retry
144            }
145            logger.log(Level.FINE, () -> "Retrying");
146        }
147    }
148
149    /**
150     * Sets the handler.
151     *
152     * @param handler the handler
153     * @return the observer
154     */
155    public K8sObserver<O, L>
156            handler(BiConsumer<K8sClient, Response<O>> handler) {
157        this.handler = handler;
158        return this;
159    }
160
161    /**
162     * Sets a function to invoke if the observer terminates. First argument
163     * is this observer, the second is the throwable that caused the
164     * abnormal termination or `null` if the observer was terminated
165     * by {@link #stop()}.
166     *
167     * @param onTerminated the on terminated
168     * @return the observer
169     */
170    public K8sObserver<O, L> onTerminated(
171            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
172        this.onTerminated = onTerminated;
173        return this;
174    }
175
176    /**
177     * Start the observer.
178     *
179     * @return the observer
180     */
181    public K8sObserver<O, L> start() {
182        if (handler == null) {
183            throw new IllegalStateException("No handler defined");
184        }
185        thread.start();
186        return this;
187    }
188
189    /**
190     * Stops the observer.
191     *
192     * @return the observer
193     */
194    public K8sObserver<O, L> stop() {
195        thread.interrupt();
196        return this;
197    }
198
199    /**
200     * Returns the client.
201     *
202     * @return the client
203     */
204    public K8sClient client() {
205        return client;
206    }
207
208    /**
209     * Returns the context.
210     * 
211     * @return the context
212     */
213    public APIResource context() {
214        return context;
215    }
216
217    /**
218     * Returns the observed namespace.
219     * 
220     * @return the namespace
221     */
222    public String getNamespace() {
223        return namespace;
224    }
225
226    /**
227     * Returns the options for object selection.
228     *
229     * @return the list options
230     */
231    public ListOptions options() {
232        return options;
233    }
234
235    @Override
236    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
237    public String toString() {
238        return "Observer for " + K8s.toString(context) + " " + namespace;
239    }
240
241}