001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.common;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.GenericKubernetesApi;
027import io.kubernetes.client.util.generic.options.ListOptions;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.Optional;
031import java.util.function.BiConsumer;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034import org.jgrapes.core.Components;
035
036/**
037 * An observer that watches namespaced resources in a given context and
038 * invokes a handler on changes.
039 *
040 * @param <O> the object type for the context
041 * @param <L> the object list type for the context
042 */
043public class K8sObserver<O extends KubernetesObject,
044        L extends KubernetesListObject> {
045
046    /**
047     * The type of change reported by {@link Response} as enum.
048     */
049    public enum ResponseType {
050        ADDED, MODIFIED, DELETED
051    }
052
053    protected final Logger logger = Logger.getLogger(getClass().getName());
054
055    protected final K8sClient client;
056    protected final GenericKubernetesApi<O, L> api;
057    protected final APIResource context;
058    protected final String namespace;
059    protected final ListOptions options;
060    protected final Thread thread;
061    protected BiConsumer<K8sClient, Response<O>> handler;
062    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;
063
064    /**
065     * Create and start a new observer for objects in the given context 
066     * (using preferred version) and namespace with the given options.
067     *
068     * @param objectClass the object class
069     * @param objectListClass the object list class
070     * @param client the client
071     * @param context the context
072     * @param namespace the namespace
073     * @param options the options
074     */
075    @SuppressWarnings({ "PMD.AvoidCatchingThrowable",
076        "PMD.CognitiveComplexity", "PMD.AvoidCatchingGenericException" })
077    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
078            K8sClient client, APIResource context, String namespace,
079            ListOptions options) {
080        this.client = client;
081        this.context = context;
082        this.namespace = namespace;
083        this.options = options;
084
085        api = new GenericKubernetesApi<>(objectClass, objectListClass,
086            context.getGroup(), context.getPreferredVersion(),
087            context.getResourcePlural(), client);
088        thread = (Components.useVirtualThreads() ? Thread.ofVirtual()
089            : Thread.ofPlatform()).unstarted(() -> {
090                try {
091                    logger.fine(() -> "Observing " + context.getResourcePlural()
092                        + " (" + context.getPreferredVersion() + ")"
093                        + Optional.ofNullable(options.getLabelSelector())
094                            .map(ls -> " with labels " + ls).orElse("")
095                        + " in " + namespace);
096
097                    // Watch sometimes terminates without apparent reason.
098                    while (!Thread.currentThread().isInterrupted()) {
099                        Instant startedAt = Instant.now();
100                        try {
101                            var changed
102                                = api.watch(namespace, options).iterator();
103                            while (changed.hasNext()) {
104                                var response = changed.next();
105                                logger.fine(() -> "Resource "
106                                    + context.getKind() + "/"
107                                    + response.object.getMetadata().getName()
108                                    + " " + response.type);
109                                handler.accept(client, response);
110                            }
111                        } catch (ApiException | RuntimeException e) {
112                            logger.log(Level.FINE, e, () -> "Problem watching"
113                                + " resource " + context.getKind()
114                                + " (will retry): " + e.getMessage());
115                            delayRestart(startedAt);
116                        }
117                    }
118                    if (onTerminated != null) {
119                        onTerminated.accept(this, null);
120                    }
121                } catch (Throwable e) {
122                    logger.log(Level.SEVERE, e, () -> "Probem watching: "
123                        + e.getMessage());
124                    if (onTerminated != null) {
125                        onTerminated.accept(this, e);
126                    }
127                }
128            });
129    }
130
131    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
132    private void delayRestart(Instant started) {
133        var runningFor = Duration
134            .between(started, Instant.now()).toMillis();
135        if (runningFor < 5000) {
136            logger.log(Level.FINE, () -> "Waiting... ");
137            try {
138                Thread.sleep(5000 - runningFor);
139            } catch (InterruptedException e1) { // NOPMD
140                // Retry
141            }
142            logger.log(Level.FINE, () -> "Retrying");
143        }
144    }
145
146    /**
147     * Sets the handler.
148     *
149     * @param handler the handler
150     * @return the observer
151     */
152    public K8sObserver<O, L>
153            handler(BiConsumer<K8sClient, Response<O>> handler) {
154        this.handler = handler;
155        return this;
156    }
157
158    /**
159     * Sets a function to invoke if the observer terminates. First argument
160     * is this observer, the second is the throwable that caused the
161     * abnormal termination or `null` if the observer was terminated
162     * by {@link #stop()}.
163     *
164     * @param onTerminated the on terminated
165     * @return the observer
166     */
167    public K8sObserver<O, L> onTerminated(
168            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
169        this.onTerminated = onTerminated;
170        return this;
171    }
172
173    /**
174     * Start the observer.
175     *
176     * @return the observer
177     */
178    public K8sObserver<O, L> start() {
179        if (handler == null) {
180            throw new IllegalStateException("No handler defined");
181        }
182        thread.start();
183        return this;
184    }
185
186    /**
187     * Stops the observer.
188     *
189     * @return the observer
190     */
191    public K8sObserver<O, L> stop() {
192        thread.interrupt();
193        return this;
194    }
195
196    /**
197     * Returns the client.
198     *
199     * @return the client
200     */
201    public K8sClient client() {
202        return client;
203    }
204
205    /**
206     * Returns the context.
207     * 
208     * @return the context
209     */
210    public APIResource context() {
211        return context;
212    }
213
214    /**
215     * Returns the observed namespace.
216     * 
217     * @return the namespace
218     */
219    public String getNamespace() {
220        return namespace;
221    }
222
223    /**
224     * Returns the options for object selection.
225     *
226     * @return the list options
227     */
228    public ListOptions options() {
229        return options;
230    }
231
232    @Override
233    public String toString() {
234        return "Observer for " + K8s.toString(context) + " " + namespace;
235    }
236
237}