日志中出现如下异常：

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1566481621886_4397...

以异常信息中的关键字 “Heartbeat of TaskManager with id”（以及与之对应的 “The heartbeat of ResourceManager with id”）在 Flink 源码中搜索，可以找到如下两个心跳监听器：

/**
 * Listener for heartbeats exchanged with TaskManagers.
 *
 * <p>Incoming payloads are accumulator reports whose snapshots are forwarded one by one to the
 * scheduler; no payload is sent back. When a TaskManager's heartbeat times out, it is disconnected
 * through the JobMaster gateway with a {@link TimeoutException} naming the unresponsive resource.
 */
private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {

    /** Gateway through which timed-out TaskManagers are disconnected. */
    private final JobMasterGateway jobMasterGateway;

    private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
        // Reject a null gateway up front rather than failing on the first heartbeat timeout.
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
    }

    @Override
    public void notifyHeartbeatTimeout(ResourceID resourceID) {
        final String timeoutMessage = "Heartbeat of TaskManager with id " + resourceID + " timed out.";
        final TimeoutException disconnectCause = new TimeoutException(timeoutMessage);
        jobMasterGateway.disconnectTaskManager(resourceID, disconnectCause);
    }

    @Override
    public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
        // Each snapshot in the report is handed to the scheduler individually.
        for (AccumulatorSnapshot accumulatorSnapshot : payload.getAccumulatorSnapshots()) {
            schedulerNG.updateAccumulators(accumulatorSnapshot);
        }
    }

    @Override
    public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
        // Nothing is sent to the TaskManager, so an already-completed empty future is enough.
        return CompletableFuture.completedFuture(null);
    }
}

    /**
     * Listener for heartbeats exchanged with the ResourceManager. No payload flows in either
     * direction; only timeouts are acted upon.
     */
    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

        /**
         * Schedules the timeout handling via {@code runAsync}. The timeout is always logged, and a
         * reconnect is triggered only when it concerns the currently established ResourceManager
         * connection.
         */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            runAsync(() -> handleResourceManagerHeartbeatTimeout(resourceId));
        }

        /** Logs the timeout and reconnects if the timed-out ResourceManager is the connected one. */
        private void handleResourceManagerHeartbeatTimeout(ResourceID resourceId) {
            log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

            // Ignore timeouts of a ResourceManager we are not (or no longer) connected to.
            if (establishedResourceManagerConnection == null) {
                return;
            }
            if (!establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
                return;
            }

            final String reconnectReason =
                String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId);
            reconnectToResourceManager(new JobMasterException(reconnectReason));
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
            // Intentionally empty: the payload type is Void, so there is nothing to process.
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            // No outgoing payload; answer with an already-completed empty future.
            return CompletableFuture.completedFuture(null);
        }
    }

然后找到 TaskManagerHeartbeatListener 被实例化的位置：

// A *sender* heartbeat manager actively sends heartbeats to the monitored TaskManagers;
// timeouts are routed to TaskManagerHeartbeatListener, which disconnects the TaskManager.
this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(selfGateway),
    rpcService.getScheduledExecutor(),
    log);

接着进入 HeartbeatServices，看看心跳管理器是如何创建的：

/**
 * HeartbeatServices gives access to all services needed for heartbeating. This includes the
 * creation of heartbeat receivers and heartbeat senders.
 *
 * <p>All heartbeat managers created by one instance share the same timeout; the interval is only
 * used by managers that actively send heartbeats.
 */
public class HeartbeatServices {

    /** Heartbeat interval for the created services. */
    protected final long heartbeatInterval;

    /** Heartbeat timeout for the created services. */
    protected final long heartbeatTimeout;

    /**
     * Creates heartbeat services with the given interval and timeout.
     *
     * @param heartbeatInterval interval used by managers created via
     *                          {@link #createHeartbeatManagerSender}; must be larger than 0
     * @param heartbeatTimeout timeout used by all created managers; must be larger than or equal to
     *                         {@code heartbeatInterval}
     * @throws IllegalArgumentException if the interval is not positive or the timeout is smaller
     *                                  than the interval
     */
    public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
        // Include the offending values so a misconfiguration can be diagnosed from the message alone.
        Preconditions.checkArgument(
            0L < heartbeatInterval,
            "The heartbeat interval must be larger than 0, but was %s.",
            heartbeatInterval);
        Preconditions.checkArgument(
            heartbeatInterval <= heartbeatTimeout,
            "The heartbeat timeout (%s) should be larger than or equal to the heartbeat interval (%s).",
            heartbeatTimeout,
            heartbeatInterval);

        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /**
     * Creates a heartbeat manager which does not actively send heartbeats.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor scheduledExecutor,
        Logger log) {

        return new HeartbeatManagerImpl<>(
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance which actively sends heartbeats
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor scheduledExecutor,
        Logger log) {

        return new HeartbeatManagerSenderImpl<>(
            heartbeatInterval,
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates a HeartbeatServices instance from a {@link Configuration}.
     *
     * <p>Reads {@link HeartbeatManagerOptions#HEARTBEAT_INTERVAL} and
     * {@link HeartbeatManagerOptions#HEARTBEAT_TIMEOUT}; the values are validated by the constructor.
     *
     * @param configuration Configuration to be used for the HeartbeatServices creation
     * @return A HeartbeatServices instance created from the given configuration
     * @throws IllegalArgumentException if the configured interval is not positive or the configured
     *                                  timeout is smaller than the configured interval
     */
    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

        long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

        return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }
}

可以确认，心跳超时时间来自配置项 HeartbeatManagerOptions.HEARTBEAT_TIMEOUT（对应 flink-conf.yaml 中的 heartbeat.timeout），默认值为 50000（毫秒）：

    /**
     * Timeout for requesting and receiving heartbeat for both sender and receiver sides.
     *
     * <p>Configured via {@code heartbeat.timeout}; defaults to {@code 50000L} (milliseconds per the
     * Flink configuration docs). Must not be smaller than the heartbeat interval.
     */
    public static final ConfigOption<Long> HEARTBEAT_TIMEOUT = key("heartbeat.timeout")
        .defaultValue(50000L)
        .withDescription("Timeout for requesting and receiving heartbeat for both sender and receiver sides.");

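心跳超时有可能是 YARN 集群压力较大导致的。作为临时方案，可以先在 conf/flink-conf.yaml 中把这个值适当调大，再继续观察。注意 heartbeat.timeout 不能小于 heartbeat.interval，否则 HeartbeatServices 的构造函数会校验失败并抛出 IllegalArgumentException。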

# Timeout for requesting and receiving heartbeat for both sender and receiver sides.
# Raised from the default of 50000 to 180000 (milliseconds, per the Flink configuration docs)
# as a temporary mitigation for heartbeat timeouts, presumably caused by a loaded YARN cluster.
# Must be >= heartbeat.interval, otherwise HeartbeatServices rejects the configuration.
heartbeat.timeout: 180000