001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.logging.Level;
032import org.jdrupes.vmoperator.common.K8s;
033import org.jdrupes.vmoperator.common.K8sClient;
034import org.jdrupes.vmoperator.common.K8sObserver;
035import org.jdrupes.vmoperator.manager.events.Exit;
036import org.jgrapes.core.Channel;
037import org.jgrapes.core.Component;
038import org.jgrapes.core.Components;
039import org.jgrapes.core.annotation.Handler;
040import org.jgrapes.core.events.Start;
041import org.jgrapes.core.events.Stop;
042import org.jgrapes.util.events.ConfigurationUpdate;
043
044/**
045 * A base class for monitoring VM related resources. When started,
046 * it creates observers for all versions of the the {@link APIResource}
047 * configured by {@link #context(APIResource)}. The APIResource is not
048 * passed to the constructor because in some cases it has to be
049 * evaluated lazily.
050 * 
051 * @param <O> the object type for the context
052 * @param <L> the object list type for the context
053 */
054public abstract class AbstractMonitor<O extends KubernetesObject,
055        L extends KubernetesListObject, C extends Channel> extends Component {
056
057    private final Class<O> objectClass;
058    private final Class<L> objectListClass;
059    private K8sClient client;
060    private APIResource context;
061    private String namespace;
062    private ListOptions options = new ListOptions();
063    private final AtomicInteger observerCounter = new AtomicInteger(0);
064
065    /**
066     * Initializes the instance.
067     *
068     * @param componentChannel the component channel
069     * @param objectClass the class of the Kubernetes object to watch
070     * @param objectListClass the class of the list of Kubernetes objects
071     * to watch
072     */
073    protected AbstractMonitor(Channel componentChannel,
074            Class<O> objectClass, Class<L> objectListClass) {
075        super(componentChannel);
076        this.objectClass = objectClass;
077        this.objectListClass = objectListClass;
078    }
079
080    /**
081     * Return the client.
082     * 
083     * @return the client
084     */
085    public K8sClient client() {
086        return client;
087    }
088
089    /**
090     * Sets the client to be used.
091     *
092     * @param client the client
093     * @return the abstract monitor
094     */
095    public AbstractMonitor<O, L, C> client(K8sClient client) {
096        this.client = client;
097        return this;
098    }
099
100    /**
101     * Return the observed namespace.
102     * 
103     * @return the namespace
104     */
105    public String namespace() {
106        return namespace;
107    }
108
109    /**
110     * Sets the namespace to be observed.
111     *
112     * @param namespace the namespaceToWatch to set
113     * @return the abstract monitor
114     */
115    public AbstractMonitor<O, L, C> namespace(String namespace) {
116        this.namespace = namespace;
117        return this;
118    }
119
120    /**
121     * Returns the options for selecting the objects to observe.
122     * 
123     * @return the options
124     */
125    public ListOptions options() {
126        return options;
127    }
128
129    /**
130     * Sets the options for selecting the objects to observe.
131     *
132     * @param options the options to set
133     * @return the abstract monitor
134     */
135    public AbstractMonitor<O, L, C> options(ListOptions options) {
136        this.options = options;
137        return this;
138    }
139
140    /**
141     * Returns the observed context.
142     * 
143     * @return the context
144     */
145    public APIResource context() {
146        return context;
147    }
148
149    /**
150     * Sets the context to observe.
151     *
152     * @param context the context
153     * @return the abstract monitor
154     */
155    public AbstractMonitor<O, L, C> context(APIResource context) {
156        this.context = context;
157        return this;
158    }
159
160    /**
161     * Looks for a key "namespace" in the configuration and, if found,
162     * sets the namespace to its value.
163     *
164     * @param event the event
165     */
166    @Handler
167    public void onConfigurationUpdate(ConfigurationUpdate event) {
168        event.structured(Components.manager(parent()).componentPath())
169            .ifPresent(c -> {
170                if (c.containsKey("namespace")) {
171                    namespace = (String) c.get("namespace");
172                }
173            });
174    }
175
176    /**
177     * Handle the start event. Configures the namespace, invokes
178     * {@link #prepareMonitoring()} and starts the observers.
179     *
180     * @param event the event
181     */
182    @Handler(priority = 10)
183    public void onStart(Start event) {
184        try {
185            // Get namespace
186            if (namespace == null) {
187                var path = Path
188                    .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
189                if (Files.isReadable(path)) {
190                    namespace
191                        = Files.lines(path).findFirst().orElse(null);
192                }
193            }
194
195            // Additional preparations by derived class
196            prepareMonitoring();
197            assert client != null;
198            assert context != null;
199            assert namespace != null;
200
201            // Monitor all versions
202            for (var version : context.getVersions()) {
203                createObserver(version);
204            }
205            registerAsGenerator();
206        } catch (IOException | ApiException e) {
207            logger.log(Level.SEVERE, e,
208                () -> "Cannot watch VMs, terminating.");
209            event.cancel(true);
210            fire(new Exit(1));
211        }
212    }
213
214    private void createObserver(String version) {
215        observerCounter.incrementAndGet();
216        new K8sObserver<>(objectClass, objectListClass, client,
217            K8s.preferred(context, version), namespace, options)
218                .handler(this::handleChange).onTerminated((o, t) -> {
219                    if (observerCounter.decrementAndGet() == 0) {
220                        unregisterAsGenerator();
221                    }
222                    // Exception has been logged already
223                    if (t != null) {
224                        fire(new Stop());
225                    }
226                }).start();
227    }
228
229    /**
230     * Invoked by {@link #onStart(Start)} after the namespace has
231     * been configured and before starting the observer. This is
232     * the last opportunity to invoke {@link #context(APIResource)}.
233     *
234     * @throws IOException Signals that an I/O exception has occurred.
235     * @throws ApiException the api exception
236     */
237    @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
238    protected void prepareMonitoring() throws IOException, ApiException {
239        // To be overridden by derived class.
240    }
241
242    /**
243     * Handle an observed change. The method is invoked by the observer
244     * thread(s). It is the responsibility of the implementing class to
245     * fire derived events on the appropriate event pipeline.
246     *
247     * @param client the client
248     * @param change the change
249     */
250    protected abstract void handleChange(K8sClient client, Response<O> change);
251}