/*
 * VM-Operator
 * Copyright (C) 2025 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
import org.jgrapes.core.Channel;
import org.jgrapes.core.annotation.Handler;

/**
 * Watches for changes of pods that run VMs.
 *
 * The name of the VM that a pod belongs to is taken from the pod's
 * `app.kubernetes.io/instance` label. A change is forwarded as
 * {@link PodChanged} event on the {@link VmChannel} that the channel
 * dictionary provides for this name. If there is no such channel yet
 * or the channel has no VM definition yet, the change is kept as
 * "pending" and fired when a {@link VmResourceChanged} event for a
 * VM with this name is handled.
 */
public class PodMonitor extends AbstractMonitor<V1Pod, V1PodList, VmChannel> {

    /** The label that holds the name of the VM that a pod belongs to. */
    private static final String INSTANCE_LABEL = "app.kubernetes.io/instance";

    /** Pending changes older than this are dropped when purging. */
    private static final Duration PENDING_MAX_AGE = Duration.ofMinutes(15);

    private final ChannelDictionary<String, VmChannel, ?> channelDictionary;

    /** The latest deferred change for each VM, by VM name. */
    private final Map<String, PendingChange> pendingChanges
        = new ConcurrentHashMap<>();

    /**
     * Instantiates a new pod monitor. The monitor is restricted to
     * pods that carry the name, component and managed-by labels
     * built from the application's constants.
     *
     * @param componentChannel the component channel
     * @param channelDictionary the channel dictionary
     */
    public PodMonitor(Channel componentChannel,
            ChannelDictionary<String, VmChannel, ?> channelDictionary) {
        super(componentChannel, V1Pod.class, V1PodList.class);
        this.channelDictionary = channelDictionary;
        context(K8sV1PodStub.CONTEXT);
        ListOptions options = new ListOptions();
        options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
            + "app.kubernetes.io/component=" + APP_NAME + ","
            + "app.kubernetes.io/managed-by=" + VM_OP_NAME);
        options(options);
    }

    /**
     * Passes a newly created {@link K8sClient} to the base class.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     * @throws ApiException the api exception
     */
    @Override
    protected void prepareMonitoring() throws IOException, ApiException {
        client(new K8sClient());
    }

    /**
     * Forwards the pod change to the channel of the VM that the pod
     * belongs to or, if the VM definition is not available yet,
     * records it as pending change. Changes without an object,
     * metadata, labels or instance label are ignored.
     *
     * A deletion is never deferred. It does, however, discard a
     * change recorded earlier for the same VM, because that change
     * describes a pod that does not exist any more.
     *
     * @param client the client
     * @param change the change
     */
    @Override
    protected void handleChange(K8sClient client, Response<V1Pod> change) {
        // Null-safe lookup, metadata and labels are optional in the model.
        String vmName = Optional.ofNullable(change.object)
            .map(V1Pod::getMetadata).map(meta -> meta.getLabels())
            .map(labels -> labels.get(INSTANCE_LABEL)).orElse(null);
        if (vmName == null) {
            return;
        }
        var channel = channelDictionary.channel(vmName).orElse(null);
        var responseType = ResponseType.valueOf(change.type);
        if (channel != null && channel.vmDefinition() != null) {
            // The current change supersedes anything recorded before.
            pendingChanges.remove(vmName);
            channel.fire(new PodChanged(change.object, responseType));
            return;
        }

        // VM definition not available yet, may happen during startup
        if (responseType == ResponseType.DELETED) {
            // Nothing to report later, but make sure that a change
            // recorded before the deletion isn't fired as stale state.
            if (pendingChanges.remove(vmName) != null) {
                logger.finer(
                    () -> "Dropped pending pod change for " + vmName);
            }
            return;
        }
        purgePendingChanges();
        logger.finer(() -> "Add pending pod change for " + vmName);
        pendingChanges.put(vmName, new PendingChange(Instant.now(), change));
    }

    /**
     * Removes all pending changes that have been recorded more than
     * {@link #PENDING_MAX_AGE} ago.
     */
    private void purgePendingChanges() {
        Instant tooOld = Instant.now().minus(PENDING_MAX_AGE);
        for (var itr = pendingChanges.entrySet().iterator(); itr.hasNext();) {
            var change = itr.next();
            if (change.getValue().from().isBefore(tooOld)) {
                itr.remove();
                logger.finer(
                    () -> "Cleaned pending pod change for " + change.getKey());
            }
        }
    }

    /**
     * Check for pending changes. If a pod change has been recorded
     * for the VM from the event, it is removed from the pending
     * changes and fired on the given channel.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onVmResourceChanged(VmResourceChanged event,
            VmChannel channel) {
        String vmName = event.vmDefinition().name();
        PendingChange pending = pendingChanges.remove(vmName);
        if (pending == null) {
            return;
        }
        Response<V1Pod> change = pending.change();
        logger.finer(() -> "Firing pending pod change for " + vmName);
        channel.fire(new PodChanged(change.object,
            ResponseType.valueOf(change.type)));
        if (logger.isLoggable(Level.FINER) && pendingChanges.isEmpty()) {
            logger.finer("No pending pod changes left.");
        }
    }

    /**
     * A pod change that could not be forwarded when it was received.
     *
     * @param from the time when the change was recorded
     * @param change the change as received from the watch
     */
    private record PendingChange(Instant from, Response<V1Pod> change) {
    }

}