001/* 002 * VM-Operator 003 * Copyright (C) 2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.common; 020 021import io.kubernetes.client.Discovery.APIResource; 022import io.kubernetes.client.common.KubernetesListObject; 023import io.kubernetes.client.common.KubernetesObject; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch.Response; 026import io.kubernetes.client.util.generic.GenericKubernetesApi; 027import io.kubernetes.client.util.generic.options.ListOptions; 028import java.time.Duration; 029import java.time.Instant; 030import java.util.Optional; 031import java.util.function.BiConsumer; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034import org.jgrapes.core.Components; 035 036/** 037 * An observer that watches namespaced resources in a given context and 038 * invokes a handler on changes. 039 * 040 * @param <O> the object type for the context 041 * @param <L> the object list type for the context 042 */ 043public class K8sObserver<O extends KubernetesObject, 044 L extends KubernetesListObject> { 045 046 /** 047 * The type of change reported by {@link Response} as enum. 048 */ 049 public enum ResponseType { 050 ADDED, MODIFIED, DELETED 051 } 052 053 protected final Logger logger = Logger.getLogger(getClass().getName()); 054 055 protected final K8sClient client; 056 protected final GenericKubernetesApi<O, L> api; 057 protected final APIResource context; 058 protected final String namespace; 059 protected final ListOptions options; 060 protected final Thread thread; 061 protected BiConsumer<K8sClient, Response<O>> handler; 062 protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated; 063 064 /** 065 * Create and start a new observer for objects in the given context 066 * (using preferred version) and namespace with the given options. 067 * 068 * @param objectClass the object class 069 * @param objectListClass the object list class 070 * @param client the client 071 * @param context the context 072 * @param namespace the namespace 073 * @param options the options 074 */ 075 @SuppressWarnings({ "PMD.AvoidCatchingThrowable", 076 "PMD.CognitiveComplexity", "PMD.AvoidCatchingGenericException" }) 077 public K8sObserver(Class<O> objectClass, Class<L> objectListClass, 078 K8sClient client, APIResource context, String namespace, 079 ListOptions options) { 080 this.client = client; 081 this.context = context; 082 this.namespace = namespace; 083 this.options = options; 084 085 api = new GenericKubernetesApi<>(objectClass, objectListClass, 086 context.getGroup(), context.getPreferredVersion(), 087 context.getResourcePlural(), client); 088 thread = (Components.useVirtualThreads() ? Thread.ofVirtual() 089 : Thread.ofPlatform()).unstarted(() -> { 090 try { 091 logger.fine(() -> "Observing " + context.getResourcePlural() 092 + " (" + context.getPreferredVersion() + ")" 093 + Optional.ofNullable(options.getLabelSelector()) 094 .map(ls -> " with labels " + ls).orElse("") 095 + " in " + namespace); 096 097 // Watch sometimes terminates without apparent reason. 098 while (!Thread.currentThread().isInterrupted()) { 099 Instant startedAt = Instant.now(); 100 try { 101 var changed 102 = api.watch(namespace, options).iterator(); 103 while (changed.hasNext()) { 104 var response = changed.next(); 105 logger.fine(() -> "Resource " 106 + context.getKind() + "/" 107 + response.object.getMetadata().getName() 108 + " " + response.type); 109 handler.accept(client, response); 110 } 111 } catch (ApiException | RuntimeException e) { 112 logger.log(Level.FINE, e, () -> "Problem watching" 113 + " resource " + context.getKind() 114 + " (will retry): " + e.getMessage()); 115 delayRestart(startedAt); 116 } 117 } 118 if (onTerminated != null) { 119 onTerminated.accept(this, null); 120 } 121 } catch (Throwable e) { 122 logger.log(Level.SEVERE, e, () -> "Probem watching: " 123 + e.getMessage()); 124 if (onTerminated != null) { 125 onTerminated.accept(this, e); 126 } 127 } 128 }); 129 } 130 131 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 132 private void delayRestart(Instant started) { 133 var runningFor = Duration 134 .between(started, Instant.now()).toMillis(); 135 if (runningFor < 5000) { 136 logger.log(Level.FINE, () -> "Waiting... "); 137 try { 138 Thread.sleep(5000 - runningFor); 139 } catch (InterruptedException e1) { // NOPMD 140 // Retry 141 } 142 logger.log(Level.FINE, () -> "Retrying"); 143 } 144 } 145 146 /** 147 * Sets the handler. 148 * 149 * @param handler the handler 150 * @return the observer 151 */ 152 public K8sObserver<O, L> 153 handler(BiConsumer<K8sClient, Response<O>> handler) { 154 this.handler = handler; 155 return this; 156 } 157 158 /** 159 * Sets a function to invoke if the observer terminates. First argument 160 * is this observer, the second is the throwable that caused the 161 * abnormal termination or `null` if the observer was terminated 162 * by {@link #stop()}. 163 * 164 * @param onTerminated the on terminated 165 * @return the observer 166 */ 167 public K8sObserver<O, L> onTerminated( 168 BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) { 169 this.onTerminated = onTerminated; 170 return this; 171 } 172 173 /** 174 * Start the observer. 175 * 176 * @return the observer 177 */ 178 public K8sObserver<O, L> start() { 179 if (handler == null) { 180 throw new IllegalStateException("No handler defined"); 181 } 182 thread.start(); 183 return this; 184 } 185 186 /** 187 * Stops the observer. 188 * 189 * @return the observer 190 */ 191 public K8sObserver<O, L> stop() { 192 thread.interrupt(); 193 return this; 194 } 195 196 /** 197 * Returns the client. 198 * 199 * @return the client 200 */ 201 public K8sClient client() { 202 return client; 203 } 204 205 /** 206 * Returns the context. 207 * 208 * @return the context 209 */ 210 public APIResource context() { 211 return context; 212 } 213 214 /** 215 * Returns the observed namespace. 216 * 217 * @return the namespace 218 */ 219 public String getNamespace() { 220 return namespace; 221 } 222 223 /** 224 * Returns the options for object selection. 225 * 226 * @return the list options 227 */ 228 public ListOptions options() { 229 return options; 230 } 231 232 @Override 233 public String toString() { 234 return "Observer for " + K8s.toString(context) + " " + namespace; 235 } 236 237}