001/* 002 * VM-Operator 003 * Copyright (C) 2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import io.kubernetes.client.Discovery.APIResource; 022import io.kubernetes.client.common.KubernetesListObject; 023import io.kubernetes.client.common.KubernetesObject; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch.Response; 026import io.kubernetes.client.util.generic.options.ListOptions; 027import java.io.IOException; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.logging.Level; 032import org.jdrupes.vmoperator.common.K8s; 033import org.jdrupes.vmoperator.common.K8sClient; 034import org.jdrupes.vmoperator.common.K8sObserver; 035import org.jdrupes.vmoperator.manager.events.Exit; 036import org.jgrapes.core.Channel; 037import org.jgrapes.core.Component; 038import org.jgrapes.core.Components; 039import org.jgrapes.core.annotation.Handler; 040import org.jgrapes.core.events.Start; 041import org.jgrapes.core.events.Stop; 042import org.jgrapes.util.events.ConfigurationUpdate; 043 044/** 045 * A base class for monitoring VM related resources. When started, 046 * it creates observers for all versions of the the {@link APIResource} 047 * configured by {@link #context(APIResource)}. The APIResource is not 048 * passed to the constructor because in some cases it has to be 049 * evaluated lazily. 050 * 051 * @param <O> the object type for the context 052 * @param <L> the object list type for the context 053 */ 054public abstract class AbstractMonitor<O extends KubernetesObject, 055 L extends KubernetesListObject, C extends Channel> extends Component { 056 057 private final Class<O> objectClass; 058 private final Class<L> objectListClass; 059 private K8sClient client; 060 private APIResource context; 061 private String namespace; 062 private ListOptions options = new ListOptions(); 063 private final AtomicInteger observerCounter = new AtomicInteger(0); 064 065 /** 066 * Initializes the instance. 067 * 068 * @param componentChannel the component channel 069 * @param objectClass the class of the Kubernetes object to watch 070 * @param objectListClass the class of the list of Kubernetes objects 071 * to watch 072 */ 073 protected AbstractMonitor(Channel componentChannel, 074 Class<O> objectClass, Class<L> objectListClass) { 075 super(componentChannel); 076 this.objectClass = objectClass; 077 this.objectListClass = objectListClass; 078 } 079 080 /** 081 * Return the client. 082 * 083 * @return the client 084 */ 085 public K8sClient client() { 086 return client; 087 } 088 089 /** 090 * Sets the client to be used. 091 * 092 * @param client the client 093 * @return the abstract monitor 094 */ 095 public AbstractMonitor<O, L, C> client(K8sClient client) { 096 this.client = client; 097 return this; 098 } 099 100 /** 101 * Return the observed namespace. 102 * 103 * @return the namespace 104 */ 105 public String namespace() { 106 return namespace; 107 } 108 109 /** 110 * Sets the namespace to be observed. 111 * 112 * @param namespace the namespaceToWatch to set 113 * @return the abstract monitor 114 */ 115 public AbstractMonitor<O, L, C> namespace(String namespace) { 116 this.namespace = namespace; 117 return this; 118 } 119 120 /** 121 * Returns the options for selecting the objects to observe. 122 * 123 * @return the options 124 */ 125 public ListOptions options() { 126 return options; 127 } 128 129 /** 130 * Sets the options for selecting the objects to observe. 131 * 132 * @param options the options to set 133 * @return the abstract monitor 134 */ 135 public AbstractMonitor<O, L, C> options(ListOptions options) { 136 this.options = options; 137 return this; 138 } 139 140 /** 141 * Returns the observed context. 142 * 143 * @return the context 144 */ 145 public APIResource context() { 146 return context; 147 } 148 149 /** 150 * Sets the context to observe. 151 * 152 * @param context the context 153 * @return the abstract monitor 154 */ 155 public AbstractMonitor<O, L, C> context(APIResource context) { 156 this.context = context; 157 return this; 158 } 159 160 /** 161 * Looks for a key "namespace" in the configuration and, if found, 162 * sets the namespace to its value. 163 * 164 * @param event the event 165 */ 166 @Handler 167 public void onConfigurationUpdate(ConfigurationUpdate event) { 168 event.structured(Components.manager(parent()).componentPath()) 169 .ifPresent(c -> { 170 if (c.containsKey("namespace")) { 171 namespace = (String) c.get("namespace"); 172 } 173 }); 174 } 175 176 /** 177 * Handle the start event. Configures the namespace, invokes 178 * {@link #prepareMonitoring()} and starts the observers. 179 * 180 * @param event the event 181 */ 182 @Handler(priority = 10) 183 public void onStart(Start event) { 184 try { 185 // Get namespace 186 if (namespace == null) { 187 var path = Path 188 .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); 189 if (Files.isReadable(path)) { 190 namespace 191 = Files.lines(path).findFirst().orElse(null); 192 } 193 } 194 195 // Additional preparations by derived class 196 prepareMonitoring(); 197 assert client != null; 198 assert context != null; 199 assert namespace != null; 200 201 // Monitor all versions 202 for (var version : context.getVersions()) { 203 createObserver(version); 204 } 205 registerAsGenerator(); 206 } catch (IOException | ApiException e) { 207 logger.log(Level.SEVERE, e, 208 () -> "Cannot watch VMs, terminating."); 209 event.cancel(true); 210 fire(new Exit(1)); 211 } 212 } 213 214 private void createObserver(String version) { 215 observerCounter.incrementAndGet(); 216 new K8sObserver<>(objectClass, objectListClass, client, 217 K8s.preferred(context, version), namespace, options) 218 .handler(this::handleChange).onTerminated((o, t) -> { 219 if (observerCounter.decrementAndGet() == 0) { 220 unregisterAsGenerator(); 221 } 222 // Exception has been logged already 223 if (t != null) { 224 fire(new Stop()); 225 } 226 }).start(); 227 } 228 229 /** 230 * Invoked by {@link #onStart(Start)} after the namespace has 231 * been configured and before starting the observer. This is 232 * the last opportunity to invoke {@link #context(APIResource)}. 233 * 234 * @throws IOException Signals that an I/O exception has occurred. 235 * @throws ApiException the api exception 236 */ 237 @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") 238 protected void prepareMonitoring() throws IOException, ApiException { 239 // To be overridden by derived class. 240 } 241 242 /** 243 * Handle an observed change. The method is invoked by the observer 244 * thread(s). It is the responsibility of the implementing class to 245 * fire derived events on the appropriate event pipeline. 246 * 247 * @param client the client 248 * @param change the change 249 */ 250 protected abstract void handleChange(K8sClient client, Response<O> change); 251}