001/* 002 * VM-Operator 003 * Copyright (C) 2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.common; 020 021import io.kubernetes.client.Discovery.APIResource; 022import io.kubernetes.client.common.KubernetesListObject; 023import io.kubernetes.client.common.KubernetesObject; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch.Response; 026import io.kubernetes.client.util.generic.GenericKubernetesApi; 027import io.kubernetes.client.util.generic.options.ListOptions; 028import java.time.Duration; 029import java.time.Instant; 030import java.util.Optional; 031import java.util.function.BiConsumer; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034import org.jgrapes.core.Components; 035 036/** 037 * An observer that watches namespaced resources in a given context and 038 * invokes a handler on changes. 039 * 040 * @param <O> the object type for the context 041 * @param <L> the object list type for the context 042 */ 043public class K8sObserver<O extends KubernetesObject, 044 L extends KubernetesListObject> { 045 046 /** 047 * The type of change reported by {@link Response} as enum. 048 */ 049 public enum ResponseType { 050 ADDED, MODIFIED, DELETED 051 } 052 053 @SuppressWarnings("PMD.FieldNamingConventions") 054 protected final Logger logger = Logger.getLogger(getClass().getName()); 055 056 protected final K8sClient client; 057 protected final GenericKubernetesApi<O, L> api; 058 protected final APIResource context; 059 protected final String namespace; 060 protected final ListOptions options; 061 protected final Thread thread; 062 protected BiConsumer<K8sClient, Response<O>> handler; 063 protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated; 064 065 /** 066 * Create and start a new observer for objects in the given context 067 * (using preferred version) and namespace with the given options. 068 * 069 * @param objectClass the object class 070 * @param objectListClass the object list class 071 * @param client the client 072 * @param context the context 073 * @param namespace the namespace 074 * @param options the options 075 */ 076 @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop", 077 "PMD.UseObjectForClearerAPI", "PMD.AvoidCatchingThrowable", 078 "PMD.CognitiveComplexity", "PMD.AvoidCatchingGenericException" }) 079 public K8sObserver(Class<O> objectClass, Class<L> objectListClass, 080 K8sClient client, APIResource context, String namespace, 081 ListOptions options) { 082 this.client = client; 083 this.context = context; 084 this.namespace = namespace; 085 this.options = options; 086 087 api = new GenericKubernetesApi<>(objectClass, objectListClass, 088 context.getGroup(), context.getPreferredVersion(), 089 context.getResourcePlural(), client); 090 thread = (Components.useVirtualThreads() ? Thread.ofVirtual() 091 : Thread.ofPlatform()).unstarted(() -> { 092 try { 093 logger.fine(() -> "Observing " + context.getResourcePlural() 094 + " (" + context.getPreferredVersion() + ")" 095 + Optional.ofNullable(options.getLabelSelector()) 096 .map(ls -> " with labels " + ls).orElse("") 097 + " in " + namespace); 098 099 // Watch sometimes terminates without apparent reason. 100 while (!Thread.currentThread().isInterrupted()) { 101 Instant startedAt = Instant.now(); 102 try { 103 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 104 var changed 105 = api.watch(namespace, options).iterator(); 106 while (changed.hasNext()) { 107 var response = changed.next(); 108 logger.fine(() -> "Resource " 109 + context.getKind() + "/" 110 + response.object.getMetadata().getName() 111 + " " + response.type); 112 handler.accept(client, response); 113 } 114 } catch (ApiException | RuntimeException e) { 115 logger.log(Level.FINE, e, () -> "Problem watching" 116 + " resource " + context.getKind() 117 + " (will retry): " + e.getMessage()); 118 delayRestart(startedAt); 119 } 120 } 121 if (onTerminated != null) { 122 onTerminated.accept(this, null); 123 } 124 } catch (Throwable e) { 125 logger.log(Level.SEVERE, e, () -> "Probem watching: " 126 + e.getMessage()); 127 if (onTerminated != null) { 128 onTerminated.accept(this, e); 129 } 130 } 131 }); 132 } 133 134 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 135 private void delayRestart(Instant started) { 136 var runningFor = Duration 137 .between(started, Instant.now()).toMillis(); 138 if (runningFor < 5000) { 139 logger.log(Level.FINE, () -> "Waiting... "); 140 try { 141 Thread.sleep(5000 - runningFor); 142 } catch (InterruptedException e1) { // NOPMD 143 // Retry 144 } 145 logger.log(Level.FINE, () -> "Retrying"); 146 } 147 } 148 149 /** 150 * Sets the handler. 151 * 152 * @param handler the handler 153 * @return the observer 154 */ 155 public K8sObserver<O, L> 156 handler(BiConsumer<K8sClient, Response<O>> handler) { 157 this.handler = handler; 158 return this; 159 } 160 161 /** 162 * Sets a function to invoke if the observer terminates. First argument 163 * is this observer, the second is the throwable that caused the 164 * abnormal termination or `null` if the observer was terminated 165 * by {@link #stop()}. 166 * 167 * @param onTerminated the on terminated 168 * @return the observer 169 */ 170 public K8sObserver<O, L> onTerminated( 171 BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) { 172 this.onTerminated = onTerminated; 173 return this; 174 } 175 176 /** 177 * Start the observer. 178 * 179 * @return the observer 180 */ 181 public K8sObserver<O, L> start() { 182 if (handler == null) { 183 throw new IllegalStateException("No handler defined"); 184 } 185 thread.start(); 186 return this; 187 } 188 189 /** 190 * Stops the observer. 191 * 192 * @return the observer 193 */ 194 public K8sObserver<O, L> stop() { 195 thread.interrupt(); 196 return this; 197 } 198 199 /** 200 * Returns the client. 201 * 202 * @return the client 203 */ 204 public K8sClient client() { 205 return client; 206 } 207 208 /** 209 * Returns the context. 210 * 211 * @return the context 212 */ 213 public APIResource context() { 214 return context; 215 } 216 217 /** 218 * Returns the observed namespace. 219 * 220 * @return the namespace 221 */ 222 public String getNamespace() { 223 return namespace; 224 } 225 226 /** 227 * Returns the options for object selection. 228 * 229 * @return the list options 230 */ 231 public ListOptions options() { 232 return options; 233 } 234 235 @Override 236 @SuppressWarnings("PMD.UseLocaleWithCaseConversions") 237 public String toString() { 238 return "Observer for " + K8s.toString(context) + " " + namespace; 239 } 240 241}